### 这个 notebook 先把项目中所有 .py 文件的内容汇总写入 src.txt，然后逐项测试各个组件的方法

In [3]:
import os
import numpy as np

def get_all_py_files(root_dir: str) -> list:
    """
    Recursively collect the paths of every ``.py`` file under a directory.

    Args:
        root_dir (str): Directory at which the walk starts.

    Returns:
        list: One path per matching file, each built by joining the walked
        directory path with the file name, in ``os.walk`` traversal order.
    """
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(root_dir)
        for name in names
        if name.endswith(".py")
    ]

def write_all_py_contents_to_output(py_files: list, output_path: str = "src.txt") -> None:
    """
    Concatenate the given source files into a single text file.

    Each file gets a banner (a rule of ``=``, a ``File: <path>`` line, a rule
    of ``-``) followed by its contents and two newlines. A file that cannot be
    processed does not abort the run: the error is recorded in its place.

    Args:
        py_files (list): Paths of the files to dump.
        output_path (str): Destination file; it is opened in write mode, so
            any existing content is replaced.
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    with open(output_path, "w", encoding="utf-8") as sink:
        for src_path in py_files:
            # Banner first, so a failed read is still attributed to its file.
            sink.write(f"{heavy_rule}\nFile: {src_path}\n{light_rule}\n")
            try:
                with open(src_path, "r", encoding="utf-8") as source:
                    sink.write(source.read())
            except Exception as err:
                sink.write(f"⚠️ Error reading {src_path}: {err}\n")
            sink.write("\n\n")

# Dump every .py file found under the current directory into src.txt (the
# default output path of write_all_py_contents_to_output) and report the count.
root_directory = "."  # current directory
all_py_files = get_all_py_files(root_directory)
write_all_py_contents_to_output(all_py_files)
print(f"📄 所有 Python 文件内容已写入 src.txt（共 {len(all_py_files)} 个文件）")



📄 所有 Python 文件内容已写入 src.txt（共 14 个文件）


In [17]:
import importlib
from utility import set_random_seed

def reload_components():
    """
    Re-import the project modules so that source edits take effect in the
    running kernel, then hand back freshly bound references.

    The modules are reloaded in this order: ``data``, ``tokenizer``,
    ``combination``, ``alpha_generation_env``, ``generator``.

    Returns:
        dict: Maps each of the names ``load_market_data``, ``AlphaTokenizer``,
        ``AlphaCombinationModel``, ``AlphaGenerationEnv`` and
        ``RLAlphaGenerator`` to the object imported from the reloaded module.
    """
    import data
    import tokenizer
    import combination
    import alpha_generation_env
    import generator

    for module in (data, tokenizer, combination, alpha_generation_env, generator):
        importlib.reload(module)

    # Pull the names only after reloading so they reflect the fresh module state.
    from data import load_market_data
    from tokenizer import AlphaTokenizer
    from combination import AlphaCombinationModel
    from alpha_generation_env import AlphaGenerationEnv
    from generator import RLAlphaGenerator

    return dict(
        load_market_data=load_market_data,
        AlphaTokenizer=AlphaTokenizer,
        AlphaCombinationModel=AlphaCombinationModel,
        AlphaGenerationEnv=AlphaGenerationEnv,
        RLAlphaGenerator=RLAlphaGenerator,
    )

# Reload the project modules once now and keep the returned name -> object mapping.
components = reload_components()


### 1. 测试 AlphaCombinationModel._compute_alpha_from_expr

In [18]:
# Smoke-test AlphaCombinationModel._compute_alpha_from_expr on one expression.
# NOTE: `np` is not imported in this cell; it comes from the first code cell.
from combination import AlphaCombinationModel
from data import load_market_data

df = load_market_data()
model = AlphaCombinationModel()
model.inject_data(df, target_col='target')

# Expression under test; the original note describes it as the 100-second mean
# of close (presumably postfix order: operand, window, operator — TODO confirm).
expr = "close 100 ts_mean"
alpha = model._compute_alpha_from_expr(expr)

print("alpha shape:", alpha.shape)
# Show the first five non-NaN values only.
print("alpha sample:", alpha[~np.isnan(alpha)][:5])


alpha shape: (64861,)
alpha sample: [2971.82 2971.85 2971.83 2971.81 2971.81]


### 2. 测试 AlphaCombinationModel.add_alpha_expr

In [19]:
# Add one expression to `model` (the instance created in the previous test cell)
# and print the value add_alpha_expr returns, which this cell labels as the IC.
ic = model.add_alpha_expr("high low sub 100 ts_max")
print("该因子的 IC 为：", ic)
# Number of entries currently held in model.alphas.
print("当前池中因子数：", len(model.alphas))


该因子的 IC 为： 0.017208891053807192
当前池中因子数： 1


### 3. 测试 AlphaTokenizer.encode / decode

In [20]:
# Round-trip check: encode an expression to token IDs, then decode it back.
from tokenizer import AlphaTokenizer

# NOTE: this rebinds the notebook-level names `tokenizer` and `expr`.
tokenizer = AlphaTokenizer()

expr = "close 5 ts_mean"
ids = tokenizer.encode(expr)
decoded = tokenizer.decode(ids)

# The decoded text is expected to read the same as `expr` (compare by eye;
# nothing is asserted here).
print("Token IDs:", ids)
print("Decoded expr:", decoded)


Token IDs: [1, 6, 10, 32, 2]
Decoded expr: close 5 ts_mean


### 4. 测试 AlphaGenerationEnv.reset / step

In [21]:
# Exercise AlphaGenerationEnv.reset / step once on a freshly built environment.
from alpha_generation_env import AlphaGenerationEnv
from combination import AlphaCombinationModel
from tokenizer import AlphaTokenizer
from data import load_market_data

# Build the environment from a new combination model and tokenizer.
df = load_market_data()
combo = AlphaCombinationModel()
combo.inject_data(df, target_col='target')
tokenizer = AlphaTokenizer()
env = AlphaGenerationEnv(combo, tokenizer)

obs = env.reset()
print("初始状态 token IDs:", obs)

# Take the second entry of valid_actions() as the action.
# Assumes at least two valid actions exist right after reset — TODO confirm.
valid = env.valid_actions()
action = valid[1]
obs2, reward, done, info = env.step(action)
print("新状态:", obs2)
print("Reward:", reward, "Done:", done)


初始状态 token IDs: [1]
新状态: [1, 4]
Reward: 0.0 Done: False


### 5. 测试 PolicyNetwork / ValueNetwork 输出维度

In [10]:
# Check the output shapes of PolicyNetwork and ValueNetwork on one random batch.
import torch
from generator import PolicyNetwork, ValueNetwork

vocab_size = 50
seq_len = 6
hidden_dim = 64
device = "cpu"

# batch_size=1. The input is created directly on `device` so that it lives on
# the same device as the networks if `device` is changed from "cpu".
x = torch.randint(0, vocab_size, (1, seq_len), device=device)
policy = PolicyNetwork(vocab_size, hidden_dim).to(device)
value = ValueNetwork(vocab_size, hidden_dim).to(device)

h0_p = policy.init_hidden(1, device)
logits, _ = policy(x, h0_p)
print("Policy logits shape:", logits.shape)  # expected: (1, vocab_size)
assert logits.shape == (1, vocab_size), f"unexpected policy logits shape: {tuple(logits.shape)}"

h0_v = value.init_hidden(1, device)
v, _ = value(x, h0_v)
print("Value estimate shape:", v.shape)      # expected: (1,)
assert v.shape == (1,), f"unexpected value estimate shape: {tuple(v.shape)}"


Policy logits shape: torch.Size([1, 50])
Value estimate shape: torch.Size([1])


### 6. 测试 RLAlphaGenerator._collect_trajectories

In [24]:
# Sample one batch of trajectories with RLAlphaGenerator._collect_trajectories
# and print the shapes of what comes back.
from generator import RLAlphaGenerator
from alpha_generation_env import AlphaGenerationEnv
from combination import AlphaCombinationModel
from tokenizer import AlphaTokenizer
from data import load_market_data
from utility import set_random_seed

# Left disabled; re-enable to pick up source edits (defined in an earlier cell).
# reload_components()

# Seed before anything is built or sampled.
set_random_seed(10)

df = load_market_data()
combo = AlphaCombinationModel()
combo.inject_data(df, "target")
tokenizer = AlphaTokenizer()
env = AlphaGenerationEnv(combo, tokenizer, max_len=20)

# Only these four keys are passed; any other settings are left to
# RLAlphaGenerator (its handling of missing keys is not visible here).
cfg = dict(
    vocab_size=tokenizer.vocab_size,
    hidden_dim=1280,
    batch_size=1280,
    device="cpu",
)

agent = RLAlphaGenerator(env, cfg)

# Five values are returned; they are unpacked here as states, actions,
# log-probs, returns and advantages (naming taken from the variables below).
s, a, logp, ret, adv = agent._collect_trajectories()
print("Sample states shape:", s.shape)
print("Sample actions shape:", a.shape)
print("Sample rewards (returns):", ret.shape, ret)


  return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
  return ufunc.reduce(obj, axis, dtype, out, **passkwargs)


Sample states shape: torch.Size([1280, 20])
Sample actions shape: torch.Size([1280])
Sample rewards (returns): torch.Size([1280]) tensor([-0.0044, -0.0044, -0.0044,  ...,  0.0000,  0.0000,  0.0000])


In [None]:
# Display the returns tensor `ret`; requires the trajectory-sampling cell to
# have been run first in this kernel.
ret

### 7. 测试训练脚本

In [22]:
# End-to-end smoke test of the training path: build the environment and agent,
# sample one batch of trajectories, run a couple of training iterations, and
# confirm that training actually changes the policy network's parameters.
import torch

from data import load_market_data
from combination import AlphaCombinationModel
from tokenizer import AlphaTokenizer
from alpha_generation_env import AlphaGenerationEnv
from generator import RLAlphaGenerator

df = load_market_data(path="data/rb_20250606_primary.csv")
combo = AlphaCombinationModel(max_pool_size=10)
combo.inject_data(df, target_col="target")

tokenizer = AlphaTokenizer()
env = AlphaGenerationEnv(combo, tokenizer, max_len=10)

# Deliberately small settings so the whole cell runs quickly.
config = {
    "vocab_size": tokenizer.vocab_size,
    "hidden_dim": 16,
    "batch_size": 32,
    "lr_policy": 1e-4,
    "lr_value": 1e-3,
    "gamma": 0.99,
    "clip_eps": 0.2,
    "entropy_coef": 0.0,
    "value_coef": 0.5,
    "update_epochs": 1,
    "max_seq_len": 10,
    "device": "cpu",
}

agent = RLAlphaGenerator(env, config)
print("✔️ 初始环境和 agent 创建完毕")

# Step 1: trajectory sampling returns one row per batch element.
states, actions, old_logps, returns, advantages = agent._collect_trajectories()
print(f"采样轨迹 shapes: states={states.shape}, actions={actions.shape}, returns={returns.shape}, advantages={advantages.shape}")
assert states.shape[0] == config["batch_size"]
assert actions.ndim == 1
print("✔️ _collect_trajectories 正常工作")

# Step 2: run only 2 training iterations; success means no exception is raised.
agent.train(num_iterations=2)
print("✔️ train(num_iterations=2) 完成（无异常抛出）")

# Step 3: build a second agent and snapshot its initial policy parameters.
# detach() keeps the snapshot out of autograd: a plain clone() of a parameter
# is differentiable and stays connected to the parameter it was copied from.
agent2 = RLAlphaGenerator(env, config)
initial = [p.detach().clone() for p in agent2.policy_net.parameters()]

agent2.train(num_iterations=1)

# At least one parameter tensor must differ from its snapshot after training.
updated = any(not torch.allclose(p0, p1) for p0, p1 in zip(initial, agent2.policy_net.parameters()))
assert updated, "PolicyNetwork 参数在训练后应当发生更新"
print("✔️ PolicyNetwork 参数已更新")


✔️ 初始环境和 agent 创建完毕
采样轨迹 shapes: states=torch.Size([32, 10]), actions=torch.Size([32]), returns=torch.Size([32]), advantages=torch.Size([32])
✔️ _collect_trajectories 正常工作
✔️ train(num_iterations=2) 完成（无异常抛出）
✔️ PolicyNetwork 参数已更新


✔️ PolicyNetwork 参数已更新
