### <b>一. 自相关检测的重要性</b>
##### 在投资组合构建中，降低因子之间的自相关性（self-correlation）非常重要，原因可从以下几个维度来理解：
##### 1. 提升组合多样性：当因子之间高度相关时，它们往往在相似的市场条件下同时赚钱或同时亏钱，本质上只是放大了单一逻辑的风险敞口；相反，低相关性的因子在不同市场情境下可能表现出互补特性，从而在整体上平滑组合的波动，提高稳定性；
##### 2. 高相关因子一起使用时，容易导致模型捕捉到数据中的噪声结构而非真正的市场信号；
##### 3. 若该因子与已提交因子高度相关，它对整体表现的边际增益将很小，甚至可能降低表现，因为我们每个评价周期可以提交的因子是有限的，所以应该尽可能提升每个Alpha的边际增益。
##### 注：BRAIN平台的自相关性计算以半年为周期滚动。

### <b>二. 工具简介</b>
##### 使用平台计算资源检验自相关性有着速度慢，被限流检验失败，挤占ProdCorr计算资源等问题。同时，计算自相关性的所有数据均可以获取到本地，并且Pandas等本地工具已内建成熟高效的自相关性计算函数，支持灵活地按日期滑窗、按分组股票池等维度进行分析。因此，在本地环境下进行自相关性检验，不仅可以规避平台资源限制带来的不确定性，也具备更高的效率、灵活性与性价比，是值得优先采用的方式。
#### <b>主要应用</b>
##### 1. 将自相关性检查加入CheckCorrelation流程，在检查ProdCorr之前先在本地检验自相关性，只检验自相关性小于某个阈值的ProdCorr，从而降低平台计算资源的请求频率，避免被限流，从而显著提升CheckCorr的效率，
##### 2. 自相关性可以嵌入到set_alpha_property之前。最近的PPAC Theme要求为因子打上desc，但是如果为每个因子都打desc会浪费不少的时间，因此可以提前检测自相关性，只有PPAC_Corr <= 0.5的再打desc。
##### <b>源代码来自KZ79256</b>

### <b>三. 工作流程</b>
##### 1. 增量获取已提交alpha的PnL和alpha_id, 并保存为pickle文件
##### 2. 获取目标alpha的全量信息
##### 3. 准备目标alpha的PnL和日收益率
##### 4. 截取近4年alpha_rets数据
##### 5. 计算目标alpha与同region的已提交alpha的皮尔逊相关性

###### *注：2025/5/20后平台自相关算法估计有小幅变化，导致普通自相关计算出现小幅误差，PPAC自相关计算不受影响。*

### <b>四. 环境准备</b>
#####  python_version >= 3.13.2
#####  numpy >= 2.2.4
#####  pandas >= 2.2.3

In [None]:
# 导入官方库
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import logging
import os
from pathlib import Path
import pickle
import requests
import time
from typing import Dict, List, Optional, Tuple

# 导入第三方库
from dotenv import load_dotenv  # 导入环境变量，非必需 pip install python-dotenv
import pandas as pd
import numpy as np

# 读取环境变量文件
load_dotenv()

# 定义用户名, 密码, 文件存储路径，如果明文写入了用户名密码，千万不要把代码上传到公开平台（GitHub, Gitee, B站, 小红书等）！！！
class cfg:
    username = os.getenv('WQ_EMAIL')  # " "
    password = os.getenv('WQ_PASSWORD')  # " "
    data_path = Path('./data')

### <b>五. 定义工具函数</b>

In [2]:
# 定义基础功能函数
def sign_in(username, password):
    """
    登录BRAIN平台

    Arguments:
        username (str): 用户名。
        password (str): 密码。

    Returns:
        requests.Session or None
    """
    s = requests.Session()
    s.auth = (username, password)
    try:
        response = s.post('https://api.worldquantbrain.com/authentication')
        response.raise_for_status()
        logging.info("Successfully signed in")
        return s
    except requests.exceptions.RequestException as e:
        logging.error(f"Login failed: {e}")
        return None


def save_obj(obj: object, name: str) -> None:
    """
    保存对象到文件中，以 pickle 格式序列化。

    Args:
        obj (object): 需要保存的对象。
        name (str): 文件名（不包含扩展名），保存的文件将以 '.pickle' 为扩展名。

    Returns:
        None: 此函数无返回值。

    Raises:
        pickle.PickleError: 如果序列化过程中发生错误。
        IOError: 如果文件写入过程中发生 I/O 错误。
    """
    with open(name + '.pickle', 'wb') as f:
        pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)


def load_obj(name: str) -> object:
    """
    加载指定名称的 pickle 文件并返回其内容。

    此函数会打开一个以 `.pickle` 为扩展名的文件，并使用 `pickle` 模块加载其内容。

    Args:
        name (str): 不带扩展名的文件名称。

    Returns:
        object: 从 pickle 文件中加载的 Python 对象。

    Raises:
        FileNotFoundError: 如果指定的文件不存在。
        pickle.UnpicklingError: 如果文件内容无法被正确反序列化。
    """
    with open(name + '.pickle', 'rb') as f:
        return pickle.load(f)


def wait_get(url: str, max_retries: int = 10) -> "Response":
    """
    发送带有重试机制的 GET 请求，直到成功或达到最大重试次数。
    此函数会根据服务器返回的 `Retry-After` 头信息进行等待，并在遇到 401 状态码时重新初始化配置。

    Args:
        url (str): 目标 URL。
        max_retries (int, optional): 最大重试次数，默认为 10。

    Returns:
        Response: 请求的响应对象。
    """
    retries = 0
    while retries < max_retries:
        while True:
            simulation_progress = sess.get(url)
            if simulation_progress.headers.get("Retry-After", 0) == 0:
                break
            time.sleep(float(simulation_progress.headers["Retry-After"]))
        if simulation_progress.status_code < 400:
            break
        else:
            time.sleep(2 ** retries)
            retries += 1
    return simulation_progress

### <b>六. 定义获取alpha_pnl的函数</b>

In [3]:
def _get_alpha_pnl(alpha_id: str) -> pd.DataFrame:
    """
    获取指定 alpha 的 PnL数据，并返回一个包含日期和 PnL 的 DataFrame。

    此函数通过调用 WorldQuant Brain API 获取指定 alpha 的 PnL 数据，
    并将其转换为 pandas DataFrame 格式，方便后续数据处理。

    Args:
        alpha_id (str): Alpha 的唯一标识符。

    Returns:
        pd.DataFrame: 包含日期和对应 PnL 数据的 DataFrame，列名为 'Date' 和 alpha_id。
    """
    pnl = wait_get("https://api.worldquantbrain.com/alphas/" + alpha_id + "/recordsets/pnl").json()
    df = pd.DataFrame(pnl['records'], columns=[item['name'] for item in pnl['schema']['properties']])
    df = df.rename(columns={'date':'Date', 'pnl':alpha_id})
    df = df[['Date', alpha_id]]
    return df


def get_alpha_pnls(
    alphas: list[dict], 
    alpha_pnls: Optional[pd.DataFrame] = None, 
    alpha_ids: Optional[dict[str, list]] = None
) -> Tuple[dict[str, list], pd.DataFrame]:
    """
    获取 alpha 的 PnL 数据，并按区域分类 alpha 的 ID。

    Args:
        alphas (list[dict]): 包含 alpha 信息的列表，每个元素是一个字典，包含 alpha 的 ID 和设置等信息。
        alpha_pnls (Optional[pd.DataFrame], 可选): 已有的 alpha PnL 数据，默认为空的 DataFrame。
        alpha_ids (Optional[dict[str, list]], 可选): 按区域分类的 alpha ID 字典，默认为空字典。

    Returns:
        Tuple[dict[str, list], pd.DataFrame]: 
            - 按区域分类的 alpha ID 字典。
            - 包含所有 alpha 的 PnL 数据的 DataFrame。
    """
    # 如果alpha_ids字典没有传入，则生成一个默认值为list()的字典存储alpha_id以及一个dataframe存储alpha_pnls
    if alpha_ids is None:
        alpha_ids = defaultdict(list)
    if alpha_pnls is None:
        alpha_pnls = pd.DataFrame()
    # 筛选新增的alpha_id
    new_alphas = [item for item in alphas if item['id'] not in alpha_pnls.columns]
    if not new_alphas:
        return alpha_ids, alpha_pnls
    # 在alpha_ids字典中的对应region增加新增的alpha_id
    for item_alpha in new_alphas:
        alpha_ids[item_alpha['settings']['region']].append(item_alpha['id'])

    # 使用线程池并发对多个alpha_ids批量抓取PnL数据
    fetch_pnl_func = lambda alpha_id: _get_alpha_pnl(alpha_id).set_index('Date')
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = executor.map(fetch_pnl_func, [item['id'] for item in new_alphas])
    # 把获取的alpha_pnls和已有的PnL数据合并为一个dataframe，并按日期升序排列
    alpha_pnls = pd.concat([alpha_pnls] + list(results), axis=1)
    alpha_pnls.sort_index(inplace=True)

    return alpha_ids, alpha_pnls


def get_os_alphas(limit: int = 100, get_first: bool = False) -> List[Dict]:
    """
    获取OS阶段的alpha列表。

    此函数通过调用WorldQuant Brain API获取用户的alpha列表，支持分页获取，并可以选择只获取第一个结果。

    Args:
        limit (int, optional): 每次请求获取的alpha数量限制。默认为100。
        get_first (bool, optional): 是否只获取第一次请求的alpha结果。如果为True，则只请求一次。默认为False。

    Returns:
        List[Dict]: 包含alpha信息的字典列表，每个字典表示一个alpha。
    """
    fetched_alphas = []
    offset = 0
    retries = 0
    total_alphas = 100
    while len(fetched_alphas) < total_alphas:
        print(f"Fetching alphas from offset {offset} to {offset + limit}")
        url = f"https://api.worldquantbrain.com/users/self/alphas?stage=OS&limit={limit}&offset={offset}&order=-dateSubmitted"
        res = wait_get(url).json()
        if offset == 0:
            total_alphas = res['count']
        alphas = res["results"]
        fetched_alphas.extend(alphas)
        if len(alphas) < limit:
            break
        offset += limit
        if get_first:
            break
    return fetched_alphas[:total_alphas]


def download_data(flag_increment=True):
    """
    下载数据并保存到指定路径。

    此函数会检查数据是否已经存在，如果不存在，则从 API 下载数据并保存到指定路径。

    Args:
        flag_increment (bool): 是否使用增量下载，默认为 True。
    """
    # 根据是否增量下载给变量赋值
    if flag_increment:
        try:
            os_alpha_ids = load_obj(str(cfg.data_path / 'os_alpha_ids'))
            os_alpha_pnls = load_obj(str(cfg.data_path / 'os_alpha_pnls'))
            ppac_alpha_ids = load_obj(str(cfg.data_path / 'ppac_alpha_ids'))
            exist_alpha = [alpha for ids in os_alpha_ids.values() for alpha in ids]
        except Exception as e:
            logging.error(f"Failed to load existing data: {e}")
            os_alpha_ids = None
            os_alpha_pnls = None
            exist_alpha = []
            ppac_alpha_ids = []
    else:
        os_alpha_ids = None
        os_alpha_pnls = None
        exist_alpha = []
        ppac_alpha_ids = []
    # 增量/全量获取已提交alpha信息
    if os_alpha_ids is None:
        alphas = get_os_alphas(limit=100, get_first=False)
    else:
        alphas = get_os_alphas(limit=30, get_first=True)  # 可以根据实际情况调整limit
    # 筛选新增alpha_id的数据
    alphas = [item for item in alphas if item['id'] not in exist_alpha]
    # 获取符合ppac theme的alpha_ids
    ppac_alpha_ids += [item['id'] for item in alphas for item_match in item['classifications'] if item_match['name'] == 'Power Pool Alpha']
    # 获取全量的alpha_pnls以及全量alpha_ids和ppac主题alpha_ids，并保存为pickle文件
    os_alpha_ids, os_alpha_pnls = get_alpha_pnls(alphas, alpha_pnls=os_alpha_pnls, alpha_ids=os_alpha_ids)
    save_obj(os_alpha_ids, str(cfg.data_path / 'os_alpha_ids'))
    save_obj(os_alpha_pnls, str(cfg.data_path / 'os_alpha_pnls'))
    save_obj(ppac_alpha_ids, str(cfg.data_path / 'ppac_alpha_ids'))
    print(f'新下载的alpha数量: {len(alphas)}, 目前总共alpha数量: {os_alpha_pnls.shape[1]}')


def load_data(tag=None):
    """
    加载数据。

    此函数会检查数据是否已经存在，如果不存在，则从 API 下载数据并保存到指定路径。

    Args:
        tag (str): 数据标记，默认为 None。
    """

    # 读取pickle文件
    os_alpha_ids = load_obj(str(cfg.data_path / 'os_alpha_ids'))
    os_alpha_pnls = load_obj(str(cfg.data_path / 'os_alpha_pnls'))
    ppac_alpha_ids = load_obj(str(cfg.data_path / 'ppac_alpha_ids'))

    # 根据tag筛选alpha_pnls
    if tag=='PPAC':
        for item in os_alpha_ids:
            os_alpha_ids[item] = [alpha for alpha in os_alpha_ids[item] if alpha in ppac_alpha_ids]
    elif tag=='SelfCorr':
        for item in os_alpha_ids:
            os_alpha_ids[item] = [alpha for alpha in os_alpha_ids[item] if alpha not in ppac_alpha_ids]
    else:
        os_alpha_ids = os_alpha_ids

    exist_alpha = [alpha for ids in os_alpha_ids.values() for alpha in ids]
    os_alpha_pnls = os_alpha_pnls[exist_alpha]

    # 计算已提交alpha的日度收益率并截取近4年数据
    os_alpha_rets = os_alpha_pnls - os_alpha_pnls.ffill().shift(1)
    os_alpha_rets = os_alpha_rets[pd.to_datetime(os_alpha_rets.index)>pd.to_datetime(os_alpha_rets.index).max() - pd.DateOffset(years=4)]
    return os_alpha_ids, os_alpha_rets

### <b>七. 定义计算相关性函数</b>

In [4]:
def calc_self_corr(
    alpha_id: str,
    os_alpha_rets: pd.DataFrame | None = None,
    os_alpha_ids: dict[str, str] | None = None,
    alpha_result: dict | None = None,
    return_alpha_pnls: bool = False,
    alpha_pnls: pd.DataFrame | None = None
) -> float | tuple[float, pd.DataFrame]:
    """
    计算指定 alpha 与其他 alpha 的最大自相关性。

    Args:
        alpha_id (str): 目标 alpha 的唯一标识符。
        os_alpha_rets (pd.DataFrame | None, optional): 其他 alpha 的收益率数据，默认为 None。
        os_alpha_ids (dict[str, str] | None, optional): 其他 alpha 的标识符映射，默认为 None。
        alpha_result (dict | None, optional): 目标 alpha 的详细信息，默认为 None。
        return_alpha_pnls (bool, optional): 是否返回 alpha 的 PnL 数据，默认为 False。
        alpha_pnls (pd.DataFrame | None, optional): 目标 alpha 的 PnL 数据，默认为 None。

    Returns:
        float | tuple[float, pd.DataFrame]: 如果 `return_alpha_pnls` 为 False，返回最大自相关性值；
            如果 `return_alpha_pnls` 为 True，返回包含最大自相关性值和 alpha PnL 数据的元组。
    """
    # 按需获取alpha_result
    if alpha_result is None:
        alpha_result = wait_get(f"https://api.worldquantbrain.com/alphas/{alpha_id}").json()

    # 按需获取alpha_pnls
    if alpha_pnls is not None:
        if len(alpha_pnls) == 0:
            alpha_pnls = None
    if alpha_pnls is None:
        _, alpha_pnls = get_alpha_pnls([alpha_result])
        alpha_pnls = alpha_pnls[alpha_id]

    # 计算因子的日度收益率
    alpha_rets = alpha_pnls - alpha_pnls.ffill().shift(1)
    # 截取最近4年数据
    alpha_rets = alpha_rets[pd.to_datetime(alpha_rets.index)>pd.to_datetime(alpha_rets.index).max() - pd.DateOffset(years=4)]
    # 打印结果到控制台
    print(os_alpha_rets[os_alpha_ids[alpha_result['settings']['region']]].corrwith(alpha_rets).sort_values(ascending=False).round(4))
    # 按需决定是否需要保存到csv文件
    # os_alpha_rets[os_alpha_ids[alpha_result['settings']['region']]].corrwith(alpha_rets).sort_values(ascending=False).round(4).to_csv(str(cfg.data_path / 'os_alpha_corr.csv'))
    # 返回最大值
    self_corr = os_alpha_rets[os_alpha_ids[alpha_result['settings']['region']]].corrwith(alpha_rets).max()
    if np.isnan(self_corr):
        self_corr = 0
    if return_alpha_pnls:
        return self_corr, alpha_pnls
    else:
        return self_corr

### <b>八. 调用函数计算自相关性</b>

In [5]:
# 登录
global sess
sess = sign_in(cfg.username, cfg.password)

In [6]:
# 增量下载数据
download_data(flag_increment=True)

Fetching alphas from offset 0 to 30
新下载的alpha数量: 7, 目前总共alpha数量: 786


In [7]:
# 计算相关性
alpha_id = 'zArN9G1'
os_alpha_ids, os_alpha_rets = load_data(tag='SelfCorr')  # 加载数据， 如果需要使用不同的标签，可以传入 tag 参数， 例如 tag='PPAC' 或 tag='SelfCorr'
calc_self_corr(
    alpha_id=alpha_id,
    os_alpha_rets=os_alpha_rets,
    os_alpha_ids=os_alpha_ids,
)

7ZeYq7O    0.5112
A0llYKX    0.4517
rjOAqv9    0.4477
e9zzmpO    0.4231
6rxeLr7    0.4148
            ...  
7Za5XG2   -0.1968
GdQKWAG   -0.2149
a1QZLm9   -0.2188
QGNdPgW   -0.2336
2OOJgv8   -0.2435
Length: 224, dtype: float64


np.float64(0.5112330270153354)

In [8]:
alpha_id = 'zArN9G1'
os_alpha_ids, os_alpha_rets = load_data(tag='PPAC')
calc_self_corr(
    alpha_id=alpha_id,
    os_alpha_rets=os_alpha_rets,
    os_alpha_ids=os_alpha_ids,
)

L8ZQv1M    0.4527
QWmAnRW    0.4047
xbEjoWb    0.4041
LZ0PeYm    0.3896
lKYbkQN    0.3625
            ...  
NVE010p   -0.1117
aa0mPVR   -0.1244
oE21k62   -0.1245
w9v5xZp   -0.1431
d9WOkOX   -0.1958
Length: 87, dtype: float64


np.float64(0.45271395078142107)

### <b>七. 检验结果</b>

![](https://pic1.imgdb.cn/item/682d8a1858cb8da5c801dac8.jpg)
![](https://pic1.imgdb.cn/item/682d3d5158cb8da5c80020fa.jpg)
