# Run Pipeline

Main entry point: configure the variables below, then run the cells to generate the bash script (and, optionally, execute it).

In [161]:
from pathlib import Path
import json
import subprocess

def find_repo_root(start: "Path | None" = None, max_depth: int = 6) -> Path:
    """Return the nearest ancestor directory that looks like the repository root.

    A directory qualifies when it contains both a ``README.md`` and a ``src``
    entry. The search begins at *start* (default: the current working
    directory) and walks toward the filesystem root, checking at most
    *max_depth* directories (the start directory counts as the first).

    Args:
        start: Directory to start searching from; ``None`` means the CWD.
            Strings are accepted and converted with ``Path``.
        max_depth: Maximum number of directories to inspect.

    Returns:
        The first qualifying directory (resolved), or the resolved start
        directory when none is found within *max_depth* levels.
    """
    origin = Path().resolve() if start is None else Path(start).resolve()
    cur = origin
    for _ in range(max_depth):
        if (cur / 'README.md').exists() and (cur / 'src').exists():
            return cur
        if cur.parent == cur:
            # Reached the filesystem root; walking further would re-check it.
            break
        cur = cur.parent
    return origin

# --- Run identity and label/feature normalization ---------------------------
PROJECT_ROOT = find_repo_root()
RUN_NAME_BASE = "ML_2"  # prefix of RUN_NAME (assembled further below)
SPLIT_MODE = "simple"  # or "forward"; forwarded as --split_mode
LABEL_MODE = "winsor_csz"  # raw | winsor_csz | neu_winsor_csz; forwarded as --label_mode
# NOTE: X_WINSORIZE_BY_DATE / X_ZSCORE_BY_DATE apply to FEATURES only (not labels).
X_WINSORIZE_BY_DATE = True  # feature winsorization only
X_ZSCORE_BY_DATE = True  # feature zscore only
# The next four are always forwarded as --q_low / --q_high / --min_n / --eps.
# Presumably Q_LOW/Q_HIGH are winsorization quantiles and MIN_N is a minimum
# per-date sample count -- TODO confirm against src.train.run_experiment.
Q_LOW = 0.02
Q_HIGH = 0.98
MIN_N = 50
EPS = 1e-12

# Data preprocessing toggles. The first four always emit either the --flag or
# its --no-... counterpart in the training command.
DV_LOG1P = True
ADD_TIME_FEATURES = True
ADD_MARKET_STATE_FEATURES = True
ADD_INDUSTRY_STATE_FEATURES = True
DUMP_DEBUG_SNAPSHOTS = False  # adds --dump_debug_snapshots only when True
DEBUG_N_ROWS = 2000  # always forwarded as --debug_n_rows
DEBUG_N_FEATURES = 60  # always forwarded as --debug_n_features

DATA_ROOT = str(PROJECT_ROOT / "data" / "project_5year")
POSTPROCESS_PIPELINE = "none"  # or neutral_then_z / z_then_neutral
USE_PRED_Z = False  # adds --use_pred_z only when True
USE_NEUTRALIZE = False  # adds --use_neutralize only when True
# Models to train, comma-joined into --models. Keys that also appear in
# MODEL_PARAM_GRID have their grid written to param_grid.json.
# Smaller example selection: ["torch_cnn", "torch_rnn", "torch_lstm"]
MODELS_TO_RUN = ["lgbm", "extra_trees", "elasticnet", "xgb", "torch_mlp", "torch_cnn", "torch_rnn", "torch_lstm", "torch_gru"]
# Torch sequence models (cnn/rnn/lstm/gru): only use G_raw_all.
# r_i and dv_i are treated as paired time sequences: (T=20, C=2).
# f_i goes through a small MLP branch; industry_/beta/indbeta go through a linear risk head.

# Training-related hyperparameters (shared defaults for every torch model)
TORCH_MAX_EPOCHS = 300
TORCH_BATCH_SIZE = 1024
TORCH_NUM_WORKERS = 16
TORCH_LR = 0.003
# Options: none | plateau_ic | cosine. Author note: with this setting the LR decays too quickly.
TORCH_SCHEDULER_TYPE = "plateau_ic"
TORCH_EARLY_STOP_PATIENCE = 50
TORCH_EARLY_STOP_MIN_DELTA = 0.0
TORCH_LOG_EVERY = 5


def _torch_train_grid(batch_size):
    """Return the training-loop grid axes shared by all torch models.

    Every value is a one-element list, i.e. a fixed (non-searched) axis.
    A fresh dict with fresh lists is built on each call, so no two model
    entries share a mutable object.
    """
    return {
        "batch_size": [batch_size],
        "num_workers": [TORCH_NUM_WORKERS],
        "max_epochs": [TORCH_MAX_EPOCHS],
        "log_every": [TORCH_LOG_EVERY],
        "lr": [TORCH_LR],
        "early_stop_patience": [TORCH_EARLY_STOP_PATIENCE],
        "early_stop_min_delta": [TORCH_EARLY_STOP_MIN_DELTA],
        "scheduler_type": [TORCH_SCHEDULER_TYPE],
    }


# Model-related hyperparameters: one search grid per model key. Key order
# inside each entry is significant (it is serialized to param_grid.json).
MODEL_PARAM_GRID = {
    "ridge": {"alpha": [0.1, 1.0, 10.0]},
    "elasticnet": {"alpha": [0.001, 0.01], "l1_ratio": [0.2, 0.5, 0.8]},
    "rf": {"n_estimators": [100, 200, 300], "max_depth": [None, 5]},
    "extra_trees": {"n_estimators": [200, 500], "max_depth": [None, 10]},
    "lgbm": {"num_leaves": [31, 63, 127], "learning_rate": [0.01, 0.05, 0.1], "n_estimators": [500]},
    "catboost": {"depth": [6, 8], "learning_rate": [0.05], "iterations": [500]},
    "xgb": {"n_estimators": [300, 600], "max_depth": [4, 6], "learning_rate": [0.05]},
    # Torch MLP (alpha head)
    "torch_mlp": {
        "num_layers": [2],
        "hidden_dim": [512],
        "dropout": [0.1],
        "batch_norm": [True],
        "residual": [True],
        **_torch_train_grid(TORCH_BATCH_SIZE),
    },
    # Torch CNN/RNN family: uses only G_raw_all (r_i, dv_i, f_i, industry_, beta, indbeta).
    # For these, "residual" comes after the shared training axes.
    "torch_cnn": {
        "cnn_channels": [(512, 512), (1024, 1024)],
        "kernel_size": [3],
        "dropout": [0.1],
        "alpha_layers": [2],
        "alpha_hidden_dim": [512],
        "f_layers": [1],
        "f_hidden_dim": [512],
        **_torch_train_grid(TORCH_BATCH_SIZE),
        "residual": [True],
    },
    "torch_rnn": {
        "rnn_hidden": [512],
        "rnn_layers": [2],
        "dropout": [0.1],
        "alpha_layers": [2],
        "alpha_hidden_dim": [512],
        "f_layers": [1],
        "f_hidden_dim": [512],
        **_torch_train_grid(TORCH_BATCH_SIZE // 4),
        "residual": [True],
    },
    "torch_lstm": {
        "rnn_hidden": [512],
        "rnn_layers": [2],
        "dropout": [0.1],
        "alpha_layers": [2],
        "alpha_hidden_dim": [512],
        "f_layers": [1],
        "f_hidden_dim": [32],
        **_torch_train_grid(TORCH_BATCH_SIZE // 4),
        "residual": [True],
    },
    "torch_gru": {
        "rnn_hidden": [64],
        "rnn_layers": [1],
        "dropout": [0.1],
        "alpha_layers": [2],
        "alpha_hidden_dim": [64],
        "f_layers": [1],
        "f_hidden_dim": [32],
        **_torch_train_grid(TORCH_BATCH_SIZE // 4),
        "residual": [True],
    },
}
# Runtime / parallelism settings
N_WORKERS = 16  # forwarded to build_features as --n_workers
DATA_WORKERS = 16  # parallel loading for daily files
PREPROCESS_WORKERS = 8  # multiprocessing for preprocessing
DO_BUILD_FEATURES = True  # emit the feature-building command
DO_RUN_TRAINING = True  # emit the training command
SAVE_PREDS = False
USE_FEAT = True
GPU_ID = 3  # default A100; set None for CPU
SAMPLE_DAYS_PER_YEAR = 0  # set 0 for full data
PARALLEL_MODELS = 1
PARALLEL_GRID = 2  # grid-search workers per model
MAX_EVALS = 0  # random-search cap per model (0=full grid)
GRID_SEED = 42
N_JOBS_PER_MODEL = 16
DEBUG = True  # verbose debug outputs

# RUN_NAME encodes the main config knobs and names both the results directory
# and the generated-script directory.
# NOTE(review): POSTPROCESS_PIPELINE, USE_PRED_Z, USE_NEUTRALIZE, MODELS_TO_RUN,
# USE_FEAT and EPS are not part of the name, so runs differing only in those share
# one directory (and append to the same log). Confirm this is intended.
RUN_NAME = "__".join([
    RUN_NAME_BASE,
    SPLIT_MODE,
    LABEL_MODE,
    f"xw{int(X_WINSORIZE_BY_DATE)}_xz{int(X_ZSCORE_BY_DATE)}",
    f"dv{int(DV_LOG1P)}_tf{int(ADD_TIME_FEATURES)}_mkt{int(ADD_MARKET_STATE_FEATURES)}_ind{int(ADD_INDUSTRY_STATE_FEATURES)}",
    f"q{Q_LOW}-{Q_HIGH}_n{MIN_N}",
    f"sd{SAMPLE_DAYS_PER_YEAR}",
])


In [162]:
# Per-run results directory and per-run generated-script directory.
run_dir = PROJECT_ROOT.joinpath('res', 'experiments', RUN_NAME)
script_dir = PROJECT_ROOT.joinpath('scripts', RUN_NAME)
for _target_dir in (run_dir, script_dir):
    _target_dir.mkdir(parents=True, exist_ok=True)


In [163]:
# Generate run_all.sh for this run: write the filtered param grid, then emit one
# shell command per enabled stage (feature build, training), each tee'd to LOG_PATH.
import shlex


def _sh(value) -> str:
    """Return *value* as one safely quoted bash word (paths may contain spaces)."""
    return shlex.quote(str(value))


# Keep only the grids of the models that will run (rebinds the global, as before).
MODEL_PARAM_GRID = {k: v for k, v in MODEL_PARAM_GRID.items() if k in MODELS_TO_RUN}
grid_path = script_dir / "param_grid.json"
grid_path.write_text(json.dumps(MODEL_PARAM_GRID, indent=2), encoding="utf-8")

LOG_PATH = run_dir / "log.txt"

cmds = []

if DO_BUILD_FEATURES:
    cmds.append(
        "python -m src.features.build_features"
        f" --data_root {_sh(DATA_ROOT)} --n_workers {N_WORKERS} --run_name {_sh(RUN_NAME)}"
    )

if DO_RUN_TRAINING:
    models = ",".join(MODELS_TO_RUN)
    args = [
        "python -m src.train.run_experiment",
        f"--data_root {_sh(DATA_ROOT)}",
        f"--run_name {_sh(RUN_NAME)}",
        f"--split_mode {_sh(SPLIT_MODE)}",
        f"--models {_sh(models)}",
    ]
    if USE_FEAT:
        args.append("--use_feat")
    if SAVE_PREDS:
        args.append("--save_preds")
    if GPU_ID is not None:
        args.append(f"--gpu_id {GPU_ID}")
    # For these counts a value of 0/None omits the flag entirely (as before).
    if SAMPLE_DAYS_PER_YEAR:
        args.append(f"--sample_days_per_year {SAMPLE_DAYS_PER_YEAR}")
    if PARALLEL_MODELS:
        args.append(f"--parallel_models {PARALLEL_MODELS}")
    if PARALLEL_GRID:
        args.append(f"--parallel_grid {PARALLEL_GRID}")
    if MAX_EVALS:
        args.append(f"--max_evals {MAX_EVALS}")
    # 0 is a valid seed, so only omit --grid_seed when it is explicitly None
    # (a truthiness test used to drop GRID_SEED = 0 silently).
    if GRID_SEED is not None:
        args.append(f"--grid_seed {GRID_SEED}")
    if N_JOBS_PER_MODEL:
        args.append(f"--n_jobs {N_JOBS_PER_MODEL}")
    if DATA_WORKERS:
        args.append(f"--data_workers {DATA_WORKERS}")
    if PREPROCESS_WORKERS:
        args.append(f"--preprocess_workers {PREPROCESS_WORKERS}")
    args.append(f"--label_mode {_sh(LABEL_MODE)}")
    args.append(f"--param_grid {_sh(grid_path)}")
    if X_WINSORIZE_BY_DATE:
        args.append("--x_winsorize_by_date")
    if X_ZSCORE_BY_DATE:
        args.append("--x_zscore_by_date")
    args.append(f"--q_low {Q_LOW} --q_high {Q_HIGH} --min_n {MIN_N} --eps {EPS}")
    # NOTE(review): the negative flags use hyphens (--no-dv-log1p) while the positive
    # ones use underscores. If the runner uses argparse.BooleanOptionalAction, the
    # generated negative would be --no-dv_log1p instead -- confirm against the parser.
    args.append("--dv_log1p" if DV_LOG1P else "--no-dv-log1p")
    args.append("--add_time_features" if ADD_TIME_FEATURES else "--no-add-time-features")
    args.append("--add_market_state_features" if ADD_MARKET_STATE_FEATURES else "--no-add-market-state-features")
    args.append("--add_industry_state_features" if ADD_INDUSTRY_STATE_FEATURES else "--no-add-industry-state-features")
    if DUMP_DEBUG_SNAPSHOTS:
        args.append("--dump_debug_snapshots")
    args.append(f"--debug_n_rows {DEBUG_N_ROWS} --debug_n_features {DEBUG_N_FEATURES}")
    args.append(f"--postprocess_pipeline {_sh(POSTPROCESS_PIPELINE)}")
    if USE_PRED_Z:
        args.append("--use_pred_z")
    if USE_NEUTRALIZE:
        args.append("--use_neutralize")
    if DEBUG:
        args.append("--debug")
    cmd = " ".join(args)
    cmds.append(cmd)

# Tee stdout+stderr of each command into the run log; `set -o pipefail` keeps
# a failing python command fatal despite the pipe into tee.
cmds = [f"{c} |& tee -a {_sh(LOG_PATH)}" for c in cmds]
script_path = script_dir / "run_all.sh"
# `python -m src...` resolves the package relative to the working directory, so
# cd into PROJECT_ROOT first; the script then works wherever it is launched from.
header = ["#!/usr/bin/env bash", "set -euo pipefail", f"cd {_sh(PROJECT_ROOT)}"]
content = "\n".join(header + cmds) + "\n"
script_path.write_text(content, encoding="utf-8")
script_path.chmod(0o755)
script_path


PosixPath('/data1/hyzhang/Projects/intreviews_2026/quant_yanfu/scripts/ML_2__simple__winsor_csz__xw1_xz1__dv1_tf1_mkt1_ind1__q0.02-0.98_n50__sd0/run_all.sh')

In [146]:
# Optional: execute the script
# subprocess.run([str(script_path)], check=True)

In [31]:
# Load results if available: show this run's metrics.csv once training has written it.
import pandas as pd

metrics_path = run_dir.joinpath('metrics.csv')
if metrics_path.exists():
    metrics_table = pd.read_csv(metrics_path)
    display(metrics_table)  # IPython-provided display helper (notebook only)


In [156]:
# Cleanup: remove runs without tuning outputs
import shutil

exp_root = PROJECT_ROOT / 'res' / 'experiments'
script_root = PROJECT_ROOT / 'scripts'


def _remove_untuned_runs(exp_root: Path, script_root: Path) -> list:
    """Delete experiment dirs whose ``tuning`` subdirectory is missing or empty.

    For every removed ``exp_root/<name>``, the matching ``script_root/<name>``
    directory is deleted as well.

    Args:
        exp_root: Directory holding one subdirectory per experiment run.
        script_root: Directory holding the generated scripts, one per run name.

    Returns:
        Names of the removed experiments, in sorted order. The list is empty
        when nothing was removed or *exp_root* does not exist.

    Runs as a function so that its loop variables cannot overwrite notebook
    globals such as ``script_dir``, which the script-generation cell writes into.
    """
    removed_names = []
    if not exp_root.exists():
        return removed_names
    for exp_dir in sorted(exp_root.iterdir()):
        if not exp_dir.is_dir():
            continue
        tuning_dir = exp_dir / 'tuning'
        if tuning_dir.exists() and any(tuning_dir.iterdir()):
            continue  # has tuning output -> keep
        shutil.rmtree(exp_dir)
        run_script_dir = script_root / exp_dir.name
        if run_script_dir.exists():
            shutil.rmtree(run_script_dir)
        removed_names.append(exp_dir.name)
    return removed_names


# NOTE(review): the current RUN_NAME is not exempt. If it has no tuning output yet,
# this also deletes run_dir and script_dir (including the freshly generated
# run_all.sh). Run this cell only after training has produced tuning output.
removed = _remove_untuned_runs(exp_root, script_root)
if removed:
    print('Removed:', removed)
else:
    print('Removed: None')


Removed: None []
