In [41]:
from pathlib import Path
import json
import random
from collections import defaultdict

import numpy as np
import pandas as pd
import pyarrow.dataset as ds
import xgboost as xgb

# --- runtime and data layout -------------------------------------------------
DEVICE = "cpu"  # change to "cuda" if you have a GPU available
ROOT = Path("processed_data2")  # root directory holding the year=/month= parquet partitions
TARGET = "stock_ret"  # label column the model is trained to predict
ID_COLS = ["gvkey", "iid", "excntry"]  # key columns carried into the prediction output
PARTITION_COLS = ["year", "month"]
AUX_DROP = {"date", "char_date", "char_eom", "ret_eom", "prc", "id"}
# Every column that must never be offered to the model as an input.
NON_FEATURES = {*ID_COLS, *PARTITION_COLS, *AUX_DROP, TARGET}

# --- walk-forward windows (years) --------------------------------------------
TRAIN_START = 2005  # first year of every training window
OOS_START = 2015  # first out-of-sample year that gets scored
OOS_END = 2024  # last out-of-sample year that gets scored (inclusive)
LOG_DIR = Path("train_logs")  # tuning results are written here
LOG_DIR.mkdir(exist_ok=True)

# --- hyper-parameter search --------------------------------------------------
random.seed(0)
TUNE_YEAR = 2018  # tuning trains/validates on the years before this one
N_TRIALS = 20  # number of random parameter draws
WARM_ROUNDS = 200  # boosting-round cap for the cheap warm-up fit
EARLY_STOP = 30  # early-stopping patience, in rounds
PRUNE_MARGIN = 0.01  # relative slack before a warm-up result is pruned


def list_partitions(root: Path):
    """Discover hive-style ``year=<int>/month=<int>`` partition directories.

    Only directories whose value after ``=`` parses as an integer are counted.
    Stray files or oddly named entries that merely match the glob (for example
    ``year=2020.bak`` or ``month=tmp``) are skipped instead of aborting the
    whole discovery with a ``ValueError``.

    Args:
        root: Directory that contains the ``year=*`` sub-directories.

    Returns:
        Chronologically sorted list of ``(year, month)`` integer tuples; the
        list is empty when no well-formed partition is found.
    """

    def _int_partitions(parent: Path, key: str):
        # Yield (value, directory) for every well-formed ``key=<int>`` child directory.
        for entry in parent.glob(f"{key}=*"):
            if not entry.is_dir():
                continue
            try:
                value = int(entry.name.partition("=")[2])
            except ValueError:
                continue
            yield value, entry

    months = [
        (year, month)
        for year, year_dir in _int_partitions(root, "year")
        for month, _ in _int_partitions(year_dir, "month")
    ]
    # Sort numerically: directory names sort lexically ("month=10" < "month=2").
    months.sort()
    return months


# Discover every (year, month) partition on disk; there is nothing to do without data.
ALL_MONTHS = list_partitions(ROOT)
if len(ALL_MONTHS) == 0:
    raise RuntimeError(f"No parquet partitions found under {ROOT}")

# year -> ascending list of the months that exist for that year
MONTH_LOOKUP = defaultdict(list)
for part_year, part_month in ALL_MONTHS:
    MONTH_LOOKUP[part_year].append(part_month)
for part_year in MONTH_LOOKUP:
    MONTH_LOOKUP[part_year] = sorted(MONTH_LOOKUP[part_year])

print(f"loaded {len(ALL_MONTHS)} partitions spanning years {min(MONTH_LOOKUP)}-{max(MONTH_LOOKUP)}")


loaded 245 partitions spanning years 2005-2025


In [42]:
# Derive the model inputs: the columns requested in column_names.txt that are
# present in a sample parquet partition and are not ids / partitions / target.
with open("column_names.txt", "r") as f:
    requested = [name for name in (line.strip() for line in f) if name]

# The earliest partition serves as the schema sample.
sample_year, sample_month = ALL_MONTHS[0]
sample_path = ROOT / f"year={sample_year}" / f"month={sample_month}" / "part-0.parquet"
sample_df = pd.read_parquet(sample_path)

available = set(sample_df.columns)

# Single pass over the request list: usable columns become features, absent
# ones are reported, and anything listed in NON_FEATURES is ignored entirely.
FEATURES = []
missing = []
for name in requested:
    if name in NON_FEATURES:
        continue
    if name in available:
        FEATURES.append(name)
    else:
        missing.append(name)

if len(FEATURES) == 0:
    raise RuntimeError("Feature list is empty after filtering; check column_names.txt and NON_FEATURES settings.")

if missing:
    preview = ", ".join(missing[:10])
    print(f"warning: {len(missing)} requested columns not found in parquet sample (showing up to 10): {preview}")

# Record every sample column that is neither a feature nor the target.
with open("non-features.txt", "w") as out:
    out.writelines(col + "\n" for col in sorted((available - set(FEATURES)) - {TARGET}))

print(f"retained {len(FEATURES)} features; first 10: {FEATURES[:10]}")


retained 39 features; first 10: ['niq_be', 'prc_highprc_252d', 'niq_at', 'mispricing_perf', 'ni_be', 'ebit_bev', 'me', 'ret_12_1', 'ni_me', 'corr_1260d']


In [43]:
def dataset():
    """Open the parquet files under ``ROOT`` as a pyarrow dataset with hive partitioning."""
    root_uri = str(ROOT)
    return ds.dataset(root_uri, partitioning="hive", format="parquet")


def months_in(year: int):
    """Return the months recorded in ``MONTH_LOOKUP`` for ``year`` (empty list if none)."""
    if year in MONTH_LOOKUP:
        return MONTH_LOOKUP[year]
    return []


class ArrowMonthIter(xgb.core.DataIter):
    """Feed XGBoost one Arrow record batch at a time, walking the panel month by month.

    The iterator visits every available (year, month) partition from
    ``y_start`` through ``y_end`` inclusive, converting each record batch to
    float32 numpy arrays just before handing it to XGBoost.
    """

    def __init__(self, y_start, y_end, features, target, batch_size=131072):
        super().__init__()
        self.y_start, self.y_end = y_start, y_end
        self.features = features
        self.target = target
        self.batch_size = batch_size
        # Ordered list of partitions to visit on every pass over the data.
        plan = []
        for year in range(y_start, y_end + 1):
            plan.extend((year, month) for month in months_in(year))
        self._plan = plan
        self._dataset = None
        self._scanner_iter = None
        self._idx = 0

    def reset(self):
        """Rewind to the first planned partition; raise ValueError if the window is empty."""
        if len(self._plan) == 0:
            raise ValueError(f"No partitions available between years {self.y_start} and {self.y_end}.")
        self._dataset = dataset()
        self._idx = 0
        self._scanner_iter = None

    def next(self, input_data):
        """Pass the next non-empty batch to ``input_data``; return 1, or 0 once exhausted."""
        while True:
            if self._scanner_iter is None:
                # No month is currently open: advance to the next planned one, if any.
                if self._idx >= len(self._plan):
                    return 0
                year, month = self._plan[self._idx]
                self._idx += 1
                month_filter = (ds.field("year") == year) & (ds.field("month") == month)
                scanner = self._dataset.scanner(
                    columns=self.features + [self.target],
                    filter=month_filter,
                    batch_size=self.batch_size,
                )
                self._scanner_iter = iter(scanner.to_batches())

            batch = next(self._scanner_iter, None)
            if batch is None:
                # Current month is drained; loop around to open the following one.
                self._scanner_iter = None
                continue

            feature_arrays = [batch.column(name).to_numpy(zero_copy_only=False) for name in self.features]
            X = np.column_stack(feature_arrays).astype(np.float32, copy=False)
            labels = batch.column(self.target).to_numpy(zero_copy_only=False).astype(np.float32, copy=False)
            if X.size == 0:
                continue
            input_data(data=X, label=labels)
            return 1


def _build_dmats(train_end, val_start, val_end, max_bin):
    """Build the training and validation ``QuantileDMatrix`` pair from streamed partitions.

    Training covers years ``TRAIN_START..train_end`` and validation covers
    ``val_start..val_end`` (both inclusive). The validation matrix is created
    with the training matrix as its ``ref``.

    Returns:
        Tuple ``(train_matrix, validation_matrix)``.
    """
    shared = {"missing": np.nan, "max_bin": max_bin}
    train_iter = ArrowMonthIter(TRAIN_START, train_end, FEATURES, TARGET)
    val_iter = ArrowMonthIter(val_start, val_end, FEATURES, TARGET)
    train_matrix = xgb.QuantileDMatrix(train_iter, **shared)
    val_matrix = xgb.QuantileDMatrix(val_iter, ref=train_matrix, **shared)
    return train_matrix, val_matrix


def _train_until(dtr, dva, params, max_rounds):
    """Train with early stopping on ``dva`` and summarise the fit.

    Args:
        dtr: Training matrix.
        dva: Validation matrix, evaluated under the name ``"val"``.
        params: XGBoost parameter dictionary.
        max_rounds: Upper bound on the number of boosting rounds.

    Returns:
        Tuple ``(booster, rmse, best_it, sigma_pred)`` where ``rmse`` is the
        booster's best validation score, ``best_it`` the best iteration index
        and ``sigma_pred`` the standard deviation of the validation
        predictions made with the trees up to and including ``best_it``.
    """
    fit_kwargs = {
        "num_boost_round": max_rounds,
        "evals": [(dva, "val")],
        "early_stopping_rounds": EARLY_STOP,
        "verbose_eval": False,
    }
    booster = xgb.train(params, dtr, **fit_kwargs)

    rmse = float(booster.best_score)
    best_it = booster.best_iteration
    if best_it is None:
        best_it = max_rounds - 1

    # Score validation with the best-iteration prefix of the ensemble only.
    val_preds = booster.predict(dva, iteration_range=(0, best_it + 1))
    sigma_pred = float(np.std(val_preds))
    return booster, rmse, best_it, sigma_pred


def tune_once(seed=0, max_rounds=2000):
    """Random-search XGBoost hyper-parameters once and persist the winner.

    Candidates are trained on years ``TRAIN_START..TUNE_YEAR-3`` and validated
    on ``TUNE_YEAR-2..TUNE_YEAR-1``. Each candidate first gets a short fit
    capped at ``WARM_ROUNDS`` rounds; it is skipped when that warm-up score is
    worse than the best full-fit score so far by more than ``PRUNE_MARGIN``
    (relative). Surviving candidates are refit from scratch with up to
    ``max_rounds`` rounds.

    The trials are drawn from a private ``random.Random(seed)`` generator, so
    the search does not depend on (or disturb) the global ``random`` state and
    calling this function again in the same kernel repeats the same trials.
    Repeated draws of an identical parameter combination are evaluated only
    once.

    Args:
        seed: Seed for the private trial generator.
        max_rounds: Boosting-round cap for the full (non warm-up) fit.

    Returns:
        Dict with keys ``"params"``, ``"best_iteration"`` and
        ``"sigma_pred_val"``; the same payload is written to
        ``LOG_DIR / "global_best.json"``.

    Raises:
        ValueError: If there are no partitions for ``TUNE_YEAR``.
    """
    if not months_in(TUNE_YEAR):
        raise ValueError(f"No data available for TUNE_YEAR={TUNE_YEAR}. Adjust the constant or regenerate processed_data2.")

    base = {
        "objective": "reg:squarederror",
        "tree_method": "hist",
        "device": DEVICE,
        "eval_metric": "rmse",
        "lambda": 5.0,
        "alpha": 0.0,
        "seed": 0,
        "nthread": -1,
        "colsample_bytree": 0.8,
    }

    # Insertion order fixes the order in which the generator is consumed.
    search_space = {
        "max_depth": [4, 6],
        "eta": [0.03, 0.05],
        "subsample": [0.7, 0.9],
        "min_child_weight": [1, 20],
        "max_bin": [256, 512],
    }

    rng = random.Random(seed)
    trials = []
    seen = set()
    for _ in range(N_TRIALS):
        trial = {name: rng.choice(options) for name, options in search_space.items()}
        key = tuple(trial.values())
        # The grid is small, so repeated draws are common; refitting an
        # identical configuration would only burn compute.
        if key in seen:
            continue
        seen.add(key)
        trials.append(trial)

    # The quantised matrices depend only on max_bin, so build each pair once.
    dmat_cache = {}

    def get_dmats(max_bin):
        if max_bin not in dmat_cache:
            dmat_cache[max_bin] = _build_dmats(TUNE_YEAR - 3, TUNE_YEAR - 2, TUNE_YEAR - 1, max_bin)
        return dmat_cache[max_bin]

    best_rmse = float("inf")
    best_payload = None

    for trial in trials:
        params = dict(base, **trial)
        dtr, dva = get_dmats(trial["max_bin"])
        # Cheap screening fit: drop candidates that are clearly behind the incumbent.
        _, warm_rmse, _, _ = _train_until(dtr, dva, params, WARM_ROUNDS)
        if best_payload is not None and warm_rmse > best_rmse * (1.0 + PRUNE_MARGIN):
            continue
        _, rmse, best_it, sigma_pred = _train_until(dtr, dva, params, max_rounds)
        if rmse < best_rmse:
            best_rmse = rmse
            best_payload = {
                "params": params,
                "best_iteration": best_it,
                "sigma_pred_val": sigma_pred,
            }

    if best_payload is None:
        # No trial produced a finite improvement; fall back to fixed defaults.
        best_payload = {
            "params": dict(base, max_depth=6, eta=0.05, subsample=0.9, min_child_weight=20, max_bin=256),
            "best_iteration": 1000,
            "sigma_pred_val": None,
        }

    # The log directory may have been removed since the config cell ran.
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    (LOG_DIR / "global_best.json").write_text(json.dumps(best_payload, indent=2))
    print("tuned_params:", json.dumps(best_payload, indent=2))
    return best_payload


def get_best_params():
    """Return the tuned payload, preferring the JSON cached in ``LOG_DIR``.

    When ``LOG_DIR / "global_best.json"`` exists its parsed content is
    returned as-is; otherwise ``tune_once()`` is run and its result returned.
    """
    cached = LOG_DIR / "global_best.json"
    if not cached.exists():
        return tune_once()
    return json.loads(cached.read_text())


def fit_for_oos_year(oos_year: int):
    """Fit the model used to score ``oos_year``.

    Trains on years ``TRAIN_START..oos_year-3`` with early stopping against
    ``oos_year-2..oos_year-1``, using the parameters from ``get_best_params()``.

    Raises:
        ValueError: If no partitions exist for ``oos_year - 1``.
    """
    if len(months_in(oos_year - 1)) == 0:
        raise ValueError(f"Insufficient history to fit year {oos_year}. Ensure previous months exist in processed_data2.")

    tuned_params = get_best_params()["params"]
    dtr, dva = _build_dmats(oos_year - 3, oos_year - 2, oos_year - 1, tuned_params["max_bin"])
    fit_kwargs = {
        "num_boost_round": 5000,
        "evals": [(dva, "val")],
        "early_stopping_rounds": EARLY_STOP,
        "verbose_eval": False,
    }
    return xgb.train(tuned_params, dtr, **fit_kwargs)


def predict_year(booster: xgb.Booster, year: int, out_root: Path):
    """Score every available month of ``year`` and write one parquet file per month.

    For each month the features are streamed in record batches, predicted,
    averaged per ``ID_COLS + PARTITION_COLS`` key and written to
    ``out_root/year=<year>/month=<month>/part-0.parquet`` with the prediction
    in column ``pred_ret_t1``.

    When the booster carries a ``best_iteration`` (set by early stopping),
    only the trees up to and including that iteration are used. The native
    ``Booster.predict`` would otherwise score with the full model, including
    the rounds trained after the validation optimum.

    Args:
        booster: Trained model.
        year: Calendar year to score.
        out_root: Root directory for the prediction output; created (with
            parents) when missing.
    """
    months = months_in(year)
    if not months:
        print(f"skip {year}: no partitions to score")
        return

    out_root.mkdir(parents=True, exist_ok=True)
    dset = dataset()

    # Loop-invariant column lists.
    key_cols = ID_COLS + PARTITION_COLS
    cols = FEATURES + key_cols

    # getattr with a default also covers boosters whose ``best_iteration``
    # property raises AttributeError because early stopping was not used.
    best_it = getattr(booster, "best_iteration", None)
    iteration_range = (0, best_it + 1) if best_it is not None else (0, 0)  # (0, 0) = all trees

    for month in months:
        filt = (ds.field("year") == year) & (ds.field("month") == month)
        scanner = dset.scanner(columns=cols, filter=filt, batch_size=131072)

        rows = []
        preds = []
        # to_batches() is a lazy iterator, so emptiness can only be detected
        # after consuming it (see the ``if not rows`` check below).
        for batch in scanner.to_batches():
            if batch.num_rows == 0:
                continue
            X = np.column_stack([batch.column(name).to_numpy(zero_copy_only=False) for name in FEATURES]).astype(np.float32, copy=False)
            dm = xgb.DMatrix(X, missing=np.nan)
            pred = booster.predict(dm, iteration_range=iteration_range)
            ids = {col: batch.column(col).to_numpy(zero_copy_only=False) for col in key_cols}
            rows.append(ids)
            preds.append(pred.astype(np.float32, copy=False))

        if not rows:
            continue

        merged = {col: np.concatenate([chunk[col] for chunk in rows]) for col in key_cols}
        out_df = pd.DataFrame(merged)
        out_df["pred_ret_t1"] = np.concatenate(preds)
        # Collapse duplicate keys within the month to a single averaged prediction.
        out_df = out_df.groupby(key_cols, as_index=False)["pred_ret_t1"].mean()
        sigma = float(out_df["pred_ret_t1"].std())
        print(f"year {year} month {month}: sigma_pred {sigma:.6f}")

        dest = out_root / f"year={year}" / f"month={month}"
        dest.mkdir(parents=True, exist_ok=True)
        out_df.to_parquet(dest / "part-0.parquet", index=False)


In [44]:
# Load the tuned hyper-parameters. get_best_params() reuses
# train_logs/global_best.json when that file exists and only runs tune_once()
# otherwise; call tune_once() directly to force a fresh search.
best_params = get_best_params()
print(json.dumps(best_params, indent=2))


Exception ignored on calling ctypes callback function <bound method DataIter._next_wrapper of <__main__.ArrowMonthIter object at 0x12ff2fb10>>:
Traceback (most recent call last):
  File "/Users/gui.oba/Documents/Coding/FIAM/.venv/lib/python3.13/site-packages/xgboost/core.py", line 640, in _next_wrapper
    return self._handle_exception(lambda: int(self.next(input_data)), 0)
  File "/Users/gui.oba/Documents/Coding/FIAM/.venv/lib/python3.13/site-packages/xgboost/core.py", line 553, in _handle_exception
    return fn()
  File "/Users/gui.oba/Documents/Coding/FIAM/.venv/lib/python3.13/site-packages/xgboost/core.py", line 640, in <lambda>
    return self._handle_exception(lambda: int(self.next(input_data)), 0)
  File "/var/folders/23/bz30dzh9653db9qn7hz3ydqm0000gn/T/ipykernel_58234/3266367322.py", line 43, in next
  File "pyarrow/_dataset.pyx", line 3904, in _iterator
  File "pyarrow/_dataset.pyx", line 3497, in pyarrow._dataset.TaggedRecordBatchIterator.__next__
  File "<string>", line 1, 

XGBoostError: [17:12:16] /Users/runner/work/xgboost/xgboost/src/data/iterative_dmatrix.cc:99: Check failed: rbegin == Info().num_row_ (615992 vs. 616065) : 
Stack trace:
  [bt] (0) 1   libxgboost.dylib                    0x000000012e1d2254 dmlc::LogMessageFatal::~LogMessageFatal() + 124
  [bt] (1) 2   libxgboost.dylib                    0x000000012e382798 xgboost::data::IterativeDMatrix::InitFromCPU(xgboost::Context const*, xgboost::BatchParam const&, void*, float, std::__1::shared_ptr<xgboost::DMatrix>) + 2796
  [bt] (2) 3   libxgboost.dylib                    0x000000012e381884 xgboost::data::IterativeDMatrix::IterativeDMatrix(void*, void*, std::__1::shared_ptr<xgboost::DMatrix>, void (*)(void*), int (*)(void*), float, int, int, long long) + 788
  [bt] (3) 4   libxgboost.dylib                    0x000000012e324f90 xgboost::DMatrix* xgboost::DMatrix::Create<void*, void*, void (void*), int (void*)>(void*, void*, std::__1::shared_ptr<xgboost::DMatrix>, void (*)(void*), int (*)(void*), float, int, int, long long) + 152
  [bt] (4) 5   libxgboost.dylib                    0x000000012e1dc254 XGQuantileDMatrixCreateFromCallback + 520
  [bt] (5) 6   libffi.dylib                        0x00000001a2ef8050 ffi_call_SYSV + 80
  [bt] (6) 7   libffi.dylib                        0x00000001a2f00ae0 ffi_call_int + 1212
  [bt] (7) 8   _ctypes.cpython-313-darwin.so       0x000000011190f838 _ctypes_callproc + 940
  [bt] (8) 9   _ctypes.cpython-313-darwin.so       0x0000000111905480 PyCFuncPtr_call + 256



In [None]:
# Walk forward through the out-of-sample years: refit, then score each one.
for year in range(OOS_START, OOS_END + 1):
    if months_in(year):
        bst = fit_for_oos_year(year)
        predict_year(bst, year, Path("oos_preds"))
        print(f"year {year}: prediction pass done")
    else:
        print(f"skip {year}: no data in processed_data2")
