This code integrates CRSP data, datashare data, macroeconomic data, and risk-free interest rate data, and performs lag, monthly cross-sectional normalization, and data breach checks.

In [1]:
import os
from pathlib import Path
import numpy as np
import pandas as pd
from tqdm import tqdm

First, define a function to combine company characteristic factors with macroeconomic data.

In [2]:
def merge_firm_and_macro(firm_path: str, macro_path: str, output_path: str) -> pd.DataFrame:

    print(" Reading datashare.csv and macro_factors.parquet (low-memory mode)...")

    df_macro = pd.read_parquet(macro_path)
    df_macro["year"] = df_macro["month"].dt.year
    df_macro["month_num"] = df_macro["month"].dt.month

    Path(os.path.dirname(output_path)).mkdir(parents=True, exist_ok=True)

    chunksize = 200_000  
    merged_chunks = []
    total_rows = 0

    with pd.read_csv(firm_path, engine="python", encoding="utf-8", chunksize=chunksize) as reader:
        for chunk in tqdm(reader, desc="Merging firm chunks", unit="chunk"):
            total_rows += len(chunk)

            chunk["DATE"] = pd.to_datetime(chunk["DATE"].astype(str), format="%Y%m%d", errors="coerce")
            chunk["year"] = chunk["DATE"].dt.year
            chunk["month"] = chunk["DATE"].dt.month

            cols_keep = ["permno", "DATE", "sic2"]
            rename_dict = {col: f"c_{col}" for col in chunk.columns if col not in cols_keep + ["year", "month"]}
            chunk = chunk.rename(columns=rename_dict)

            merged = pd.merge(
                chunk, df_macro,
                left_on=["year", "month"],
                right_on=["year", "month_num"],
                how="left"
            ).drop(columns=["year", "month", "month_num"], errors="ignore")

            merged_chunks.append(merged)

    print(f"Concatenating {len(merged_chunks)} chunks ...")
    df_merged = pd.concat(merged_chunks, ignore_index=True)
    print(f"Merge completed ‚Äî total {df_merged.shape[0]:,} rows, {df_merged.shape[1]} columns.")

    df_merged.to_parquet(output_path, index=False)
    print(f" Saved merged dataset to: {output_path}")

    return df_merged


Next, we define a function to add the risk-free rate, using T-30 data downloaded from the CRSP database as the risk-free rate.

In [3]:
def add_rf_to_macro(parquet_path: str, t_bill_path: str) -> pd.DataFrame:
    print("Join the risk-free interest rate RF...")

    df = pd.read_parquet(parquet_path)
    t_bill = pd.read_csv(t_bill_path)
    t_bill = t_bill.rename(columns={"caldt": "DATE", "t30ret": "rf"})
    t_bill["DATE"] = pd.to_datetime(t_bill["DATE"], errors="coerce")

    df["DATE"] = pd.to_datetime(df["DATE"], errors="coerce")
    df["year"] = df["DATE"].dt.year
    df["month"] = df["DATE"].dt.month

    t_bill["year"] = t_bill["DATE"].dt.year
    t_bill["month"] = t_bill["DATE"].dt.month

    df = pd.merge(
        df,
        t_bill[["year", "month", "rf"]],
        on=["year", "month"],
        how="left"
    ).drop(columns=["year", "month"])

    if df["rf"].isna().any():
        missing = df["rf"].isna().sum()
        print(f"There are {missing:,} missing rf values, which are filled with previous values.")
        df["rf"] = df["rf"].fillna(method="ffill")

    df.to_parquet(parquet_path, index=False)
    print(f"Written to {parquet_path} | shape={df.shape}")
    return df


In this stage, we define the core dependent variable 
ùë¶
y‚Äîthe excess return‚Äîand perform lag adjustments and monthly cross-sectional normalization on firm-level characteristics according to their information release frequency. Following the factor construction framework provided by the original authors, firm characteristics are categorized into annual, quarterly, and monthly variables.

Specifically, the excess return 
ùëü
ùë°
+
1
r
t+1
	‚Äã

 is defined as the stock return minus the risk-free rate for the same period, and it is shifted upward by one month (shift(-1)) so that each month‚Äôs feature set corresponds to the next month‚Äôs excess return. Under this structure, variables of different reporting frequencies are aligned according to their respective disclosure lags:

Annual variables are lagged by six months (
ùëö
+
6
m+6);

Quarterly variables are lagged by four months (
ùëö
+
4
m+4);

Monthly variables are contemporaneous and thus use zero-month lag (
ùëö
+
0
m+0).

To ensure comparability across firms, we then conduct monthly cross-sectional normalization. Within each month, non-missing observations of every feature are ranked and linearly scaled to the range 
[
‚àí
1
,
1
]
[‚àí1,1]. Missing values are imputed using the cross-sectional median. To simplify implementation and guarantee that imputed values correspond to the median rank, missing entries are directly set to zero.

In [4]:
def build_features(compustat_macro_path: str, crsp_csv_path: str, out_parquet_path: str) -> pd.DataFrame:

    print(" Building feature data...")

    # === 1) Load data ===
    comp = pd.read_parquet(compustat_macro_path)
    crsp = pd.read_csv(crsp_csv_path)

    comp["DATE"] = pd.to_datetime(comp["DATE"], errors="coerce")
    crsp["date"] = pd.to_datetime(crsp["date"], errors="coerce")
    crsp = crsp.sort_values(["permno", "date"]).reset_index(drop=True)

    # === 2) Compute rf on CRSP calendar & ret_excess inside function ===
    # rf_calendar from comp (month-end), merged onto CRSP by date
    rf_calendar = (
        comp.loc[:, ["DATE", "rf"]]
            .drop_duplicates(subset=["DATE"])
            .rename(columns={"DATE": "date"})
    )
    crsp = crsp.merge(rf_calendar, on="date", how="left")

    # robust numeric conversions
    crsp["ret"] = pd.to_numeric(crsp.get("ret"), errors="coerce")
    crsp["dlret"] = pd.to_numeric(crsp.get("dlret"), errors="coerce").fillna(0.0)

    crsp["ret_with_dlret"] = (1.0 + crsp["ret"].fillna(0.0)) * (1.0 + crsp["dlret"]) - 1.0
    crsp["ret_excess"] = crsp["ret_with_dlret"] - crsp["rf"]

    # target y = r_{t+1} excess return
    crsp["ret_excess_t_plus_1"] = crsp.groupby("permno")["ret_excess"].shift(-1)

    # === 3) Define variable groups ===
    quarterly_names = [
        "aeavol", "cash", "chtx", "cinvest", "ear", "roaq", "roavol", "roeq",
        "rsup", "stdacc", "stdcf", "ms", "nincr"
    ]
    monthly_names_c = [
        "baspread", "beta", "betasq", "chmom", "dolvol", "pricedelay", "retvol",
        "std_dolvol", "std_turn", "turn", "zerotrade", "idiovol", "ill", "indmom",
        "maxret", "mom12m", "mom1m", "mom36m", "mom6m", "mvel1"
    ]
    monthly_names_m = ["m_d/p", "m_e/p", "m_b/m", "m_ntis", "m_tbl", "m_tms", "m_dfr", "m_svar"]

    all_c_cols = [c for c in comp.columns if c.startswith("c_")]
    sic_col = "sic2" if "sic2" in comp.columns else None

    c_quarterly_cols = [f"c_{x}" for x in quarterly_names if f"c_{x}" in comp.columns]
    c_monthly_cols   = [f"c_{x}" for x in monthly_names_c if f"c_{x}" in comp.columns]
    c_annual_cols    = sorted(list(set(all_c_cols) - set(c_quarterly_cols) - set(c_monthly_cols)))

    # === 4) Build lag keys & trim right tables to avoid DATE collisions ===
    annual = comp[["permno", "DATE"] + c_annual_cols + ([sic_col] if sic_col else [])].copy()
    annual["DATE_LAG_A"] = annual["DATE"] + pd.DateOffset(months=6)
    right_annual = annual[["permno", "DATE_LAG_A"] + c_annual_cols + ([sic_col] if sic_col else [])]

    quarterly = comp[["permno", "DATE"] + c_quarterly_cols].copy()
    quarterly["DATE_LAG_Q"] = quarterly["DATE"] + pd.DateOffset(months=4)
    right_quarterly = quarterly[["permno", "DATE_LAG_Q"] + c_quarterly_cols]

    m_firm = comp[["permno", "DATE"] + c_monthly_cols].copy()
    m_firm["DATE_LAG_M"] = m_firm["DATE"]
    right_mfirm = m_firm[["permno", "DATE_LAG_M"] + c_monthly_cols]

    m_macro = comp[["permno", "DATE"] + monthly_names_m].copy()
    m_macro["DATE_LAG_M"] = m_macro["DATE"]
    right_mmacro = m_macro[["permno", "DATE_LAG_M"] + monthly_names_m]

    # === 5) Base CRSP panel used for merging ===
    crsp_base = crsp[["permno", "date", "ret_with_dlret", "ret_excess", "ret_excess_t_plus_1", "rf"]].copy()

    # === 6) Merge per permno with asof (backward) ===
    merged_list = []
    for pid, sub in tqdm(crsp_base.groupby("permno"), desc="‚è≥ merging", total=crsp_base["permno"].nunique()):
        merged = sub.sort_values("date").copy()

        ann_sub = right_annual.query("permno == @pid")
        if not ann_sub.empty:
            merged = pd.merge_asof(
                merged, ann_sub.sort_values("DATE_LAG_A"),
                left_on="date", right_on="DATE_LAG_A", by="permno", direction="backward"
            )

        qtr_sub = right_quarterly.query("permno == @pid")
        if not qtr_sub.empty:
            merged = pd.merge_asof(
                merged, qtr_sub.sort_values("DATE_LAG_Q"),
                left_on="date", right_on="DATE_LAG_Q", by="permno", direction="backward"
            )

        mfirm_sub = right_mfirm.query("permno == @pid")
        if not mfirm_sub.empty:
            merged = pd.merge_asof(
                merged, mfirm_sub.sort_values("DATE_LAG_M"),
                left_on="date", right_on="DATE_LAG_M", by="permno", direction="backward"
            )

        mmacro_sub = right_mmacro.query("permno == @pid")
        if not mmacro_sub.empty:
            merged = pd.merge_asof(
                merged, mmacro_sub.sort_values("DATE_LAG_M"),
                left_on="date", right_on="DATE_LAG_M", by="permno", direction="backward"
            )

        merged_list.append(merged)

    df = pd.concat(merged_list, ignore_index=True)
    df["month"] = df["date"].dt.to_period("M")

    # === 7) Rank-based cross-sectional normalization per month (your logic) ===
    feature_cols = [c for c in df.columns if c.startswith(("c_", "m_"))]

    def normalize_rank(g: pd.DataFrame) -> pd.DataFrame:
        for col in feature_cols:
            s = g[col]
            if s.isna().all():
                continue
            valid = s.dropna()
            n = len(valid)
            if n < 2:
                continue
            ranks = valid.rank(method="first")
            scaled = 2 * (ranks / n) - 1
            g.loc[valid.index, col] = scaled
            g.loc[s.isna(), col] = 0.0
        return g

    df = df.groupby("month", group_keys=False).apply(normalize_rank)

    # === 8) Save ===
    Path(os.path.dirname(out_parquet_path)).mkdir(parents=True, exist_ok=True)
    df.to_parquet(out_parquet_path, index=False)
    print(f"Features built and saved to {out_parquet_path} | shape={df.shape}")
    return df


Next, we construct a function to check for data leaks. We read the feature and revenue sources and set the information release time for features on different platforms: Annual variables ‚Äì 6 months lag; Quarterly variables ‚Äì 4 months lag; Monthly variables ‚Äì 0 months lag (one month lag relative to r_t+1).

Then, we check the data for each stock month by month, ensuring that the release time of all features is ‚â§ the current revenue time.

If any feature release time is found to be later than the revenue time, a data leak is determined.

We output a text report; if no violations are found, we print "All good".

In [5]:
def leakage_check(crsp_csv: str, comp_parquet: str, report_dir: str = "results"):
    print("Running leakage self-check ...")
    REPORT_DIR = Path(report_dir)
    REPORT_DIR.mkdir(parents=True, exist_ok=True)

    comp = pd.read_parquet(comp_parquet)
    comp["DATE"] = pd.to_datetime(comp["DATE"], errors="coerce")
    crsp = pd.read_csv(crsp_csv)
    crsp["date"] = pd.to_datetime(crsp["date"], errors="coerce")

    rf_calendar = comp.loc[:, ["DATE", "rf"]].drop_duplicates().rename(columns={"DATE": "date"})
    crsp = crsp.merge(rf_calendar, on="date", how="left")
    crsp["ret_excess_t_plus_1"] = crsp.groupby("permno")["ret"].shift(-1)

    quarterly_names = [
        'aeavol', "cash", "chtx", 'cinvest', "ear", "roaq", "roavol", "roeq",
        "rsup", 'stdacc', "stdcf", "ms", "nincr"
    ]
    monthly_names_c = [
        'baspread', "beta", "betasq", "chmom", "dolvol", "pricedelay", "retvol",
        'std_dolvol', "std_turn", "turn", "zerotrade", "idiovol", "ill", "indmom",
        'maxret', "mom12m", "mom1m", "mom36m", "mom6m", "mvel1"
    ]
    all_c_cols = [c for c in comp.columns if str(c).startswith("c_")]
    c_annual_cols = sorted(list(set(all_c_cols) - set([f"c_{n}" for n in quarterly_names]) - set([f"c_{n}" for n in monthly_names_c])))

    annual = comp[["permno", "DATE"] + c_annual_cols].copy()
    annual["RELEASE_A"] = annual["DATE"] + pd.DateOffset(months=6)
    qtr = comp[["permno", "DATE"] + [f"c_{n}" for n in quarterly_names]].copy()
    qtr["RELEASE_Q"] = qtr["DATE"] + pd.DateOffset(months=4)
    m_firm = comp[["permno", "DATE"] + [f"c_{n}" for n in monthly_names_c]].copy()
    m_firm["RELEASE_M"] = m_firm["DATE"]

    violations = {"annual": 0, "quarterly": 0, "monthly": 0}
    checked_rows = 0

    for pid in tqdm(crsp["permno"].unique(), desc="checking"):
        sub = crsp[crsp["permno"] == pid][["permno", "date", "ret_excess_t_plus_1"]]
        ann_sub, qtr_sub, m_sub = annual.query("permno == @pid"), qtr.query("permno == @pid"), m_firm.query("permno == @pid")

        merged = sub.sort_values("date")
        for dset, col, rule in [(ann_sub, "RELEASE_A", "annual"), (qtr_sub, "RELEASE_Q", "quarterly"), (m_sub, "RELEASE_M", "monthly")]:
            if not dset.empty:
                merged = pd.merge_asof(merged, dset.sort_values(col), left_on="date", right_on=col, by="permno", direction="backward")
                violations[rule] += int(((merged[col].notna()) & (merged[col] > merged["date"])).sum())
        checked_rows += len(merged)

    report_path = Path(REPORT_DIR) / "leakage_report.txt"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(f"Leakage Check Report\nTotal rows: {checked_rows}\n")
        for k, v in violations.items():
            f.write(f"{k}: {v}\n")

    print(f" Leakage report saved: {report_path}")
    if any(v > 0 for v in violations.values()):
        raise AssertionError(f"Leakage check failed: {violations}")
    else:
        print("All good: no look-ahead leakage detected.")

Construct the main function and execute it.

In [6]:
def main():
    FIRM_CSV = "../data/raw/datashare.csv"
    MACRO_PARQUET = "../data/raw/macro_factors.parquet"
    TBILL_CSV = "../data/raw/t_bill.csv"
    CRSP_CSV = "../data/raw/crsp_monthly_1957_2021.csv"
    MERGED_PARQUET = "../data/processed/datashare_with_macro.parquet"
    FEATURES_PARQUET = "../data/processed/features.parquet"

    Path("data/processed").mkdir(parents=True, exist_ok=True)
    Path("results").mkdir(parents=True, exist_ok=True)

    merge_firm_and_macro(FIRM_CSV, MACRO_PARQUET, MERGED_PARQUET)
    add_rf_to_macro(MERGED_PARQUET, TBILL_CSV)
    build_features(MERGED_PARQUET, CRSP_CSV, FEATURES_PARQUET)
    leakage_check(CRSP_CSV, MERGED_PARQUET, "results")

    print("The entire process is complete; features.parquet has been generated and passed the leakage check.")


if __name__ == "__main__":
    main()

 Reading datashare.csv and macro_factors.parquet (low-memory mode)...


Merging firm chunks: 21chunk [07:39, 21.88s/chunk]


Concatenating 21 chunks ...
Merge completed ‚Äî total 4,117,300 rows, 107 columns.
 Saved merged dataset to: ../data/processed/datashare_with_macro.parquet
Join the risk-free interest rate RF...
There are 355,161 missing rf values, which are filled with previous values.


  df["rf"] = df["rf"].fillna(method="ffill")


Written to ../data/processed/datashare_with_macro.parquet | shape=(4117300, 108)
 Building feature data...


‚è≥ merging: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 32791/32791 [2:03:53<00:00,  4.41it/s]
  df = df.groupby("month", group_keys=False).apply(normalize_rank)


Features built and saved to ../data/processed/features.parquet | shape=(4353483, 114)
Running leakage self-check ...


checking: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 32791/32791 [1:50:47<00:00,  4.93it/s]


 Leakage report saved: results\leakage_report.txt
All good: no look-ahead leakage detected.
The entire process is complete; features.parquet has been generated and passed the leakage check.
