In [2]:
import pandas as pd
import numpy as np
import os
import pyarrow as pa
import pyarrow.parquet as pq

### 因子构造样例

#### 预先读取merge_daily_info

In [5]:
merge_daily_info = pd.read_parquet(r'/Users/xuyanye/Desktop/quant_mm/factor/data/merge_daily_info.parquet')
merge_daily_info['TradingDate'] = pd.to_datetime(merge_daily_info['TradingDate'])
merge_daily_info = merge_daily_info.drop_duplicates(subset=['Stkcd','TradingDate'])
TRD_Dalyr = pd.read_parquet(r'/Users/xuyanye/Desktop/quant_mm/factor/data/TRD_Dalyr.parquet')

In [6]:
TRD_Dalyr.columns

Index(['Stkcd', 'TradingDate', 'Opnprc', 'Hiprc', 'Loprc', 'Clsprc',
       'Dnshrtrd', 'Dnvaltrd', 'Dsmvosd', 'Dsmvtll', 'Dretwd', 'Dretnd',
       'Adjprcwd', 'Adjprcnd', 'Markettype', 'Capchgdt', 'Trdsta',
       'Ahshrtrd_D', 'Ahvaltrd_D', 'PreClosePrice', 'ChangeRatio', 'LimitDown',
       'LimitUp', 'LimitStatus'],
      dtype='object')

In [7]:
#数据提取与合并
Dnvaltrd = TRD_Dalyr[['TradingDate','Stkcd','Dnvaltrd']]
Dnshrtrd = TRD_Dalyr[['TradingDate','Stkcd','Dnshrtrd']]
Clsprc = TRD_Dalyr[['TradingDate','Stkcd','Clsprc']]
calc_cvturn = pd.merge(Dnvaltrd,Dnshrtrd ,on=['TradingDate','Stkcd'],how='left')
calc_cvturn = pd.merge(calc_cvturn,Clsprc ,on=['TradingDate','Stkcd'],how='left')
calc_cvturn.columns

Index(['TradingDate', 'Stkcd', 'Dnvaltrd', 'Dnshrtrd', 'Clsprc'], dtype='object')

# alpha032

In [8]:
COLUMN_FACTOR_NAME = "alpha032" 
ALPHA_SAVE_PATH_DIR = "/Users/xuyanye/Desktop/quant mm/factor/alpha"
if not os.path.exists(ALPHA_SAVE_PATH_DIR):
    os.makedirs(ALPHA_SAVE_PATH_DIR)
# 定义最终因子在DataFrame中的列名，以及Parquet文件的基础名称
# 这两部分必须一致，才能被您的回测代码正确识别

In [10]:
# 假设 calc_factor_data 是原始数据
calc_factor_data = calc_cvturn.copy()

# 1. 确保数据按股票代码和交易日期排序
calc_factor_data = calc_factor_data.sort_values(by=['Stkcd', 'TradingDate']).copy()

# 2. 计算第一个主要部分：scale(((sum(close, 7) / 7) - close))
# 计算过去7日的简单移动平均 (SMA) 即 (sum(close, 7) / 7)
calc_factor_data['avg_close_7'] = calc_factor_data.groupby('Stkcd')['Clsprc'].rolling(window=7, min_periods=1).mean().reset_index(level=0, drop=True)

# 计算 (7日均线 - 当前日收盘价)
calc_factor_data['avg_minus_current_close'] = calc_factor_data['avg_close_7'] - calc_factor_data['Clsprc']

# scale操作：标准化 (Z-score scaling)，使得绝对值总和为1
def scale(x):
    total_abs = x.abs().sum()
    return x / total_abs if total_abs != 0 else 0

# 对第一部分进行scale
calc_factor_data['scaled_avg_minus_current_close'] = calc_factor_data.groupby('TradingDate')['avg_minus_current_close'].transform(scale)

# 3. 计算第二个主要部分：20 * scale(correlation(vwap, delay(close, 5), 230))
# 计算VWAP (成交量加权平均价)
calc_factor_data['vwap'] = calc_factor_data['Dnvaltrd'] / calc_factor_data['Dnshrtrd']

# 获取延迟5个交易日的收盘价
calc_factor_data['delay_close_5'] = calc_factor_data.groupby('Stkcd')['Clsprc'].shift(5)

# 计算过去230个交易日VWAP与延迟5日收盘价的相关性
calc_factor_data['corr_vwap_delay_close_230'] = calc_factor_data.groupby('Stkcd').apply(
    lambda x: x['vwap'].rolling(window=230, min_periods=1).corr(x['delay_close_5'])
).reset_index(level=0, drop=True)

# 对相关性进行scale操作
calc_factor_data['scaled_corr_vwap_delay_close_230'] = calc_factor_data.groupby('TradingDate')['corr_vwap_delay_close_230'].transform(scale)

# 4. 计算最终因子：20 * scale(correlation(vwap, delay(close, 5), 230))
calc_factor_data['weighted_scaled_corr'] = 20 * calc_factor_data['scaled_corr_vwap_delay_close_230']

# 5. 将两部分相加，得到原始因子值
calc_factor_data['raw_factor'] = calc_factor_data['scaled_avg_minus_current_close'] + calc_factor_data['weighted_scaled_corr']

# 6. 对最终的原始因子值进行整体归一化 (Z-score scaling)
calc_factor_data['alpha_new_factor'] = calc_factor_data.groupby('TradingDate')['raw_factor'].transform(
    lambda x: (x - x.mean()) / x.std() if x.std() != 0 else 0
)

# 将 inf 和 -inf 替换为 0
calc_factor_data['alpha_new_factor'].replace([float('inf'), float('-inf')], 0, inplace=True)

  calc_factor_data['corr_vwap_delay_close_230'] = calc_factor_data.groupby('Stkcd').apply(
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  calc_factor_data['alpha_new_factor'].replace([float('inf'), float('-inf')], 0, inplace=True)


In [11]:
# 假设 merge_daily_info 是另一个包含 Stkcd 和 TradingDate 的 DataFrame
# 将计算出的 Alpha08_Factor 合并到 merge_daily_info
# 注意：如果 calc_cvturn 就是最终要用的 DataFrame，可能不需要这一步合并
# 这里为了与用户提供的格式保持一致，假设有一个目标 DataFrame 'merge_daily_info'
# 如果 calc_cvturn 本身就是最终结果，则可以跳过此合并步骤
# 请根据实际情况调整 'merge_daily_info' 的定义
try:
    # 模拟 merge_daily_info 如果它不存在
    if 'merge_daily_info' not in locals():
        print("警告：'merge_daily_info' 未定义。为演示目的，将假设 merge_daily_info 等同于 calc_cvturn 的基础部分。")
        merge_daily_info = calc_cvturn[['Stkcd', 'TradingDate']].copy() # 只是一个示例

    # 进行合并
    calc_cvturn = pd.merge(merge_daily_info,
                           calc_factor_data[['Stkcd','TradingDate','alpha_new_factor']],
                           on=['Stkcd','TradingDate'],
                           how='left')
    calc_cvturn = calc_cvturn.rename(columns={'alpha_new_factor': 'alpha032'})
except NameError:
    print("错误：'merge_daily_info' 未定义，请确保该DataFrame存在后再执行合并。")


# 去掉缺失值与inf (根据用户提供的格式)
# 这一步会移除 Alpha12_Factor 列中因为 delta 计算导致的 NaN 值 (例如每个股票的第一个交易日)
calc_cvturn = calc_cvturn.dropna(subset=['alpha032']) # 只针对因子列去除NaN，避免影响其他数据
calc_cvturn = calc_cvturn[~np.isinf(calc_cvturn['alpha032'])] # 确保因子值不是无穷大

# 打印结果 DataFrame 的头部，以供检查
print(calc_cvturn.head())

    Stkcd TradingDate  alpha032
6       1  1991-04-10  1.024810
7       1  1991-04-12 -0.971897
9       1  1991-04-16  0.879270
10      1  1991-04-17 -1.366789
11      1  1991-04-18 -1.607555


In [13]:
##采用parquet优化内存
save_path = r'/Users/xuyanye/Desktop/quant_mm/factor/alpha/alpha032.parquet'
calc_cvturn.to_parquet(save_path, compression='gzip')