# **`README.md`**

# Algorithmic Monitoring: Measuring Market Stress with Machine Learning

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2602.07066-b31b1b.svg)](https://arxiv.org/abs/2602.07066)
[![Journal](https://img.shields.io/badge/Journal-ArXiv%20Preprint-003366)](https://arxiv.org/abs/2602.07066)
[![Year](https://img.shields.io/badge/Year-2026-purple)](https://github.com/chirindaopensource/measuring_market_stress_with_machine_learning)
[![Discipline](https://img.shields.io/badge/Discipline-Financial%20Econometrics%20%7C%20Machine%20Learning-00529B)](https://github.com/chirindaopensource/measuring_market_stress_with_machine_learning)
[![Data Sources](https://img.shields.io/badge/Data-CRSP%20%7C%20WRDS-lightgrey)](https://wrds-www.wharton.upenn.edu/)
[![Core Method](https://img.shields.io/badge/Method-Lasso--Logit%20%7C%20Expanding%20Window-orange)](https://github.com/chirindaopensource/measuring_market_stress_with_machine_learning)
[![Analysis](https://img.shields.io/badge/Analysis-Probabilistic%20Forecasting-red)](https://github.com/chirindaopensource/measuring_market_stress_with_machine_learning)
[![Validation](https://img.shields.io/badge/Validation-Brier%20Score%20%7C%20ECE%20%7C%20AUC-green)](https://github.com/chirindaopensource/measuring_market_stress_with_machine_learning)
[![Robustness](https://img.shields.io/badge/Robustness-Block%20Bootstrap%20%7C%20Nonlinear%20Benchmarks-yellow)](https://github.com/chirindaopensource/measuring_market_stress_with_machine_learning)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%230C55A5.svg?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![YAML](https://img.shields.io/badge/YAML-%23CB171E.svg?style=flat&logo=yaml&logoColor=white)](https://yaml.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
[![Open Source](https://img.shields.io/badge/Open%20Source-%E2%9D%A4-brightgreen)](https://github.com/chirindaopensource/measuring_market_stress_with_machine_learning)

**Repository:** `https://github.com/chirindaopensource/measuring_market_stress_with_machine_learning`

**Owner:** 2026 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2026 paper entitled **"Algorithmic Monitoring: Measuring Market Stress with Machine Learning"** by:

*   **Marc Schmitt** (University of Oxford)

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from the ingestion and rigorous validation of CRSP micro-data to the construction of the Market Stress Probability Index (MSPI) using L1-regularized logistic regression, culminating in comprehensive out-of-sample evaluation and structural economic analysis.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `execute_complete_mspi_research_pipeline`](#key-callable-execute_complete_mspi_research_pipeline)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the analytical framework presented in Schmitt (2026). The core of this repository is the iPython Notebook `measuring_market_stress_with_machine_learning_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings. The pipeline addresses the critical challenge of **real-time market stress monitoring** in modern algorithmic markets, where fragility manifests in the cross-section of returns and trading activity.

The paper proposes the **Market Stress Probability Index (MSPI)**, a forward-looking probability measure constructed from interpretable cross-sectional fragility signals. This codebase operationalizes the proposed solution:
-   **Validates** data integrity using strict schema checks and temporal consistency enforcement.
-   **Engineers** cross-sectional fragility features (moments, tails, liquidity) from high-frequency daily data.
-   **Learns** a sparse probability mapping using Lasso-Logit in a strict real-time expanding window.
-   **Evaluates** performance via proper scoring rules (Brier, Log Loss), calibration diagnostics, and robustness horse races against nonlinear learners (Random Forest, Gradient Boosting).

## Theoretical Background

The implemented methods combine techniques from Financial Econometrics, Machine Learning, and Probabilistic Forecasting.

**1. Cross-Sectional Fragility Signals:**
Stress leaves a footprint in the cross-section of returns. The model aggregates daily statistics into monthly features $X_t$:
$$ Z_t \equiv \frac{1}{D_t} \sum_{d \in t} z_d $$
where $z_d$ includes dispersion $\sigma^{xs}_d$, skewness, kurtosis, and tail participation $\text{Frac}^{dn}_d$.

**2. Latent Stress Definition:**
A month is labeled as stress ($S_t=1$) if market returns crash or realized volatility spikes relative to a dynamic history:
$$ S_t \equiv \mathbb{I} \{ R^{mkt}_t \le c_R \} \lor \mathbb{I} \{ \sigma^{mkt}_t \ge q_{t-1}(\alpha) \} $$

**3. Sparse Probability Modeling (Lasso-Logit):**
The probability of future stress is modeled using L1-regularized logistic regression to ensure parsimony and interpretability:
$$ MSPI_t \equiv \Pr(Y_{t+1} = 1 | X_t) = \Lambda(\beta_0 + X'_t\beta) $$
$$ (\hat{\beta}_0, \hat{\beta}) \in \arg\min_{\beta_0,\beta} \left\{ - \ell(\beta) + \lambda\|\beta\|_1 \right\} $$

**4. Real-Time Discipline:**
The pipeline enforces a strict **expanding window** protocol. At time $t$, the model is trained only on information available up to $t$, preventing look-ahead bias in both feature engineering (standardization) and model estimation.

## Features

The provided iPython Notebook (`measuring_market_stress_with_machine_learning_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Task Architecture:** The pipeline is decomposed into 22 distinct, modular tasks, each with its own orchestrator function.
-   **Configuration-Driven Design:** All study parameters (grids, splits, hyperparameters) are managed in an external `config.yaml` file.
-   **Rigorous Data Validation:** A multi-stage validation process checks schema integrity, temporal monotonicity, and return plausibility.
-   **Deterministic Execution:** Enforces reproducibility through seed control, strict causality checks, and frozen parameter sets.
-   **Comprehensive Audit Logging:** Generates detailed logs of every processing step, including invariant checks and benchmark comparisons.
-   **Reproducible Artifacts:** Generates structured results containing raw time-series, aggregated metrics, and economic analysis outputs.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Configuration & Validation (Task 1):** Loads and validates the study configuration, enforcing parameter constraints and reproduction modes.

2.  **Data Ingestion & Cleansing (Tasks 2-5):** Validates CRSP micro and macro schema, enforces strict monotonicity, handles missingness, and aligns calendars.

3.  **Universe Construction (Task 6):** Filters for common shares on major exchanges with price > $1.

4.  **Feature Engineering (Tasks 7-10):** Computes daily cross-sectional moments, tail measures, and trading proxies, aggregating them to monthly frequency.

5.  **Target Construction (Tasks 11-12):** Computes market aggregates and constructs the binary stress label $S_t$ using dynamic volatility quantiles.

6.  **Standardization (Task 13):** Implements expanding-window z-scoring to prevent data leakage.

7.  **Hyperparameter Tuning (Task 14):** Optimizes regularization parameters ($\lambda$) using time-series cross-validation in the initial window.

8.  **Forecasting (Tasks 15-16):** Generates out-of-sample probability forecasts for MSPI (Lasso) and the Benchmark (Ridge).

9.  **Robustness (Task 18):** Runs a horse race against Random Forest and Gradient Boosting models with real-time Platt calibration.

10. **Evaluation (Tasks 19-21):** Computes AUC, PR-AUC, Brier Score, Log Loss, ECE, and performs block-bootstrap inference.

11. **Economic Analysis (Task 22):** Estimates predictive regressions for volatility and impulse response functions via local projections.

12. **Final Orchestration (Task 22-Plus):** Manages the end-to-end flow and enforces anti-look-ahead safety checks.

## Core Components (Notebook Structure)

The notebook is structured as a logical pipeline with modular orchestrator functions for each of the 22 major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `execute_complete_mspi_research_pipeline`

The project is designed around a single, top-level user-facing interface function:

-   **`execute_complete_mspi_research_pipeline`:** This master orchestrator function runs the entire automated research pipeline from end-to-end. A single call to this function reproduces the entire computational portion of the project, managing data flow between validation, feature engineering, modeling, evaluation, and economic analysis modules.

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `scipy`, `pyyaml`, `scikit-learn`, `statsmodels`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/measuring_market_stress_with_machine_learning.git
    cd measuring_market_stress_with_machine_learning
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy scipy pyyaml scikit-learn statsmodels
    ```

## Input Data Structure

The pipeline requires two primary DataFrames:

1.  **`df_crsp_daily` (Micro-Data)**:
    -   `PERMNO` (Int64): Security identifier.
    -   `DATE` (datetime64[ns]): Trading date.
    -   `SHRCD`, `EXCHCD` (Int64): Share and exchange codes.
    -   `PRC`, `RET`, `VOL`, `SHROUT` (float64): Price, return, volume, shares outstanding.

2.  **`df_crsp_index` (Macro-Data)**:
    -   `DATE` (datetime64[ns]): Trading date.
    -   `vwretd` (float64): Value-weighted market return.

*Note: The pipeline includes a synthetic data generator for testing purposes if access to CRSP is unavailable.*

## Usage

The notebook provides a complete, step-by-step guide. The primary workflow is to execute the final cell, which demonstrates how to use the top-level `execute_complete_mspi_research_pipeline` orchestrator:

```python
# Final cell of the notebook

# This block serves as the main entry point for the entire project.
if __name__ == '__main__':
    # 1. Load the master configuration from the YAML file.
    # (Assumes config.yaml is in the working directory)
    with open("config.yaml", "r") as f:
        config = yaml.safe_load(f)
    
    # 2. Load raw datasets (Example using synthetic generator provided in the notebook)
    # In production, load from CSV/Parquet: pd.read_csv(...)
    df_daily, df_index = generate_synthetic_crsp_data()

    # 3. Execute the entire replication study.
    results = execute_complete_mspi_research_pipeline(df_daily, df_index, config)
    
    # 4. Access results
    print(results["mspi_forecasts"].head())
```

## Output Structure

The pipeline returns a dictionary containing:
-   **`mspi_forecasts`**: DataFrame of out-of-sample stress probabilities ($MSPI_t$) and targets.
-   **`discrimination_metrics`**: DataFrame comparing AUC and PR-AUC across models.
-   **`probability_metrics`**: DataFrame with Brier Score, Log Loss, and ECE.
-   **`economic_analysis`**: Dictionary containing predictive regression stats and IRF DataFrames.
-   **`audit_log`**: A detailed record of data cleansing stats, universe counts, and hyperparameter choices.

## Project Structure

```
measuring_market_stress_with_machine_learning/
│
├── measuring_market_stress_with_machine_learning_draft.ipynb   # Main implementation notebook
├── config.yaml                                                 # Master configuration file
├── requirements.txt                                            # Python package dependencies
│
├── LICENSE                                                     # MIT Project License File
└── README.md                                                   # This file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can modify study parameters such as:
-   **Stress Definition:** `vol_quantile_alpha` (e.g., 0.90 or 0.95), `return_cutoff_c_R`.
-   **Feature Engineering:** `tail_threshold_tau` (e.g., 0.05).
-   **Learning Protocol:** `initial_training_window_months`, `cv_n_splits`.
-   **Models:** Hyperparameter grids for Lasso, Ridge, RF, and GB.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Alternative Stress Definitions:** Incorporating liquidity or credit spreads into the stress label.
-   **Intraday Features:** Using high-frequency TAQ data for realized measures.
-   **Deep Learning Models:** Benchmarking against LSTM or Transformer architectures (with proper calibration).
-   **International Markets:** Extending the analysis to global equity markets.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{schmitt2026algorithmic,
  title={Algorithmic Monitoring: Measuring Market Stress with Machine Learning},
  author={Schmitt, Marc},
  journal={arXiv preprint arXiv:2602.07066},
  year={2026}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2026). Algorithmic Monitoring: Measuring Market Stress with Machine Learning: An Open Source Implementation.
GitHub repository: https://github.com/chirindaopensource/measuring_market_stress_with_machine_learning
```

## Acknowledgments

-   Credit to **Marc Schmitt** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, SciPy, Scikit-Learn, and Statsmodels**.

--

*This README was generated based on the structure and content of the `measuring_market_stress_with_machine_learning_draft.ipynb` notebook and follows best practices for research software documentation.*

# Paper

Title: "*Algorithmic Monitoring: Measuring Market Stress with Machine Learning*"

Authors: Marc Schmitt

E-Journal Submission Date: 5 February 2026

Link: https://arxiv.org/abs/2602.07066

Modified Abstract:

The author constructs a Market Stress Probability Index (MSPI) that estimates the probability of high stress in the U.S. equity market one month ahead using information from the cross-section of individual stocks. Using CRSP daily data, each month is summarized by a set of interpretable cross-sectional fragility signals and mapped into a forward-looking stress probability via an L1-regularized logistic regression in a real-time expanding-window design. Out of sample, MSPI tracks major stress episodes and improves discrimination and accuracy relative to a parsimonious benchmark based on lagged market return and realized volatility, delivering calibrated stress probabilities on an economically meaningful scale. Further, I illustrate how MSPI can be used as a probability-based measurement object in financial econometrics. The resulting index provides a transparent and easily updated measure of near-term equity-market stress risk.

# Summary

### **Executive Summary**
This paper introduces the **Market Stress Probability Index (MSPI)**, a novel financial-econometric instrument designed to estimate the one-month-ahead probability of a high-stress regime in U.S. equity markets. Departing from composite indices that rely on multi-asset data or option-implied volatility, MSPI is an **equity-only** measure derived strictly from the cross-sectional statistics of individual stock returns (CRSP data).

The author employs an **L1-regularized logistic regression (Lasso-logit)** within a strict real-time expanding-window protocol. The central empirical finding is that this parsimonious, interpretable linear mapping outperforms a benchmark of lagged market aggregates and remains highly competitive against flexible non-linear learners (Random Forests and Gradient Boosting) in terms of discrimination and probability calibration. The paper frames this approach as **"Algorithmic Monitoring"**: the use of transparent statistical learning to compress high-dimensional market microstructure signals into actionable, calibrated state variables.

### **The Econometric Challenge: Monitoring in Algorithmic Markets**
The paper posits that the structural transformation of markets—characterized by high-frequency algorithmic trading and electronic liquidity provision—requires a commensurate shift in monitoring technologies.
*   **The Problem:** Financial stress is a latent state. Traditional manual oversight struggles to aggregate weak, noisy signals across thousands of securities in real time. Furthermore, stress often arises from endogenous feedback loops (liquidity withdrawal, correlated deleveraging) rather than solely macroeconomic fundamentals.
*   **The Objective:** To construct a measurement object that is (i) strictly real-time (no look-ahead bias), (ii) interpretable, (iii) reproducible using standard data, and (iv) output as a well-calibrated probability rather than an arbitrary index level.

### **Data Construction and Feature Engineering**
The study utilizes CRSP daily stock data for U.S. common stocks. The core hypothesis is that stress leaves a distinctive "footprint" in the cross-section of returns before it manifests in aggregate market crashes.

**The Feature Set ($X_t$):**
The model aggregates daily cross-sectional statistics into monthly predictors, categorized into three fragility signals:
1.  **Distributional Moments:** Cross-sectional dispersion ($\sigma^{xs}$), skewness, and kurtosis. (Logic: Stress correlates with return fragmentation and fat tails).
2.  **Tail Participation:** The fraction of stocks experiencing daily returns $\le -5\%$ or $\ge +5\%$.
3.  **Trading Intensity:** Average log volume, dollar volume, and turnover.

**The Target Variable ($Y_{t+1}$):**
A month is labeled as a "Stress" month ($S_t=1$) if:
*   The CRSP value-weighted market return is $\le -5\%$; **OR**
*   Realized market volatility exceeds its historical 90th percentile ($q_{t-1}(0.90)$).
*   *Crucial Note:* The volatility threshold is expanding and computed only on past data to ensure the label itself is available in real time.

### **The Learning Protocol**
The paper treats statistical learning as a measurement tool rather than a "black box" forecasting engine.

*   **Primary Model:** L1-regularized Logistic Regression (Lasso-logit). The $L_1$ penalty induces sparsity, selecting only the most relevant cross-sectional signals.
    $$ \text{MSPI}_t \equiv \text{Pr}(Y_{t+1}=1 | X_t) = \Lambda(\beta_0 + X_t'\beta) $$
*   **Benchmarks:**
    1.  **Ridge-Logit Benchmark:** Uses only lagged aggregate market return and realized volatility.
    2.  **Non-Linear ML:** Random Forests (RF) and Gradient Boosted Trees (GB).
*   **Evaluation Protocol:**
    *   **Expanding Window:** Models are re-trained monthly.
    *   **Hyperparameter Tuning:** Performed on an initial 120-month training set, then fixed to avoid implicit look-ahead.
    *   **Calibration:** The non-linear models (RF, GB) are subjected to **Platt scaling** (real-time calibration) to ensure their outputs are valid probabilities, not just ranking scores.

### **Empirical Performance (Out-of-Sample 2005–2024)**
The evaluation focuses on both discrimination (ranking) and calibration (probability accuracy).

*   **Discrimination (AUC & PR-AUC):**
    *   **MSPI (Lasso-logit):** AUC **0.800** / PR-AUC **0.538**.
    *   **Benchmark:** AUC 0.752 / PR-AUC 0.444.
    *   **Gradient Boosting:** AUC 0.756.
    *   **Random Forest:** AUC 0.727.
    *   *Result:* The sparse linear model outperforms the benchmark and the non-linear learners. This suggests that in monthly macro-finance data (low signal-to-noise), the bias reduction from non-linear models is outweighed by their estimation variance.

*   **Probability Accuracy (Proper Scoring Rules):**
    *   **Brier Score:** MSPI achieves the lowest (best) score of **0.106**, compared to 0.116 for the benchmark.
    *   **Calibration:** MSPI shows a low Expected Calibration Error (ECE) of **0.062**. The predicted probabilities closely match realized stress frequencies.

*   **Time-Series Dynamics:**
    *   MSPI correctly identifies precursors to the 2008 Global Financial Crisis and the 2020 COVID crash.
    *   Compared to the "jagged" predictions of Random Forests, MSPI provides a smoother, more stable state variable suitable for monitoring.

### **Economic Interpretation and Utility**
The paper demonstrates that MSPI is not merely a binary classifier but a continuous state variable for financial econometrics.

*   **Predictive Content:** Higher MSPI levels strongly predict next-month realized volatility and downside tail risk (crashes).
*   **Stress-Risk Innovations:** The author proposes using local projections to study shocks. By orthogonalizing MSPI against lagged market data ($u_t$), one can isolate "news" regarding stress risk.
    $$ \text{MSPI}_t = \delta_0 + \delta_1 \text{MSPI}_{t-1} + \Delta' Z_{t-1} + u_t $$
*   **Transparency vs. Complexity:** The results validate the "transparency-flexibility trade-off." In this domain, a transparent, linear aggregation of cross-sectional fragility signals is superior to opaque non-linear models, satisfying the regulatory and governance requirements for "Algorithmic Monitoring."

### **Conclusion**
Schmitt successfully argues that the cross-section of equity returns contains rich, latent information about future market stress that is not captured by aggregate market history alone. The **MSPI** serves as a robust, reproducible, and economically interpretable "thermometer" for market fragility, proving that in financial time-series forecasting, disciplined regularization (Lasso) often trumps model complexity.

# Import Essential Modules

In [None]:
#!/usr/bin/env python3
# ==============================================================================#
#
#  Algorithmic Monitoring: Measuring Market Stress with Machine Learning
#
#  This module provides a complete, production-grade implementation of the
#  Market Stress Probability Index (MSPI) framework presented in "Algorithmic
#  Monitoring: Measuring Market Stress with Machine Learning" by Marc Schmitt
#  (2026). It delivers a computationally tractable system for real-time,
#  equity-only stress monitoring, enabling robust, state-dependent risk
#  assessment, probabilistic forecasting, and structural analysis of market
#  fragility using high-dimensional cross-sectional signals.
#
#  Core Methodological Components:
#  • Cross-sectional fragility signal extraction (moments, tails, liquidity)
#  • Real-time expanding-window stress labeling via dynamic volatility quantiles
#  • L1-regularized logistic regression (Lasso-Logit) for sparse probability modeling
#  • Strict real-time discipline (no look-ahead bias) in feature engineering and training
#  • Robustness benchmarking against nonlinear learners (Random Forest, Gradient Boosting)
#  • Probability calibration via Platt scaling for reliable risk estimates
#  • Block-bootstrap inference for rigorous out-of-sample performance comparison
#  • Structural economic analysis via stress-risk innovations and local projections
#
#  Technical Implementation Features:
#  • Vectorized processing of large-scale CRSP daily micro-data
#  • Deterministic deduplication and type coercion for financial time series
#  • Expanding-window standardization and hyperparameter tuning
#  • Comprehensive audit logging for data lineage and pipeline integrity
#  • Integration with statsmodels for HAC-robust econometric inference
#  • Modular architecture facilitating component reuse and testing
#
#  Paper Reference:
#  Schmitt, M. (2026). Algorithmic Monitoring: Measuring Market Stress with
#  Machine Learning. arXiv preprint arXiv:2602.07066.
#  https://arxiv.org/abs/2602.07066
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================#

import logging
from typing import Any, Dict, List, Optional, Tuple, Union, Set, Callable

import numpy as np
import pandas as pd
import statsmodels.api as sm
from pandas.api.types import is_datetime64_any_dtype, is_numeric_dtype, is_integer_dtype
from scipy.special import expit
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    average_precision_score,
    brier_score_loss,
    log_loss,
    roc_auc_score,
)


# Implementation

# Draft 1

## **Discussion of the Inputs, Processes and Outputs of Key Callables**

Here is the granular, step-by-step assessment and documentation of the 22 task-specific orchestrator callables that constitute the complete "Algorithmic Monitoring" research pipeline.

### 1. `validate_study_config` (Task 1 Orchestrator)
*   **Inputs:** `raw_study_config` (Dictionary).
*   **Processes:** Injects strict drift-control defaults (e.g., `realized_vol_ddof`, `vol_quantile_interpolation`), validates the presence of all required schema keys, asserts the six mandatory anti-look-ahead constraints, and logs warnings for unspecified optional fields.
*   **Outputs:** `validated_config` (Dictionary), `validation_report` (Dictionary).
*   **Data Transformation:** Augments the input dictionary with explicit technical parameters and evaluates boolean logic across nested dictionary paths to generate a structured validation report.
*   **Role in Research Pipeline:** Enforces the foundational methodological guardrails. It ensures the pipeline strictly adheres to the "expanding window, no look-ahead, reproducible updating" protocol mandated in Box 2 of the LaTeX context.

### 2. `validate_crsp_daily` (Task 2 Orchestrator)
*   **Inputs:** `df_crsp_daily` (DataFrame), `study_config` (Dictionary).
*   **Processes:** Verifies the presence and data types of required columns (`PERMNO`, `DATE`, `SHRCD`, `EXCHCD`, `PRC`, `RET`, `VOL`, `SHROUT`), checks value plausibility (e.g., non-negative volume, valid share codes), and computes missingness statistics.
*   **Outputs:** Validation report (Dictionary).
*   **Data Transformation:** Applies read-only vectorized boolean masking and aggregation functions (`.isna().sum()`, `.nunique()`) to generate summary statistics without mutating the underlying micro-data.
*   **Role in Research Pipeline:** Validates the integrity of the "CRSP daily stock data accessed via WRDS for U.S. common stocks," ensuring the raw inputs can support the rigorous cross-sectional computations required downstream.

### 3. `validate_crsp_index` (Task 3 Orchestrator)
*   **Inputs:** `df_crsp_daily` (DataFrame), `df_crsp_index` (DataFrame), `study_config` (Dictionary).
*   **Processes:** Validates the schema of the index data, normalizes dates to midnight to verify temporal coverage against the micro-data, and checks the magnitude of `vwretd` to ensure decimal unit compliance.
*   **Outputs:** Validation report (Dictionary).
*   **Data Transformation:** Performs read-only date normalization and set-difference operations (`np.setdiff1d`) to identify calendar gaps between the micro and macro datasets.
*   **Role in Research Pipeline:** Ensures the "CRSP daily value-weighted market index return" is structurally sound and temporally aligned, which is a strict prerequisite for constructing the stress label $S_t$.

### 4. `cleanse_crsp_daily` (Task 4 Orchestrator)
*   **Inputs:** `df_crsp_daily` (DataFrame), `study_config` (Dictionary).
*   **Processes:** Performs deterministic deduplication (stable sort by `PERMNO` and `DATE`, keeping the last observation), coerces target columns to numeric types, creates an absolute price column, drops rows with missing returns or prices, and normalizes dates to create a canonical `year_month` key.
*   **Outputs:** `df_clean` (DataFrame), `audit_log` (Dictionary).
*   **Data Transformation:** Transforms the raw panel via row-filtering (`dropna`), type casting (`to_numeric`), and column mutation ($|PRC|$), fundamentally altering the shape and type-safety of the data structure.
*   **Role in Research Pipeline:** Implements the data preparation requirements: "(iii) non-missing returns and prices." It explicitly handles the CRSP signed-price convention by computing $|P_{i,d}|$ for subsequent microstructure filtering.

### 5. `cleanse_crsp_index` (Task 5 Orchestrator)
*   **Inputs:** `df_crsp_daily_clean` (DataFrame), `df_crsp_index` (DataFrame), `study_config` (Dictionary).
*   **Processes:** Deduplicates index rows, coerces `vwretd` to numeric, drops missing observations, normalizes dates, creates the `year_month` key, and asserts that every month in the micro-data exists in the index data.
*   **Outputs:** `df_index_clean` (DataFrame), `audit_log` (Dictionary).
*   **Data Transformation:** Applies row-filtering and date normalization, followed by a strict set-subset assertion between the unique `year_month` values of the micro and macro panels.
*   **Role in Research Pipeline:** Guarantees that the aggregate market variables can be computed for every month present in the cross-sectional feature matrix, preventing undefined states in the target variable $Y_{t+1}$.

### 6. `construct_eligible_universe` (Task 6 Orchestrator)
*   **Inputs:** `df_crsp_daily_clean` (DataFrame), `study_config` (Dictionary).
*   **Processes:** Filters the cleansed panel to retain only ordinary common shares (codes 10, 11), major exchanges (codes 1, 2, 3), and stocks with an absolute price $\ge \$1$. Computes the daily cross-section size $N_d$.
*   **Outputs:** `df_eligible` (DataFrame), `nd_series` (Series), `audit_log` (Dictionary).
*   **Data Transformation:** Applies sequential boolean indexing to slice the DataFrame, followed by a `groupby("DATE")["PERMNO"].nunique()` aggregation to generate the $N_d$ vector.
*   **Role in Research Pipeline:** Accurately implements the paper's universe filters: "(i) ordinary common shares (CRSP share codes 10 and 11), (ii) NYSE/AMEX/NASDAQ listings... and (iv) absolute price at least $1."

### 7. `compute_daily_return_moments` (Task 7 Orchestrator)
*   **Inputs:** `df_eligible` (DataFrame), `study_config` (Dictionary).
*   **Processes:** Computes the daily cross-sectional mean, population dispersion, skewness, and raw kurtosis. Enforces a zero-variance policy to prevent division by zero.
*   **Outputs:** DataFrame of daily moments indexed by `DATE`.
*   **Data Transformation:** Groups the panel by `DATE` and applies custom aggregations to compute higher-order central moments, broadcasting the daily mean back to the panel to center returns before exponentiation.
*   **Role in Research Pipeline:** Implements equations (1) through (4) from the LaTeX context, capturing the "fragmentation of returns" and "extreme cross-sectional moves":
    $\bar{r}_d \equiv \frac{1}{N_d} \sum_{i=1}^{N_d} r_{i,d}$
    $\sigma^{xs}_d \equiv \sqrt{\frac{1}{N_d} \sum_{i=1}^{N_d} (r_{i,d} - \bar{r}_d)^2}$
    $\text{Skew}^{xs}_d \equiv \frac{\frac{1}{N_d} \sum_{i=1}^{N_d} (r_{i,d} - \bar{r}_d)^3}{(\sigma^{xs}_d)^3}$
    $\text{Kurt}^{xs}_d \equiv \frac{\frac{1}{N_d} \sum_{i=1}^{N_d} (r_{i,d} - \bar{r}_d)^4}{(\sigma^{xs}_d)^4}$

### 8. `compute_daily_tail_measures` (Task 8 Orchestrator)
*   **Inputs:** `df_eligible` (DataFrame), `study_config` (Dictionary).
*   **Processes:** Computes the cross-sectional mean absolute return, and the fraction of stocks experiencing returns $\le -5\%$ and $\ge +5\%$.
*   **Outputs:** DataFrame of daily tail measures indexed by `DATE`.
*   **Data Transformation:** Creates boolean indicator series for tail events, then applies a `groupby("DATE").mean()` operation to compute the exact fraction of the cross-section meeting the criteria.
*   **Role in Research Pipeline:** Implements equations (5) and (6) to capture "tail-like behavior in a robust way":
    $\text{Frac}^{dn}_d(\tau) \equiv \frac{1}{N_d} \sum_{i=1}^{N_d} \mathbb{I} \{r_{i,d} \le -\tau\}$
    $\text{Frac}^{up}_d(\tau) \equiv \frac{1}{N_d} \sum_{i=1}^{N_d} \mathbb{I} \{r_{i,d} \ge \tau\}$

### 9. `compute_daily_trading_proxies` (Task 9 Orchestrator)
*   **Inputs:** `df_eligible` (DataFrame), `study_config` (Dictionary).
*   **Processes:** Computes the cross-sectional average of $\log(1 + \text{Vol})$, dollar volume ($|P| \times \text{Vol}$), and turnover ($\text{Vol} / (1000 \times \text{SHROUT})$), filtering out missing or invalid denominators.
*   **Outputs:** DataFrame of daily trading proxies indexed by `DATE`.
*   **Data Transformation:** Performs row-wise arithmetic transformations (logarithms, multiplication, division) on valid subsets of the data, followed by `groupby` averaging.
*   **Role in Research Pipeline:** Implements the liquidity usage and trading intensity signals defined in Table 1: "average log volume, average dollar volume, and average turnover (volume scaled by shares outstanding)."

### 10. `aggregate_monthly_features` (Task 10 Orchestrator)
*   **Inputs:** `daily_stats_all` (DataFrame), `nd_series` (Series), `study_config` (Dictionary).
*   **Processes:** Averages all daily statistics within each calendar month, renames columns to the canonical feature set, and drops months with insufficient trading days.
*   **Outputs:** `X_t` (DataFrame), `audit_log` (Dictionary).
*   **Data Transformation:** Groups the daily time series by `year_month` and applies a `.mean()` reduction, transforming the data from a daily frequency to a monthly panel $X_t$.
*   **Role in Research Pipeline:** Implements equation (7), compressing daily fragility signals into the monthly feature vector used for prediction:
    $Z_t \equiv \frac{1}{D_t} \sum_{d \in t} z_d$

### 11. `construct_market_aggregates` (Task 11 Orchestrator)
*   **Inputs:** `df_crsp_index_clean` (DataFrame), `X_t` (DataFrame), `study_config` (Dictionary).
*   **Processes:** Compounds daily market returns to compute $R^{mkt}_t$, computes the annualized within-month standard deviation to yield $\sigma^{mkt}_t$, and aligns these aggregates with $X_t$ via an inner join.
*   **Outputs:** `aligned_panel` (DataFrame), `audit_log` (Dictionary).
*   **Data Transformation:** Applies a custom product aggregation ($\prod(1+r) - 1$) and standard deviation aggregation to the daily index data, followed by a relational join on the `year_month` index.
*   **Role in Research Pipeline:** Constructs the macroeconomic state variables required for both the stress definition and the parsimonious benchmark model.

### 12. `construct_stress_labels` (Task 12 Orchestrator)
*   **Inputs:** `aligned_panel` (DataFrame), `study_config` (Dictionary).
*   **Processes:** Computes the expanding 90th percentile of realized volatility strictly up to $t-1$, evaluates the logical OR condition for stress at month $t$, and shifts the result backward to create the target $Y_{t+1}$.
*   **Outputs:** `modeling_panel` (DataFrame), `audit_log` (Dictionary).
*   **Data Transformation:** Utilizes an `.expanding().quantile().shift(1)` operation to prevent look-ahead bias, applies boolean logic to create a binary state variable, and uses `.shift(-1)` to align the future state with current features.
*   **Role in Research Pipeline:** Implements equations (8) and (9), defining the latent stress state and the supervised learning target:
    $S_t \equiv \mathbb{I} \{ R^{mkt}_t \le c_R \} \lor \mathbb{I} \{ \sigma^{mkt}_t \ge q_{t-1}(\alpha) \}$
    $Y_{t+1} \equiv S_{t+1}$

### 13. `expanding_window_standardizer` (Task 13 Orchestrator)
*   **Inputs:** `x_panel` (DataFrame), `train_end_idx` (Any), `study_config` (Dictionary).
*   **Processes:** Slices the feature panel up to the training boundary, computes the mean and population standard deviation on complete cases, and applies the z-score transformation to the training data.
*   **Outputs:** `x_train_std` (DataFrame), `scaler_params` (Dictionary).
*   **Data Transformation:** Extracts cross-sectional moments from a temporal slice and broadcasts these scalars across the DataFrame to normalize the feature distributions.
*   **Role in Research Pipeline:** Implements the strict data hygiene requirement: "Predictors are standardized (z-scored) using information available within each training window."

### 14. `tune_hyperparameters` (Task 14 Orchestrator)
*   **Inputs:** `modeling_panel` (DataFrame), `standardizer_func` (Callable), `study_config` (Dictionary).
*   **Processes:** Generates forward-chaining time-series splits within the initial 120-month window, performs a grid search for the L1 (Lasso) and L2 (Ridge) penalty parameters minimizing log loss, and freezes the optimal values.
*   **Outputs:** `frozen_hyperparams` (Dictionary).
*   **Data Transformation:** Iteratively slices the panel into expanding train/validation folds, standardizes features *within* each fold, fits logistic models, and aggregates validation scores.
*   **Role in Research Pipeline:** Implements the hyperparameter selection protocol: "The regularization parameter $\lambda$ is selected via time-series cross-validation within this initial window and then held fixed..."

### 15. `forecast_mspi` (Task 15 Orchestrator)
*   **Inputs:** `modeling_panel` (DataFrame), `frozen_hyperparams` (Dictionary), `standardizer_func` (Callable), `study_config` (Dictionary).
*   **Processes:** Iterates over out-of-sample months. For each month $t$, defines a training set strictly where $\tau+1 \le t$, standardizes data, fits an L1-regularized logistic regression, and predicts the probability of stress for $t+1$.
*   **Outputs:** `mspi_forecasts` (DataFrame).
*   **Data Transformation:** Sequentially expands a temporal slice, fits a convex optimization routine (Coordinate Descent via `liblinear`), and maps the linear predictor through a sigmoid function to yield probabilities.
*   **Role in Research Pipeline:** Implements the core MSPI algorithm (equations 10 and 11):
    $MSPI_t \equiv \Pr(Y_{t+1} = 1 | X_t) = \Lambda(\beta_0 + X'_t\beta)$
    $(\hat{\beta}_0, \hat{\beta}) \in \arg\min_{\beta_0,\beta} \left\{ - \sum_{t \in \mathcal{T}} \left[ Y_{t+1} \log p_t + \dots \right] + \lambda\|\beta\|_1 \right\}$

### 16. `forecast_benchmark` (Task 16 Orchestrator)
*   **Inputs:** `modeling_panel` (DataFrame), `frozen_hyperparams` (Dictionary), `standardizer_func` (Callable), `study_config` (Dictionary).
*   **Processes:** Executes the identical expanding-window protocol as Task 15, but restricts features to $R^{mkt}_t$ and $\sigma^{mkt}_t$ and applies an L2 (Ridge) penalty.
*   **Outputs:** `benchmark_forecasts` (DataFrame).
*   **Data Transformation:** Identical to Task 15, but operating on a highly restricted, two-dimensional feature space.
*   **Role in Research Pipeline:** Implements the parsimonious benchmark model (equation 12):
    $p^B_t = \Lambda(\alpha_0 + \alpha_1 R^{mkt}_t + \alpha_2 \sigma^{mkt}_t)$

### 17. `run_robustness_horse_race` (Task 18 Orchestrator)
*   **Inputs:** `modeling_panel`, `mspi_forecasts`, `benchmark_forecasts`, `frozen_hyperparams`, `standardizer_func`, `study_config`.
*   **Processes:** Fits Random Forest and Gradient Boosting models in expanding windows. Extracts raw scores, fits a Platt scaling (logistic) calibrator on the training scores, applies it to the test scores, and unifies all model forecasts into a single panel.
*   **Outputs:** `unified_evaluation_panel` (DataFrame).
*   **Data Transformation:** Fits non-parametric ensemble trees, extracts decision functions, and maps them through a secondary logistic regression layer to produce calibrated probabilities.
*   **Role in Research Pipeline:** Implements the nonlinear machine-learning benchmarks and explicit probability calibration: "I apply a real-time calibration map (Platt scaling) within each training window to obtain the probability forecast $\hat{p}^{RF}_t = g_{RF}(s^{RF}_t)$."

### 18. `compute_discrimination_metrics` (Task 19 Orchestrator)
*   **Inputs:** `unified_evaluation_panel` (DataFrame).
*   **Processes:** Computes the Area Under the ROC Curve (AUC) and the Area Under the Precision-Recall Curve (PR-AUC) for all models using their raw, uncalibrated scores.
*   **Outputs:** `discrimination_metrics` (DataFrame).
*   **Data Transformation:** Computes rank-based integration metrics over the aligned arrays of predictions and ground-truth labels.
*   **Role in Research Pipeline:** Implements the rank-based evaluation: "I next quantify these comparisons using rank-based discrimination metrics (ROC and precision–recall) computed from raw scores..."

### 19. `compute_probability_metrics` (Task 20 Orchestrator)
*   **Inputs:** `unified_evaluation_panel`, `discrimination_metrics`, `study_config`.
*   **Processes:** Computes the Brier Score, Log Loss (with probability clipping), and Expected Calibration Error (ECE) using equal-mass quantile binning. Merges these with the discrimination metrics.
*   **Outputs:** `probability_metrics` (DataFrame).
*   **Data Transformation:** Applies proper scoring rule formulas and performs quantile discretization (`pd.qcut`) to compute weighted absolute calibration errors.
*   **Role in Research Pipeline:** Implements the probability accuracy evaluation: "The Brier score and log loss (negative log-likelihood) reward sharp but accurate probabilities and penalize overconfident errors."

### 20. `perform_bootstrap_inference` (Task 21 Orchestrator)
*   **Inputs:** `unified_evaluation_panel` (DataFrame), `study_config` (Dictionary).
*   **Processes:** Generates 2,000 moving block bootstrap resamples (block length 12), computes the difference in metrics (MSPI vs Benchmark) for each resample, and calculates the mean difference and 95% confidence intervals.
*   **Outputs:** `bootstrap_inference` (DataFrame).
*   **Data Transformation:** Performs resampling with replacement on integer indices, recalculates metrics iteratively, and applies percentile aggregations to the resulting distributions.
*   **Role in Research Pipeline:** Implements the statistical significance testing: "To quantify sampling uncertainty... I implement a block bootstrap over months (12-month blocks; 2,000 replications)."

### 21. `perform_economic_analysis` (Task 22 Orchestrator)
*   **Inputs:** `mspi_forecasts` (DataFrame), `market_aggregates` (DataFrame), `study_config` (Dictionary).
*   **Processes:** Estimates a predictive OLS regression for next-month volatility, constructs stress-risk innovations by orthogonalizing MSPI against its lags and market controls, and estimates local projections (IRFs) using HAC standard errors.
*   **Outputs:** `economic_analysis` (Dictionary).
*   **Data Transformation:** Performs temporal shifting to align variables, fits OLS regressions, and computes Heteroskedasticity and Autocorrelation Consistent (Newey-West) covariance matrices.
*   **Role in Research Pipeline:** Implements the financial econometrics applications (equations 13, 15, 16):
    $\sigma^{mkt}_{t+1} = \alpha + \gamma MSPI_t + \phi' Z_t + \eta_{t+1}$
    $MSPI_t = \delta_0 + \delta_1 MSPI_{t-1} + \Delta' Z_{t-1} + u_t$
    $y_{t+h} = a_h + b_h u_t + \Gamma'_h W_{t-1} + \varepsilon_{t+h}$

### 22. `execute_complete_mspi_research_pipeline` (Top-Level Orchestrator)
*   **Inputs:** `df_crsp_daily` (DataFrame), `df_crsp_index` (DataFrame), `raw_study_config` (Dictionary).
*   **Processes:** Sequentially executes all 21 upstream orchestrators in strict dependency order. It manages the flow of intermediate DataFrames, passes the standardizer callable to modeling functions, executes final anti-look-ahead safety checks, and compiles a master results dictionary.
*   **Outputs:** `results` (Dictionary containing all features, labels, forecasts, metrics, economic analyses, and audit logs).
*   **Data Transformation:** Acts as the master composition function, threading the outputs of data cleansing into feature engineering, into labeling, into modeling, and finally into evaluation and inference.
*   **Role in Research Pipeline:** Operationalizes the entire "Algorithmic Monitoring" framework. It guarantees that the "expanding window, no look-ahead, reproducible updating" protocol defined in Box 2 is strictly enforced from raw data ingestion to final econometric output.

<br><br>

## **Usage Example**

Here is the granular, step-by-step guide to executing the end-to-end pipeline for **"Algorithmic Monitoring: Measuring Market Stress with Machine Learning"**. This example demonstrates how to synthetically generate the required CRSP micro and macro data, load the study configuration from a YAML file, and execute the full research pipeline using the `execute_complete_mspi_research_pipeline` orchestrator.

**Note:** This example assumes that the required Python modules and all callables defined in this workbook (from `validate_study_config` to `execute_complete_mspi_research_pipeline`) are available in the current namespace (e.g., defined in a single Jupyter notebook). It also assumes a `config.yaml` file exists in the working directory.

### **Step 1: Synthetic Data Generation**

The pipeline requires two specific DataFrames: `df_crsp_daily` (micro-data) and `df_crsp_index` (market aggregate). We generate high-fidelity synthetic versions of these datasets using `Faker` and `numpy` to mimic the schema, data types, and statistical properties required by the pipeline's validation logic.

**Methodology:**
1.  **Micro-Data (`df_crsp_daily`):** We simulate a panel of 50 stocks over a 20-year period. We include all required columns (`PERMNO`, `DATE`, `SHRCD`, `EXCHCD`, `PRC`, `RET`, `VOL`, `SHROUT`). We intentionally introduce some missing values and negative prices (CRSP convention) to test the cleansing logic.
2.  **Index Data (`df_crsp_index`):** We simulate a value-weighted market return (`vwretd`) corresponding to the same dates.
3.  **Schema Enforcement:** We explicitly cast columns to the strict types (`Int64`, `float64`, `datetime64[ns]`) expected by the validators.

```python
import pandas as pd
import numpy as np
import yaml
import os
from faker import Faker
from typing import Dict, Any

# Initialize Faker and RNG for reproducibility
fake = Faker()
Faker.seed(42)
np.random.seed(42)

def generate_synthetic_crsp_data(
    start_date: str = "2000-01-01",
    end_date: str = "2024-12-31",
    n_stocks: int = 50
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Generates synthetic CRSP Daily Stock File (micro) and Index File (macro) DataFrames.

    Purpose:
        To create high-fidelity mock datasets that mimic the schema and statistical
        properties of CRSP data, enabling the execution of the MSPI pipeline without
        proprietary data access.

    Inputs:
        start_date (str): Start date 'YYYY-MM-DD'.
        end_date (str): End date 'YYYY-MM-DD'.
        n_stocks (int): Number of unique stocks to simulate in the micro-panel.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: (df_crsp_daily, df_crsp_index)
    """
    # 1. Generate Trading Days (Business Days)
    dates = pd.date_range(start=start_date, end=end_date, freq='B')
    n_days = len(dates)
    
    # -------------------------------------------------------------------------
    # A. Generate Micro-Data (df_crsp_daily)
    # -------------------------------------------------------------------------
    permnos = np.arange(10000, 10000 + n_stocks)
    
    # Create a long-format panel
    # Repeat dates for each permno
    panel_dates = np.tile(dates, n_stocks)
    panel_permnos = np.repeat(permnos, n_days)
    
    n_rows = len(panel_dates)
    
    # Simulate Returns (Normal distribution, mean 0, vol 2%)
    ret = np.random.normal(0.0004, 0.02, n_rows)
    
    # Simulate Prices (Random walk approx, some negative for CRSP convention)
    # Base price ~ $50
    prc_raw = np.random.uniform(5, 100, n_rows)
    # Make 5% of prices negative (bid/ask flag)
    neg_mask = np.random.rand(n_rows) < 0.05
    prc = prc_raw.copy()
    prc[neg_mask] = -prc[neg_mask]
    
    # Simulate Volume and Shares
    vol = np.random.randint(1000, 1000000, n_rows)
    shrout = np.random.randint(5000, 500000, n_rows) # In thousands
    
    # Simulate Codes (Mostly valid, some invalid to test filters)
    # SHRCD: 90% are 10 or 11
    shrcd = np.random.choice([10, 11, 12, 31], size=n_rows, p=[0.45, 0.45, 0.05, 0.05])
    # EXCHCD: 90% are 1, 2, 3
    exchcd = np.random.choice([1, 2, 3, 4], size=n_rows, p=[0.3, 0.3, 0.3, 0.1])
    
    df_crsp_daily = pd.DataFrame({
        "PERMNO": panel_permnos,
        "DATE": panel_dates,
        "SHRCD": shrcd,
        "EXCHCD": exchcd,
        "PRC": prc,
        "RET": ret,
        "VOL": vol,
        "SHROUT": shrout
    })
    
    # Enforce Types
    df_crsp_daily["PERMNO"] = df_crsp_daily["PERMNO"].astype("Int64")
    df_crsp_daily["SHRCD"] = df_crsp_daily["SHRCD"].astype("Int64")
    df_crsp_daily["EXCHCD"] = df_crsp_daily["EXCHCD"].astype("Int64")
    df_crsp_daily["DATE"] = df_crsp_daily["DATE"].astype("datetime64[ns]")
    df_crsp_daily["PRC"] = df_crsp_daily["PRC"].astype("float64")
    df_crsp_daily["RET"] = df_crsp_daily["RET"].astype("float64")
    df_crsp_daily["VOL"] = df_crsp_daily["VOL"].astype("float64")
    df_crsp_daily["SHROUT"] = df_crsp_daily["SHROUT"].astype("float64")
    
    # Introduce some missingness (1% of RET)
    mask_missing = np.random.rand(n_rows) < 0.01
    df_crsp_daily.loc[mask_missing, "RET"] = np.nan

    # -------------------------------------------------------------------------
    # B. Generate Index Data (df_crsp_index)
    # -------------------------------------------------------------------------
    # Simulate market return (correlated with mean stock return but lower vol)
    # Mean 0.04%, Vol 1%
    vwretd = np.random.normal(0.0004, 0.01, n_days)
    
    df_crsp_index = pd.DataFrame({
        "DATE": dates,
        "vwretd": vwretd
    })
    
    # Enforce Types
    df_crsp_index["DATE"] = df_crsp_index["DATE"].astype("datetime64[ns]")
    df_crsp_index["vwretd"] = df_crsp_index["vwretd"].astype("float64")
    
    return df_crsp_daily, df_crsp_index

# Generate datasets
print("Generating synthetic CRSP data...")
df_daily, df_index = generate_synthetic_crsp_data()
print("Data generation complete.")
print(f"Micro-data shape: {df_daily.shape}")
print(f"Index-data shape: {df_index.shape}")
```
<br>

### **Step 2: Loading the Configuration (`config.yaml`)**

We read the `config.yaml` file (assumed to be in the working directory) into a Python dictionary. This dictionary serves as the single source of truth for all study parameters.

```python
def load_config(filepath: str = "config.yaml") -> Dict[str, Any]:
    """
    Loads the study configuration parameters from a YAML file into a Python dictionary.

    Purpose:
        To ingest the deterministic hyperparameters, data split definitions, and
        evaluation metrics defined in the external configuration file. This ensures
        reproducibility by separating code from configuration.

    Inputs:
        filepath (str): The relative or absolute path to the YAML configuration file.
                        Default is "config.yaml".

    Returns:
        Dict[str, Any]: A nested dictionary containing the study configuration.
                        Returns an empty dictionary if the file is not found (fallback).

    Raises:
        TypeError: If filepath is not a string.
        yaml.YAMLError: If the file contains invalid YAML syntax.
    """
    # Validate input type
    if not isinstance(filepath, str):
        raise TypeError(f"filepath must be a string, got {type(filepath)}.")

    try:
        # Open the file stream
        with open(filepath, "r") as file:
            # Parse the YAML content safely
            config = yaml.safe_load(file)

        # Log success to console
        print(f"\nSuccessfully loaded configuration from {filepath}")

        return config

    except FileNotFoundError:
        # Fallback for demonstration if file doesn't exist in this specific run environment
        # In a real scenario, this would raise an error, but per instructions we handle it gracefully.
        print(f"\nWarning: {filepath} not found. Please ensure the file exists.")
        return {}
    except yaml.YAMLError as e:
        # Handle parsing errors explicitly
        print(f"\nError parsing YAML file {filepath}: {e}")
        raise

# Load the configuration
# Note: Ensure 'config.yaml' is in your working directory.
raw_config = load_config()

```

<br>

### **Step 3: Executing the Pipeline (`execute_complete_mspi_research_pipeline`)**

We now invoke the top-level orchestrator. This function will validate inputs, cleanse data, engineer features, train models, and produce the final MSPI index and analysis.

```python
if __name__ == "__main__":
    print("\n" + "="*80)
    print("STARTING MSPI PIPELINE EXECUTION")
    print("="*80)
    
    # Execute the pipeline
    try:
        results = execute_complete_mspi_research_pipeline(
            df_crsp_daily=df_daily,
            df_crsp_index=df_index,
            raw_study_config=raw_config
        )
        
        print("\n" + "="*80)
        print("PIPELINE EXECUTION SUCCESSFUL")
        print("="*80)
        
        # ---------------------------------------------------------------------
        # Inspecting Artifacts
        # ---------------------------------------------------------------------
        
        # 1. MSPI Forecasts
        print("\n[1] MSPI Forecasts (Head):")
        print(results["mspi_forecasts"].head())
        
        # 2. Discrimination Metrics
        print("\n[2] Discrimination Metrics (AUC/PR-AUC):")
        print(results["discrimination_metrics"])
        
        # 3. Probability Metrics
        print("\n[3] Probability Metrics (Brier/LogLoss/ECE):")
        print(results["probability_metrics"])
        
        # 4. Economic Analysis (Predictive Regression)
        print("\n[4] Predictive Regression Results (Next-Month Volatility):")
        reg_stats = results["economic_analysis"]["predictive_regression_vol"]
        print(pd.DataFrame([reg_stats]))
        
        # 5. Audit Log (Sample)
        print("\n[5] Audit Log (Data Cleansing):")
        print(results["audit_log"]["cleansing_daily"])
        
    except Exception as e:
        print(f"\nCRITICAL FAILURE: Pipeline execution failed with error: {e}")
        import traceback
        traceback.print_exc()
```

<br>

### **Summary of Outputs**

The `results` dictionary contains the complete set of research artifacts:
*   **`mspi_forecasts`**: The time series of stress probabilities ($MSPI_t$) and realized targets ($Y_{t+1}$).
*   **`discrimination_metrics`**: A DataFrame comparing AUC and PR-AUC across MSPI, Benchmark, RF, and GB.
*   **`probability_metrics`**: Calibration diagnostics including Brier Score and ECE.
*   **`economic_analysis`**: Results from the predictive volatility regression and impulse response functions.
*   **`audit_log`**: A detailed record of rows dropped, duplicates removed, and model hyperparameters used, ensuring full reproducibility.

<br>

## **Implemented Callables**

In [None]:
# Task 1 — Validate `STUDY_CONFIG` dictionary completeness and internal coherence

# ==============================================================================
# Task 1: Validate and parse the study configuration dictionary
# ==============================================================================

def apply_strict_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Injects strict, reproducible defaults for technical parameters that may be
    missing from the high-level study configuration. This ensures no silent
    numerical drift occurs due to unspecified library defaults.

    Parameters
    ----------
    config : Dict[str, Any]
        The input study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        The configuration dictionary with explicit drift-control keys added.
    """
    # Create a deep copy to avoid mutating the input in place unexpectedly
    # (In a full implementation, use copy.deepcopy; here we assume dict structure)
    clean_config = config.copy()

    # 1. Market Aggregates Defaults
    if "market_aggregates" in clean_config:
        # Default to sample standard deviation (unbiased) for realized vol
        clean_config["market_aggregates"].setdefault("realized_vol_ddof", 1)

    # 2. Target Definition Defaults
    if "target_definition" in clean_config:
        # Default to linear interpolation for quantiles (standard in finance)
        clean_config["target_definition"].setdefault("vol_quantile_interpolation", "linear")
        # Default to 12 months history before first label is generated
        clean_config["target_definition"].setdefault("min_vol_history_months", 12)

    # 3. Feature Engineering Defaults
    if "feature_engineering" in clean_config:
        # Default to Period['M'] for strict monthly alignment
        clean_config["feature_engineering"].setdefault("month_index_type", "period_M")
        # Default epsilon for zero-variance checks
        clean_config["feature_engineering"].setdefault("std_epsilon", 1e-8)

    # 4. Evaluation Defaults
    if "evaluation_inference" in clean_config:
        # Default probability clipping for log loss stability
        clean_config["evaluation_inference"].setdefault("proba_clip_epsilon", 1e-6)
        # Default calibration protocol
        clean_config["evaluation_inference"].setdefault("calibration_protocol", "in_sample_within_window")
        # Default bootstrap settings if missing
        if "block_bootstrap" in clean_config["evaluation_inference"]:
            clean_config["evaluation_inference"]["block_bootstrap"].setdefault("variant", "moving")
            clean_config["evaluation_inference"]["block_bootstrap"].setdefault("ci_method", "percentile")
            clean_config["evaluation_inference"]["block_bootstrap"].setdefault("degenerate_sample_policy", "skip")

    # 5. Learning Protocol Defaults
    if "learning_protocol" in clean_config:
        if "hyperparameter_tuning" in clean_config["learning_protocol"]:
            # Default CV objective
            clean_config["learning_protocol"]["hyperparameter_tuning"].setdefault("cv_objective", "neg_log_loss")
            # Default CV geometry (5 splits is standard for this sample size)
            clean_config["learning_protocol"]["hyperparameter_tuning"].setdefault("cv_n_splits", 5)
            clean_config["learning_protocol"]["hyperparameter_tuning"].setdefault("cv_val_size_months", 12)
            clean_config["learning_protocol"]["hyperparameter_tuning"].setdefault("cv_step_size_months", 12)

    return clean_config

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 1: Verify all required top-level keys exist and validate schema structure.
# -------------------------------------------------------------------------------------------------------------------------------
def validate_schema_completeness(config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Validates that the configuration dictionary contains all required top-level keys
    and critical sub-keys necessary for the MSPI pipeline.

    Parameters
    ----------
    config : Dict[str, Any]
        The study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        A report dictionary with keys 'passed' (bool), 'missing_keys' (List[str]),
        and 'errors' (List[str]).

    Raises
    ------
    ValueError
        If any top-level key is missing, as this prevents pipeline initialization.
    """
    # List of mandatory top-level keys derived from the task specification
    required_top_level = [
        "raw_data_schema",
        "universe_construction",
        "feature_engineering",
        "market_aggregates",
        "target_definition",
        "learning_protocol",
        "models",
        "evaluation_inference",
        "economic_analysis",
        "reproducibility"
    ]

    missing_keys = [key for key in required_top_level if key not in config]

    if missing_keys:
        error_msg = f"Critical Error: Missing top-level configuration keys: {missing_keys}"
        raise ValueError(error_msg)

    # Validate critical sub-keys for drift control (must be present after default injection)
    # We check paths like "target_definition.vol_quantile_alpha"
    critical_subpaths = [
        ("target_definition", "vol_quantile_alpha"),
        ("target_definition", "vol_quantile_interpolation"), # Drift control
        ("market_aggregates", "realized_vol_ddof"),          # Drift control
        ("feature_engineering", "month_index_type"),         # Drift control
        ("learning_protocol", "window_type"),
        ("reproducibility", "random_seed")
    ]

    errors = []
    for parent, child in critical_subpaths:
        if child not in config.get(parent, {}):
            errors.append(f"Missing critical sub-key: ['{parent}']['{child}']")

    return {
        "passed": len(errors) == 0,
        "missing_keys": missing_keys,
        "errors": errors
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 2: Assert anti-look-ahead constraints.
# -------------------------------------------------------------------------------------------------------------------------------
def assert_real_time_discipline(config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Enforces strict anti-look-ahead constraints mandated by the research protocol.
    Violations are treated as fatal errors.

    Parameters
    ----------
    config : Dict[str, Any]
        The study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        A report dictionary with 'passed' (bool) and 'violations' (List[str]).

    Raises
    ------
    ValueError
        If any real-time constraint is violated.
    """
    violations = []

    # 1. Volatility Quantile Window must be expanding
    # Source: "qt-1(alpha) is the expanding, real-time alpha-quantile"
    if config["target_definition"]["vol_quantile_window"] != "expanding":
        violations.append(
            f"Constraint Violation: target_definition.vol_quantile_window must be 'expanding', "
            f"got '{config['target_definition']['vol_quantile_window']}'"
        )

    # 2. Quantile Lag must be 1 month
    # Source: "qt-1(alpha)... computed using data available up to month t-1"
    if config["target_definition"]["vol_quantile_lag_months"] != 1:
        violations.append(
            f"Constraint Violation: target_definition.vol_quantile_lag_months must be 1, "
            f"got {config['target_definition']['vol_quantile_lag_months']}"
        )

    # 3. Learning Window Type must be expanding
    # Source: "re-estimated in an expanding-window design"
    if config["learning_protocol"]["window_type"] != "expanding":
        violations.append(
            f"Constraint Violation: learning_protocol.window_type must be 'expanding', "
            f"got '{config['learning_protocol']['window_type']}'"
        )

    # 4. Standardization Scope must be training window only
    # Source: "Predictors are standardized (z-scored) using information available within each training window"
    std_scope = config["learning_protocol"]["standardization"]["scope"]
    if std_scope != "training_window_only":
        violations.append(
            f"Constraint Violation: learning_protocol.standardization.scope must be 'training_window_only', "
            f"got '{std_scope}'"
        )

    # 5. Hyperparameter Tuning Scope
    # Source: "selected via time-series cross-validation within this initial window"
    tune_init = config["learning_protocol"]["hyperparameter_tuning"]["tune_in_initial_window_only"]
    if tune_init is not True:
        violations.append(
            f"Constraint Violation: learning_protocol.hyperparameter_tuning.tune_in_initial_window_only must be True, "
            f"got {tune_init}"
        )

    # 6. Hyperparameter Fixing
    # Source: "held fixed for the subsequent expanding-window evaluation"
    hold_fixed = config["learning_protocol"]["hyperparameter_tuning"]["hold_fixed_after_tuning"]
    if hold_fixed is not True:
        violations.append(
            f"Constraint Violation: learning_protocol.hyperparameter_tuning.hold_fixed_after_tuning must be True, "
            f"got {hold_fixed}"
        )

    if violations:
        error_msg = "\n".join(violations)
        raise ValueError(f"Real-Time Discipline Check Failed:\n{error_msg}")

    return {
        "passed": True,
        "violations": []
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 3: Log unspecified fields and drift controls.
# -------------------------------------------------------------------------------------------------------------------------------
def check_unspecified_fields(config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Identifies fields that are explicitly None or missing, which require user resolution
    or strict default injection to prevent numerical drift.

    Parameters
    ----------
    config : Dict[str, Any]
        The study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        A report dictionary with 'warnings' (List[str]) detailing unspecified fields.
    """
    warnings = []

    # Check for None values in fields that require explicit settings
    # 1. CV Objective
    cv_obj = config["learning_protocol"]["hyperparameter_tuning"].get("cv_objective")
    if cv_obj is None:
        warnings.append("Warning: 'cv_objective' is None. Defaulting to 'neg_log_loss' is recommended.")

    # 2. Random Forest Hyperparameters
    rf_params = config["models"]["random_forest"].get("hyperparameters")
    if rf_params is None:
        warnings.append("Warning: Random Forest 'hyperparameters' is None. Full tuning grid must be defined in initial window.")

    # 3. Gradient Boosting Hyperparameters
    gb_params = config["models"]["gradient_boosted_trees"].get("hyperparameters")
    if gb_params is None:
        warnings.append("Warning: Gradient Boosting 'hyperparameters' is None. Full tuning grid must be defined in initial window.")

    # 4. Local Projections Horizons
    lp_h = config["economic_analysis"]["local_projections"].get("horizons_h")
    if lp_h is None:
        warnings.append("Warning: Local Projections 'horizons_h' is None. User must specify horizons (e.g., 12).")

    # 5. Local Projections Controls
    lp_w = config["economic_analysis"]["local_projections"].get("controls_W")
    if lp_w is None:
        warnings.append("Warning: Local Projections 'controls_W' is None. User must specify control variables.")

    return {
        "warnings": warnings
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def validate_study_config(raw_config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Orchestrates the validation of the study configuration. It applies strict defaults
    to prevent drift, verifies schema completeness, enforces real-time discipline,
    and reports warnings for unspecified fields.

    Parameters
    ----------
    raw_config : Dict[str, Any]
        The raw input configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing:
        - 'validated_config': The configuration with strict defaults applied.
        - 'validation_report': A summary of checks, errors, and warnings.

    Raises
    ------
    ValueError
        If critical schema keys are missing or real-time constraints are violated.
    """
    # 1. Apply strict defaults to ensure drift control keys exist
    # This transforms the raw input into a "canonical" config
    validated_config = apply_strict_defaults(raw_config)

    # 2. Validate Schema Completeness
    # This will raise ValueError if top-level keys are missing
    schema_report = validate_schema_completeness(validated_config)
    if not schema_report["passed"]:
        # This block might be unreachable due to raise in function, but good for robustness
        raise ValueError(f"Schema Validation Failed: {schema_report['errors']}")

    # 3. Assert Real-Time Discipline
    # This will raise ValueError if constraints are violated
    discipline_report = assert_real_time_discipline(validated_config)

    # 4. Check for Unspecified Fields (Warnings)
    warnings_report = check_unspecified_fields(validated_config)

    # Compile final report
    final_report = {
        "schema_validation": schema_report,
        "discipline_validation": discipline_report,
        "warnings": warnings_report["warnings"],
        "status": "PASSED"
    }

    # Log warnings to console/logger
    if warnings_report["warnings"]:
        print("Configuration Warnings:")
        for w in warnings_report["warnings"]:
            print(f"  - {w}")

    return {
        "validated_config": validated_config,
        "validation_report": final_report
    }


In [None]:
# Task 2 — Validate df_crsp_daily schema, types, and value ranges

# ==============================================================================
# Task 2: Validate df_crsp_daily schema, types, and value ranges
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 1: Verify required columns are present and correctly typed.
# -------------------------------------------------------------------------------------------------------------------------------
def _check_column_presence_and_types(
    df: pd.DataFrame,
    schema_config: Dict[str, Any]
) -> None:
    """
    Validates that the CRSP Daily DataFrame contains all required columns and that
    they conform to the expected data types (datetime, integer-like, numeric).

    Parameters
    ----------
    df : pd.DataFrame
        The CRSP Daily Stock File (micro-data).
    schema_config : Dict[str, Any]
        The 'crsp_daily_stock_file' section of the study configuration, containing
        'required_columns'.

    Raises
    ------
    ValueError
        If any required column is missing or has a fundamentally incompatible type.
    """
    required_cols = schema_config["required_columns"]
    missing = [c for c in required_cols if c not in df.columns]

    if missing:
        # Critical failure: missing columns prevent any downstream processing.
        raise ValueError(f"CRSP Daily validation failed. Missing columns: {missing}")

    # 1. Validate DATE type
    # Must be datetime64[ns] to ensure correct temporal sorting and grouping.
    if not is_datetime64_any_dtype(df["DATE"]):
        raise TypeError(
            f"Column 'DATE' must be datetime64[ns], got {df['DATE'].dtype}. "
            "Please parse dates during ingestion."
        )

    # 2. Validate Integer-like Identifiers (PERMNO, SHRCD, EXCHCD)
    # These define the panel structure and universe. Float representation is acceptable
    # if lossless, but object/string is dangerous.
    int_cols = ["PERMNO", "SHRCD", "EXCHCD"]
    for col in int_cols:
        if not is_numeric_dtype(df[col]):
            # If object, check if it's coercible to integer (e.g. string "10")
            # This is a strict check to prevent "10.0" strings or mixed types.
            try:
                pd.to_numeric(df[col], errors='raise').astype('Int64')
            except Exception:
                raise TypeError(
                    f"Column '{col}' must be integer-like, got {df[col].dtype} "
                    "and is not safely coercible."
                )

    # 3. Validate Numeric Signals (PRC, RET, VOL, SHROUT)
    # These are the basis of all fragility signals.
    num_cols = ["PRC", "RET", "VOL", "SHROUT"]
    for col in num_cols:
        # We allow object type here ONLY if it contains coercible numbers (common in CRSP
        # where error codes like 'C' are mixed with floats). Task 4 will cleanse them.
        # Here we just ensure the column isn't purely garbage.
        if not is_numeric_dtype(df[col]):
            # Attempt to coerce a sample to verify feasibility
            sample = df[col].dropna().head(100)
            try:
                pd.to_numeric(sample, errors='coerce')
            except Exception:
                raise TypeError(
                    f"Column '{col}' has incompatible type {df[col].dtype} and "
                    "sample could not be coerced to numeric."
                )

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 2: Verify value-range plausibility (sanity bounds).
# -------------------------------------------------------------------------------------------------------------------------------
def _check_value_plausibility(df: pd.DataFrame) -> List[str]:
    """
    Performs vectorized sanity checks on data values to detect gross data corruption
    or ingestion errors (e.g., negative shares, impossible returns).

    Parameters
    ----------
    df : pd.DataFrame
        The CRSP Daily Stock File.

    Returns
    -------
    List[str]
        A list of warning messages describing any plausibility violations.
    """
    warnings = []

    # 1. SHRCD Plausibility
    # CRSP share codes are typically 10, 11, 12, etc.
    # We check if the dataset contains *any* valid common share codes (10, 11).
    # If not, the universe filter will result in an empty set.
    valid_shrcd_mask = df["SHRCD"].isin([10, 11])
    if not valid_shrcd_mask.any():
        warnings.append("CRITICAL: No rows with SHRCD 10 or 11 found. Universe will be empty.")

    # 2. EXCHCD Plausibility
    # We check for presence of NYSE/AMEX/NASDAQ codes (1, 2, 3).
    valid_exch_mask = df["EXCHCD"].isin([1, 2, 3])
    if not valid_exch_mask.any():
        warnings.append("CRITICAL: No rows with EXCHCD 1, 2, or 3 found. Universe will be empty.")

    # 3. RET Plausibility
    # Coerce to numeric for checking (handling CRSP error codes as NaN temporarily)
    # Returns < -1.0 are impossible for simple holding periods (limited liability).
    # Returns > 20.0 (2000%) are extremely rare daily events, likely errors if frequent.
    ret_numeric = pd.to_numeric(df["RET"], errors='coerce')
    if (ret_numeric < -1.0).any():
        count = (ret_numeric < -1.0).sum()
        warnings.append(f"Found {count} rows with RET < -1.0 (impossible). Check data source.")

    # 4. VOL Non-negativity
    # Volume cannot be negative.
    vol_numeric = pd.to_numeric(df["VOL"], errors='coerce')
    if (vol_numeric < 0).any():
        count = (vol_numeric < 0).sum()
        warnings.append(f"Found {count} rows with negative VOL. Check data source.")

    # 5. SHROUT Positivity
    # Shares outstanding must be positive for a traded stock.
    shrout_numeric = pd.to_numeric(df["SHROUT"], errors='coerce')
    # Check strictly negative or zero (where non-missing)
    invalid_shrout = (shrout_numeric <= 0)
    if invalid_shrout.any():
        count = invalid_shrout.sum()
        warnings.append(f"Found {count} rows with SHROUT <= 0. Turnover will be undefined.")

    return warnings

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 3: Report DataFrame shape and missingness summary.
# -------------------------------------------------------------------------------------------------------------------------------
def _summarize_dataframe_state(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Computes summary statistics of the DataFrame to serve as an audit record
    before cleansing.

    Parameters
    ----------
    df : pd.DataFrame
        The CRSP Daily Stock File.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing shape, date ranges, and missingness fractions.
    """
    # 1. Basic Dimensions
    n_rows = len(df)
    n_permnos = df["PERMNO"].nunique()
    min_date = df["DATE"].min()
    max_date = df["DATE"].max()

    # 2. Missingness Fractions
    # We care about missingness in the critical signal columns.
    # Note: We check raw missingness (NaN), not CRSP error codes yet.
    missing_stats = {}
    for col in ["RET", "PRC", "VOL", "SHROUT"]:
        # For object columns, we might want to count non-coercible as missing too,
        # but strictly speaking, NaN is the structural missingness.
        # Task 4 handles the coercion-to-NaN.
        missing_count = df[col].isna().sum()
        missing_stats[f"pct_missing_{col}"] = missing_count / n_rows if n_rows > 0 else 0.0

    # 3. Universe Eligibility Preview
    # Fraction of rows that are NOT common shares (10, 11) or NOT major exchange (1, 2, 3)
    # This helps anticipate how much data will be dropped in Task 6.
    # We handle potential non-numeric types by coercing for the check.
    shrcd_numeric = pd.to_numeric(df["SHRCD"], errors='coerce')
    exchcd_numeric = pd.to_numeric(df["EXCHCD"], errors='coerce')

    ineligible_shrcd = (~shrcd_numeric.isin([10, 11])).sum()
    ineligible_exchcd = (~exchcd_numeric.isin([1, 2, 3])).sum()

    return {
        "n_rows": n_rows,
        "n_unique_permnos": n_permnos,
        "min_date": str(min_date),
        "max_date": str(max_date),
        "missingness": missing_stats,
        "universe_preview": {
            "pct_ineligible_shrcd": ineligible_shrcd / n_rows if n_rows > 0 else 0.0,
            "pct_ineligible_exchcd": ineligible_exchcd / n_rows if n_rows > 0 else 0.0
        }
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def validate_crsp_daily(
    df_crsp_daily: pd.DataFrame,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the validation of the CRSP Daily Stock File. It verifies schema
    compliance, checks for data plausibility, and generates a summary audit report.
    This function does NOT modify the input DataFrame.

    Parameters
    ----------
    df_crsp_daily : pd.DataFrame
        The raw CRSP Daily Stock File.
    study_config : Dict[str, Any]
        The full study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        A validation report containing 'status' (str), 'warnings' (List[str]),
        and 'summary_stats' (Dict).

    Raises
    ------
    ValueError
        If schema validation fails (missing columns).
    TypeError
        If column types are fundamentally incompatible.
    """
    # Extract relevant schema config
    schema = study_config["raw_data_schema"]["crsp_daily_stock_file"]

    # 1. Schema and Type Validation (Raises on failure)
    _check_column_presence_and_types(df_crsp_daily, schema)

    # 2. Plausibility Checks (Returns warnings)
    warnings = _check_value_plausibility(df_crsp_daily)

    # 3. Summary Statistics (Audit log)
    summary_stats = _summarize_dataframe_state(df_crsp_daily)

    # Log warnings if any
    if warnings:
        print("CRSP Daily Data Warnings:")
        for w in warnings:
            print(f"  - {w}")

    return {
        "status": "PASSED",
        "warnings": warnings,
        "summary_stats": summary_stats
    }


In [None]:
# Task 3 — Validate df_crsp_index schema, types, and temporal coverage

# ==============================================================================
# Task 3: Validate df_crsp_index schema, types, and temporal coverage
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 1: Verify required columns and types.
# -------------------------------------------------------------------------------------------------------------------------------
def _check_index_schema(
    df_index: pd.DataFrame,
    schema_config: Dict[str, Any]
) -> None:
    """
    Validates that the CRSP Index DataFrame contains the required columns and types.

    Parameters
    ----------
    df_index : pd.DataFrame
        The CRSP Daily Index File.
    schema_config : Dict[str, Any]
        The 'crsp_daily_index_file' section of the study configuration.

    Raises
    ------
    ValueError
        If required columns are missing.
    TypeError
        If column types are incompatible.
    """
    required_cols = schema_config["required_columns"]
    missing = [c for c in required_cols if c not in df_index.columns]

    if missing:
        raise ValueError(f"CRSP Index validation failed. Missing columns: {missing}")

    # 1. Validate DATE
    if not is_datetime64_any_dtype(df_index["DATE"]):
        raise TypeError(
            f"Index 'DATE' must be datetime64[ns], got {df_index['DATE'].dtype}."
        )

    # 2. Validate vwretd (Value-Weighted Return)
    # Must be numeric. If object, check coercibility.
    if not is_numeric_dtype(df_index["vwretd"]):
        try:
            pd.to_numeric(df_index["vwretd"], errors='raise')
        except Exception:
            raise TypeError(
                f"Index 'vwretd' must be numeric, got {df_index['vwretd'].dtype}."
            )

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 2: Verify temporal coverage relative to df_crsp_daily.
# -------------------------------------------------------------------------------------------------------------------------------
def _check_temporal_coverage(
    df_micro: pd.DataFrame,
    df_index: pd.DataFrame
) -> Dict[str, Any]:
    """
    Verifies that the index data covers the full temporal span of the micro data.
    Identifies any trading days present in the micro data but missing from the index.

    Parameters
    ----------
    df_micro : pd.DataFrame
        The CRSP Daily Stock File.
    df_index : pd.DataFrame
        The CRSP Daily Index File.

    Returns
    -------
    Dict[str, Any]
        A coverage report containing date ranges and missing date counts.
    """
    # Normalize to date-only (midnight) to ensure fair comparison
    micro_dates = pd.to_datetime(df_micro["DATE"]).dt.normalize().unique()
    index_dates = pd.to_datetime(df_index["DATE"]).dt.normalize().unique()

    micro_min, micro_max = micro_dates.min(), micro_dates.max()
    index_min, index_max = index_dates.min(), index_dates.max()

    # Check span
    span_warning = []
    if index_min > micro_min:
        span_warning.append(f"Index starts after micro data ({index_min} > {micro_min})")
    if index_max < micro_max:
        span_warning.append(f"Index ends before micro data ({index_max} < {micro_max})")

    # Check for specific gaps (micro dates missing from index)
    # We use set difference for efficiency
    missing_dates = np.setdiff1d(micro_dates, index_dates)
    n_missing = len(missing_dates)

    # Check if missing dates are weekdays (more serious)
    missing_weekdays = 0
    if n_missing > 0:
        missing_dt = pd.to_datetime(missing_dates)
        # dayofweek: 0=Mon, 4=Fri, 5=Sat, 6=Sun
        missing_weekdays = (missing_dt.dayofweek < 5).sum()

    return {
        "micro_range": (str(micro_min), str(micro_max)),
        "index_range": (str(index_min), str(index_max)),
        "span_warnings": span_warning,
        "missing_dates_count": n_missing,
        "missing_weekdays_count": missing_weekdays,
        "missing_dates_sample": [str(d) for d in missing_dates[:5]] if n_missing > 0 else []
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 3: Verify plausibility of vwretd values.
# -------------------------------------------------------------------------------------------------------------------------------
def _check_index_plausibility(df_index: pd.DataFrame) -> List[str]:
    """
    Checks if index returns are in decimal units and within plausible historical bounds.

    Parameters
    ----------
    df_index : pd.DataFrame
        The CRSP Daily Index File.

    Returns
    -------
    List[str]
        A list of warning messages.
    """
    warnings = []

    # Coerce to numeric for checking
    vwretd = pd.to_numeric(df_index["vwretd"], errors='coerce')

    # 1. Unit Check (Decimal vs Percent)
    # If median absolute return > 0.5, it's likely percent (e.g., 1.0 = 1%).
    # Real daily returns are typically ~0.00-0.02.
    median_abs = vwretd.abs().median()
    if median_abs > 0.10: # Threshold: 10% median daily move is impossible
        warnings.append(
            f"CRITICAL: Median absolute index return is {median_abs:.4f}. "
            "Data appears to be in PERCENT, not DECIMAL. Pipeline requires decimal."
        )

    # 2. Extreme Value Check
    # Max daily drop in history (1987) was ~22%.
    # We flag anything > 25% magnitude as suspicious.
    max_abs = vwretd.abs().max()
    if max_abs > 0.25:
        warnings.append(
            f"Warning: Max absolute index return is {max_abs:.4f} (> 0.25). "
            "Verify if this is a valid crash (e.g. 1987) or data error."
        )

    return warnings

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def validate_crsp_index(
    df_crsp_daily: pd.DataFrame,
    df_crsp_index: pd.DataFrame,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the validation of the CRSP Daily Index File. Verifies schema,
    temporal alignment with micro data, and value plausibility.

    Parameters
    ----------
    df_crsp_daily : pd.DataFrame
        The CRSP Daily Stock File (for temporal comparison).
    df_crsp_index : pd.DataFrame
        The CRSP Daily Index File.
    study_config : Dict[str, Any]
        The study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        A validation report.

    Raises
    ------
    ValueError
        If schema validation fails.
    """
    schema = study_config["raw_data_schema"]["crsp_daily_index_file"]

    # 1. Schema Validation
    _check_index_schema(df_crsp_index, schema)

    # 2. Temporal Coverage
    coverage_report = _check_temporal_coverage(df_crsp_daily, df_crsp_index)

    # 3. Plausibility
    plausibility_warnings = _check_index_plausibility(df_crsp_index)

    # Combine warnings
    all_warnings = coverage_report["span_warnings"] + plausibility_warnings

    # Log warnings
    if all_warnings:
        print("CRSP Index Data Warnings:")
        for w in all_warnings:
            print(f"  - {w}")

    if coverage_report["missing_weekdays_count"] > 0:
        print(f"  - CRITICAL: {coverage_report['missing_weekdays_count']} weekdays missing from index.")

    return {
        "status": "PASSED",
        "coverage": coverage_report,
        "warnings": all_warnings
    }


In [None]:
# Task 4 — Cleanse df_crsp_daily (micro data)

# ==============================================================================
# Task 4: Cleanse df_crsp_daily (micro data)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 1: Resolve duplicates, coerce types, and handle CRSP signed-price convention.
# -------------------------------------------------------------------------------------------------------------------------------
def _deduplicate_and_coerce(
    df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Performs deterministic deduplication and type coercion on the CRSP Daily file.
    Handles the CRSP signed-price convention by creating an absolute price column.

    Parameters
    ----------
    df : pd.DataFrame
        Raw CRSP Daily DataFrame.

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, int]]
        - Cleansed DataFrame with numeric columns and 'abs_prc'.
        - Dictionary of cleansing statistics (duplicates dropped, coercion NaNs).
    """
    # 1. Deterministic Deduplication
    # Sort by PERMNO, DATE to ensure 'keep="last"' is deterministic relative to file order
    # (assuming file order has some meaning, otherwise this is at least reproducible).
    # We use a stable sort.
    df_sorted = df.sort_values(by=["PERMNO", "DATE"], kind="stable")

    initial_count = len(df_sorted)
    # Drop exact duplicates first
    df_dedup = df_sorted.drop_duplicates()
    exact_dupes = initial_count - len(df_dedup)

    # Drop duplicates on primary key (PERMNO, DATE), keeping last
    df_dedup = df_dedup.drop_duplicates(subset=["PERMNO", "DATE"], keep="last")
    key_dupes = (initial_count - exact_dupes) - len(df_dedup)

    # 2. Type Coercion
    # Coerce critical columns to numeric, turning errors (e.g. 'C') into NaN
    coercion_stats = {}
    for col in ["RET", "PRC", "VOL", "SHROUT"]:
        # Track pre-existing NaNs to distinguish coercion effects
        pre_nans = df_dedup[col].isna().sum()

        # Coerce
        df_dedup[col] = pd.to_numeric(df_dedup[col], errors="coerce")

        post_nans = df_dedup[col].isna().sum()
        coercion_stats[f"nans_induced_{col}"] = int(post_nans - pre_nans)

    # 3. Handle Signed Price Convention
    # CRSP reports negative prices if they are bid/ask averages.
    # We need absolute price for filters and dollar volume.
    df_dedup["abs_prc"] = df_dedup["PRC"].abs()

    stats = {
        "exact_duplicates_dropped": int(exact_dupes),
        "key_duplicates_dropped": int(key_dupes),
        **coercion_stats
    }

    return df_dedup, stats

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 2: Drop rows that violate non-missing requirements.
# -------------------------------------------------------------------------------------------------------------------------------
def _enforce_non_missing_requirements(
    df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Drops rows where Returns or Prices are missing, as required by the paper's
    methodology. Retains rows with missing Volume/SharesOut for return-based signals.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame from Step 1.

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, int]]
        - Filtered DataFrame.
        - Dictionary of drop counts.
    """
    initial_count = len(df)

    # 1. Drop missing RET
    # Paper: "non-missing returns"
    df_ret_valid = df.dropna(subset=["RET"])
    dropped_ret = initial_count - len(df_ret_valid)

    # 2. Drop missing PRC (abs_prc)
    # Paper: "non-missing... prices" (implied by price filter and universe definition)
    df_valid = df_ret_valid.dropna(subset=["abs_prc"])
    dropped_prc = len(df_ret_valid) - len(df_valid)

    stats = {
        "rows_dropped_missing_ret": int(dropped_ret),
        "rows_dropped_missing_prc": int(dropped_prc),
        "final_row_count": len(df_valid)
    }

    return df_valid, stats

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 3: Normalize DATE and create auxiliary temporal columns.
# -------------------------------------------------------------------------------------------------------------------------------
def _normalize_dates_and_create_month_key(
    df: pd.DataFrame,
    month_index_type: str = "period_M"
) -> pd.DataFrame:
    """
    Normalizes dates to midnight and creates a canonical 'year_month' column
    for aggregation.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame from Step 2.
    month_index_type : str
        Configuration for month key type. Currently supports 'period_M' (pandas Period).

    Returns
    -------
    pd.DataFrame
        DataFrame with normalized 'DATE' and new 'year_month' column.
    """
    # 1. Normalize DATE to midnight (remove time component)
    # Ensure it's datetime first (Task 2 validated this, but safe to enforce)
    df["DATE"] = pd.to_datetime(df["DATE"]).dt.normalize()

    # 2. Create Canonical Month Key
    # This logic must be identical to Task 5 (Index cleansing)
    if month_index_type == "period_M":
        df["year_month"] = df["DATE"].dt.to_period("M")
    else:
        # Fallback or other types could be implemented here
        raise ValueError(f"Unsupported month_index_type: {month_index_type}")

    return df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def cleanse_crsp_daily(
    df_crsp_daily: pd.DataFrame,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the cleansing of the CRSP Daily Stock File.
    1. Deduplicates and coerces types.
    2. Enforces non-missing return/price requirements.
    3. Normalizes dates and creates month keys.

    Parameters
    ----------
    df_crsp_daily : pd.DataFrame
        Raw CRSP Daily DataFrame.
    study_config : Dict[str, Any]
        Study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        - 'df_clean': The cleansed DataFrame.
        - 'audit_log': Dictionary of cleansing statistics.
    """
    # Extract config for month type
    month_type = study_config["feature_engineering"].get("month_index_type", "period_M")

    # Step 1: Deduplicate and Coerce
    df_step1, stats_step1 = _deduplicate_and_coerce(df_crsp_daily)

    # Step 2: Enforce Non-Missing
    df_step2, stats_step2 = _enforce_non_missing_requirements(df_step1)

    # Step 3: Temporal Normalization
    df_final = _normalize_dates_and_create_month_key(df_step2, month_index_type=month_type)

    # Compile Audit Log
    audit_log = {
        **stats_step1,
        **stats_step2,
        "month_index_type_used": month_type
    }

    return {
        "df_clean": df_final,
        "audit_log": audit_log
    }


In [None]:
# Task 5 — Cleanse df_crsp_index and enforce cross-dataset calendar alignment

# ==============================================================================
# Task 5: Cleanse df_crsp_index and enforce cross-dataset calendar alignment
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 1: Resolve duplicates and coerce types.
# -------------------------------------------------------------------------------------------------------------------------------
def _deduplicate_and_coerce_index(
    df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Cleanses the CRSP Index DataFrame by removing duplicates and enforcing numeric types.

    Parameters
    ----------
    df : pd.DataFrame
        Raw CRSP Index DataFrame.

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, int]]
        - Cleansed DataFrame.
        - Dictionary of cleansing statistics.
    """
    initial_count = len(df)

    # 1. Deterministic Deduplication
    # Sort by DATE to ensure deterministic 'keep="last"'
    df_sorted = df.sort_values(by="DATE", kind="stable")

    # Drop exact duplicates
    df_dedup = df_sorted.drop_duplicates()
    exact_dupes = initial_count - len(df_dedup)

    # Drop duplicate dates (keep last update)
    df_dedup = df_dedup.drop_duplicates(subset=["DATE"], keep="last")
    date_dupes = (initial_count - exact_dupes) - len(df_dedup)

    # 2. Type Coercion
    # Coerce vwretd to numeric
    pre_nans = df_dedup["vwretd"].isna().sum()
    df_dedup["vwretd"] = pd.to_numeric(df_dedup["vwretd"], errors="coerce")
    post_nans = df_dedup["vwretd"].isna().sum()
    coercion_nans = int(post_nans - pre_nans)

    # 3. Drop Missing vwretd
    # Cannot compute market return or volatility without it
    df_valid = df_dedup.dropna(subset=["vwretd"])
    dropped_missing = len(df_dedup) - len(df_valid)

    stats = {
        "index_exact_duplicates_dropped": int(exact_dupes),
        "index_date_duplicates_dropped": int(date_dupes),
        "index_vwretd_coercion_nans": coercion_nans,
        "index_rows_dropped_missing_vwretd": int(dropped_missing)
    }

    return df_valid, stats

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 2: Normalize DATE and create year_month column.
# -------------------------------------------------------------------------------------------------------------------------------
def _normalize_index_dates(
    df: pd.DataFrame,
    month_index_type: str = "period_M"
) -> pd.DataFrame:
    """
    Normalizes index dates and creates the canonical month key, matching the logic
    used for micro data.

    Parameters
    ----------
    df : pd.DataFrame
        Cleansed Index DataFrame.
    month_index_type : str
        Configuration for month key type.

    Returns
    -------
    pd.DataFrame
        DataFrame with 'year_month' column.
    """
    # Normalize to midnight
    df["DATE"] = pd.to_datetime(df["DATE"]).dt.normalize()

    # Create Month Key
    if month_index_type == "period_M":
        df["year_month"] = df["DATE"].dt.to_period("M")
    else:
        raise ValueError(f"Unsupported month_index_type: {month_index_type}")

    return df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 3: Verify alignment and log coverage statistics.
# -------------------------------------------------------------------------------------------------------------------------------
def _verify_calendar_alignment(
    df_micro: pd.DataFrame,
    df_index: pd.DataFrame
) -> Dict[str, Any]:
    """
    Verifies that every month present in the micro data has corresponding index data.
    This is a fatal requirement for computing stress labels.

    Parameters
    ----------
    df_micro : pd.DataFrame
        Cleansed Micro DataFrame (from Task 4).
    df_index : pd.DataFrame
        Cleansed Index DataFrame.

    Returns
    -------
    Dict[str, Any]
        Alignment report.

    Raises
    ------
    ValueError
        If micro data contains months missing from the index data.
    """
    # Get unique months
    micro_months = set(df_micro["year_month"].unique())
    index_months = set(df_index["year_month"].unique())

    # Check subset condition
    missing_months = sorted(list(micro_months - index_months))

    if missing_months:
        # Fatal error: We cannot label stress for these months
        raise ValueError(
            f"CRITICAL ALIGNMENT FAILURE: {len(missing_months)} months exist in "
            f"micro data but are missing from index data. Examples: {missing_months[:5]}"
        )

    # Compute trading days per month
    micro_counts = df_micro.groupby("year_month")["DATE"].nunique()
    index_counts = df_index.groupby("year_month")["DATE"].nunique()

    # Compare counts for overlapping months
    # We expect them to be identical or very close (index might have fewer holidays?)
    # Actually, index usually has *more* days if micro filters drop low-activity days,
    # but here we count *potential* trading days.
    # We just log the stats.
    alignment_stats = {
        "n_micro_months": len(micro_months),
        "n_index_months": len(index_months),
        "months_aligned": True
    }

    return alignment_stats

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def cleanse_crsp_index(
    df_crsp_daily_clean: pd.DataFrame,
    df_crsp_index: pd.DataFrame,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the cleansing of the CRSP Index File and enforces alignment with
    the already-cleansed micro data.

    Parameters
    ----------
    df_crsp_daily_clean : pd.DataFrame
        Cleansed Micro DataFrame (from Task 4).
    df_crsp_index : pd.DataFrame
        Raw CRSP Index DataFrame.
    study_config : Dict[str, Any]
        Study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        - 'df_index_clean': Cleansed Index DataFrame.
        - 'audit_log': Cleansing and alignment statistics.
    """
    month_type = study_config["feature_engineering"].get("month_index_type", "period_M")

    # Step 1: Deduplicate and Coerce
    df_step1, stats_step1 = _deduplicate_and_coerce_index(df_crsp_index)

    # Step 2: Temporal Normalization
    df_final = _normalize_index_dates(df_step1, month_index_type=month_type)

    # Step 3: Verify Alignment
    # This raises ValueError if alignment fails
    alignment_stats = _verify_calendar_alignment(df_crsp_daily_clean, df_final)

    audit_log = {
        **stats_step1,
        **alignment_stats,
        "month_index_type_used": month_type
    }

    return {
        "df_index_clean": df_final,
        "audit_log": audit_log
    }


In [None]:
# Task 6 — Construct the eligible daily universe (apply all paper filters)

# ==============================================================================
# Task 6: Construct the eligible daily universe (apply all paper filters)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 1: Apply share-code and exchange-code filters.
# -------------------------------------------------------------------------------------------------------------------------------
def _apply_universe_filters(
    df: pd.DataFrame,
    universe_config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Applies share code and exchange code filters to define the eligible universe.

    Parameters
    ----------
    df : pd.DataFrame
        Cleansed CRSP Daily DataFrame.
    universe_config : Dict[str, Any]
        Configuration containing valid share and exchange codes.

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, int]]
        - Filtered DataFrame.
        - Dictionary of drop statistics.
    """
    initial_count = len(df)

    # Extract valid codes
    valid_shrcd = universe_config["valid_share_codes"]
    valid_exchcd = universe_config["valid_exch_codes"]

    # Apply Filters
    # 1. Share Code Filter (Ordinary Common Shares: 10, 11)
    mask_shrcd = df["SHRCD"].isin(valid_shrcd)
    df_shrcd = df[mask_shrcd]
    dropped_shrcd = initial_count - len(df_shrcd)

    # 2. Exchange Code Filter (NYSE/AMEX/NASDAQ: 1, 2, 3)
    mask_exchcd = df_shrcd["EXCHCD"].isin(valid_exchcd)
    df_exchcd = df_shrcd[mask_exchcd]
    dropped_exchcd = len(df_shrcd) - len(df_exchcd)

    stats = {
        "rows_dropped_shrcd_filter": int(dropped_shrcd),
        "rows_dropped_exchcd_filter": int(dropped_exchcd)
    }

    return df_exchcd, stats

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 2: Apply the penny-stock (microstructure) price filter.
# -------------------------------------------------------------------------------------------------------------------------------
def _apply_price_filter(
    df: pd.DataFrame,
    universe_config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Applies the minimum price filter to remove penny stocks.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame filtered by share/exchange codes.
    universe_config : Dict[str, Any]
        Configuration containing the minimum price threshold.

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, int]]
        - Filtered DataFrame.
        - Dictionary of drop statistics.
    """
    initial_count = len(df)
    min_price = universe_config["min_abs_price_usd"]

    # Apply Price Filter (|PRC| >= 1.00)
    # We use 'abs_prc' which was created in Task 4
    mask_price = df["abs_prc"] >= min_price
    df_price = df[mask_price]
    dropped_price = initial_count - len(df_price)

    stats = {
        "rows_dropped_price_filter": int(dropped_price)
    }

    return df_price, stats

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 3: Define daily cross-section size Nd and produce the final eligible panel.
# -------------------------------------------------------------------------------------------------------------------------------
def _finalize_universe_and_compute_Nd(
    df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.Series]:
    """
    Computes the daily cross-section size (Nd) and prepares the final eligible panel.

    Parameters
    ----------
    df : pd.DataFrame
        Fully filtered DataFrame.

    Returns
    -------
    Tuple[pd.DataFrame, pd.Series]
        - Final eligible panel with selected columns.
        - Series of Nd (number of eligible stocks) indexed by DATE.
    """
    # Compute Nd: Count of unique PERMNOs per DATE
    # Since we deduplicated in Task 4, simple count is sufficient, but nunique is safer
    nd_series = df.groupby("DATE")["PERMNO"].nunique().rename("n_stocks")

    # Select only necessary columns for downstream tasks to optimize memory
    # We need: DATE, year_month, PERMNO, RET, abs_prc, VOL, SHROUT
    cols_to_keep = ["DATE", "year_month", "PERMNO", "RET", "abs_prc", "VOL", "SHROUT"]
    df_final = df[cols_to_keep].copy()

    # Ensure sorted by DATE, PERMNO for efficient grouping later
    df_final = df_final.sort_values(by=["DATE", "PERMNO"])

    return df_final, nd_series

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def construct_eligible_universe(
    df_crsp_daily_clean: pd.DataFrame,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the construction of the eligible daily universe by applying
    share code, exchange code, and price filters.

    Parameters
    ----------
    df_crsp_daily_clean : pd.DataFrame
        Cleansed CRSP Daily DataFrame (from Task 4).
    study_config : Dict[str, Any]
        Study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        - 'df_eligible': Final eligible daily panel.
        - 'nd_series': Daily cross-section size Series.
        - 'audit_log': Filtering statistics.
    """
    universe_config = study_config["universe_construction"]

    # Step 1: Share/Exchange Code Filters
    df_step1, stats_step1 = _apply_universe_filters(df_crsp_daily_clean, universe_config)

    # Step 2: Price Filter
    df_step2, stats_step2 = _apply_price_filter(df_step1, universe_config)

    # Step 3: Finalize and Compute Nd
    df_final, nd_series = _finalize_universe_and_compute_Nd(df_step2)

    audit_log = {
        **stats_step1,
        **stats_step2,
        "final_universe_row_count": len(df_final),
        "final_universe_unique_dates": len(nd_series)
    }

    return {
        "df_eligible": df_final,
        "nd_series": nd_series,
        "audit_log": audit_log
    }


In [None]:
# Task 7 — Compute daily cross-sectional return distribution moments

# ==============================================================================
# Task 7: Compute daily cross-sectional return distribution moments
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 1: Compute daily cross-sectional mean and dispersion.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_mean_and_dispersion(
    df: pd.DataFrame
) -> pd.DataFrame:
    """
    Computes the daily cross-sectional mean return and population standard deviation.

    Parameters
    ----------
    df : pd.DataFrame
        Eligible daily panel.

    Returns
    -------
    pd.DataFrame
        DataFrame indexed by DATE with columns 'mean_ret' and 'sigma_xs'.
    """
    # Group by DATE
    grouped = df.groupby("DATE")["RET"]

    # Compute Mean
    # Equation: \bar{r}_d = (1/N_d) * sum(r_{i,d})
    mean_ret = grouped.mean().rename("mean_ret")

    # Compute Population Standard Deviation (ddof=0)
    # Equation: sigma^{xs}_d = sqrt((1/N_d) * sum((r_{i,d} - \bar{r}_d)^2))
    sigma_xs = grouped.std(ddof=0).rename("sigma_xs")

    # Combine into DataFrame
    return pd.concat([mean_ret, sigma_xs], axis=1)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 2: Compute daily cross-sectional skewness.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_skewness(
    df: pd.DataFrame,
    daily_stats: pd.DataFrame,
    epsilon: float = 1e-8
) -> pd.Series:
    """
    Computes the daily cross-sectional skewness using the population formula.
    Handles cases with near-zero dispersion by setting skewness to NaN.

    Parameters
    ----------
    df : pd.DataFrame
        Eligible daily panel.
    daily_stats : pd.DataFrame
        DataFrame containing 'mean_ret' and 'sigma_xs'.
    epsilon : float
        Threshold for zero dispersion.

    Returns
    -------
    pd.Series
        Series of daily skewness indexed by DATE.
    """
    # Merge daily mean back to panel to center returns
    # We use the index (DATE) for merging
    df_merged = df.join(daily_stats["mean_ret"], on="DATE")

    # Compute centered returns
    df_merged["centered_ret"] = df_merged["RET"] - df_merged["mean_ret"]

    # Compute 3rd central moment: mean((r - r_bar)^3)
    # Note: groupby().mean() divides by N_d, which matches the population formula 1/N_d
    m3 = df_merged.groupby("DATE")["centered_ret"].apply(lambda x: (x**3).mean())

    # Get sigma_xs aligned
    sigma = daily_stats["sigma_xs"]

    # Compute Skewness
    # Equation: Skew = m3 / sigma^3
    # Handle division by zero
    valid_sigma = sigma > epsilon
    skew_xs = pd.Series(np.nan, index=sigma.index, name="Skew_xs")
    skew_xs[valid_sigma] = m3[valid_sigma] / (sigma[valid_sigma] ** 3)

    return skew_xs

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 3: Compute daily cross-sectional kurtosis.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_kurtosis(
    df: pd.DataFrame,
    daily_stats: pd.DataFrame,
    epsilon: float = 1e-8
) -> pd.Series:
    """
    Computes the daily cross-sectional raw kurtosis using the population formula.
    Handles cases with near-zero dispersion by setting kurtosis to NaN.

    Parameters
    ----------
    df : pd.DataFrame
        Eligible daily panel.
    daily_stats : pd.DataFrame
        DataFrame containing 'mean_ret' and 'sigma_xs'.
    epsilon : float
        Threshold for zero dispersion.

    Returns
    -------
    pd.Series
        Series of daily kurtosis indexed by DATE.
    """
    # Merge daily mean back (or reuse if we passed the merged frame, but for modularity we re-merge or assume logic holds)
    # Optimization: In orchestrator, we can pass the centered returns, but here we stick to the interface.
    df_merged = df.join(daily_stats["mean_ret"], on="DATE")
    df_merged["centered_ret"] = df_merged["RET"] - df_merged["mean_ret"]

    # Compute 4th central moment: mean((r - r_bar)^4)
    m4 = df_merged.groupby("DATE")["centered_ret"].apply(lambda x: (x**4).mean())

    # Get sigma_xs aligned
    sigma = daily_stats["sigma_xs"]

    # Compute Kurtosis (Raw)
    # Equation: Kurt = m4 / sigma^4
    valid_sigma = sigma > epsilon
    kurt_xs = pd.Series(np.nan, index=sigma.index, name="Kurt_xs")
    kurt_xs[valid_sigma] = m4[valid_sigma] / (sigma[valid_sigma] ** 4)

    return kurt_xs

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def compute_daily_return_moments(
    df_eligible: pd.DataFrame,
    study_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the computation of daily cross-sectional return moments:
    Mean, Dispersion (Sigma), Skewness, and Kurtosis.

    Parameters
    ----------
    df_eligible : pd.DataFrame
        The eligible daily stock panel.
    study_config : Dict[str, Any]
        Study configuration dictionary.

    Returns
    -------
    pd.DataFrame
        DataFrame indexed by DATE containing the computed moments.
    """
    epsilon = study_config["feature_engineering"].get("std_epsilon", 1e-8)

    # Step 1: Mean and Dispersion
    daily_stats = _compute_mean_and_dispersion(df_eligible)

    # Step 2: Skewness
    skew_xs = _compute_skewness(df_eligible, daily_stats, epsilon)

    # Step 3: Kurtosis
    kurt_xs = _compute_kurtosis(df_eligible, daily_stats, epsilon)

    # Combine all moments
    final_stats = pd.concat([daily_stats, skew_xs, kurt_xs], axis=1)

    return final_stats


In [None]:
# Task 8 — Compute daily tail participation shares and mean absolute return

# ==============================================================================
# Task 8: Compute daily tail participation shares and mean absolute return
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 1: Compute mean absolute return.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_mean_absolute_return(
    df: pd.DataFrame
) -> pd.Series:
    """
    Computes the daily cross-sectional mean absolute return.

    Parameters
    ----------
    df : pd.DataFrame
        Eligible daily panel.

    Returns
    -------
    pd.Series
        Series of mean absolute returns indexed by DATE.
    """
    # Equation: \overline{|r|}_d = (1/N_d) * sum(|r_{i,d}|)
    return df["RET"].abs().groupby(df["DATE"]).mean().rename("mean_abs_ret")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 2: Compute fraction of extreme losers.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_frac_down(
    df: pd.DataFrame,
    tau: float
) -> pd.Series:
    """
    Computes the daily fraction of stocks with returns <= -tau.

    Parameters
    ----------
    df : pd.DataFrame
        Eligible daily panel.
    tau : float
        Tail threshold (e.g., 0.05).

    Returns
    -------
    pd.Series
        Series of downside tail fractions indexed by DATE.
    """
    # Equation: Frac^{dn}_d(tau) = (1/N_d) * sum(I{r_{i,d} <= -tau})
    # Note: Inequality is inclusive per paper definition
    is_down = df["RET"] <= -tau
    return is_down.groupby(df["DATE"]).mean().rename(f"Frac_dn_{int(tau*100)}pct")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 3: Compute fraction of extreme winners.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_frac_up(
    df: pd.DataFrame,
    tau: float
) -> pd.Series:
    """
    Computes the daily fraction of stocks with returns >= tau.

    Parameters
    ----------
    df : pd.DataFrame
        Eligible daily panel.
    tau : float
        Tail threshold (e.g., 0.05).

    Returns
    -------
    pd.Series
        Series of upside tail fractions indexed by DATE.
    """
    # Equation: Frac^{up}_d(tau) = (1/N_d) * sum(I{r_{i,d} >= tau})
    is_up = df["RET"] >= tau
    return is_up.groupby(df["DATE"]).mean().rename(f"Frac_up_{int(tau*100)}pct")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def compute_daily_tail_measures(
    df_eligible: pd.DataFrame,
    study_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the computation of daily tail risk measures and mean absolute return.

    Parameters
    ----------
    df_eligible : pd.DataFrame
        The eligible daily stock panel.
    study_config : Dict[str, Any]
        Study configuration dictionary.

    Returns
    -------
    pd.DataFrame
        DataFrame indexed by DATE containing the computed tail measures.
    """
    tau = study_config["feature_engineering"]["tail_threshold_tau"]

    # Step 1: Mean Absolute Return
    mean_abs_ret = _compute_mean_absolute_return(df_eligible)

    # Step 2: Downside Tail Fraction
    frac_dn = _compute_frac_down(df_eligible, tau)

    # Step 3: Upside Tail Fraction
    frac_up = _compute_frac_up(df_eligible, tau)

    # Combine
    final_tails = pd.concat([mean_abs_ret, frac_dn, frac_up], axis=1)

    return final_tails


In [None]:
# Task 9 — Compute daily trading-intensity proxies

# ==============================================================================
# Task 9: Compute daily trading-intensity proxies
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 1: Compute cross-sectional average log volume.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_log_volume(
    df: pd.DataFrame,
    offset: float = 1.0
) -> Tuple[pd.Series, pd.Series]:
    """
    Computes the daily cross-sectional average of log(1 + Volume).

    Parameters
    ----------
    df : pd.DataFrame
        Eligible daily panel.
    offset : float
        Offset for log transformation (default 1.0).

    Returns
    -------
    Tuple[pd.Series, pd.Series]
        - Series of mean log volume indexed by DATE.
        - Series of valid volume counts (N_vol) indexed by DATE.
    """
    # Filter valid volume
    valid_vol = df["VOL"].dropna()
    valid_vol = valid_vol[valid_vol >= 0]

    # Compute log transform
    log_vol = np.log(offset + valid_vol)

    # Group by DATE (using the index of the filtered series which aligns with df)
    grouped = log_vol.groupby(df.loc[log_vol.index, "DATE"])

    mean_log_vol = grouped.mean().rename("mean_log1p_vol")
    n_vol = grouped.count().rename("N_vol")

    return mean_log_vol, n_vol

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 2: Compute cross-sectional average dollar volume.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_dollar_volume(
    df: pd.DataFrame
) -> Tuple[pd.Series, pd.Series]:
    """
    Computes the daily cross-sectional average dollar volume (|Price| * Volume).

    Parameters
    ----------
    df : pd.DataFrame
        Eligible daily panel.

    Returns
    -------
    Tuple[pd.Series, pd.Series]
        - Series of mean dollar volume indexed by DATE.
        - Series of valid dollar volume counts (N_dol) indexed by DATE.
    """
    # Compute dollar volume
    # We use abs_prc which is guaranteed non-negative
    # We must ensure both VOL and abs_prc are valid
    valid_mask = df["VOL"].notna() & (df["VOL"] >= 0) & df["abs_prc"].notna()
    df_valid = df[valid_mask].copy()

    df_valid["dollar_vol"] = df_valid["abs_prc"] * df_valid["VOL"]

    grouped = df_valid.groupby("DATE")["dollar_vol"]

    mean_dol_vol = grouped.mean().rename("mean_dollar_vol")
    n_dol = grouped.count().rename("N_dol")

    return mean_dol_vol, n_dol

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 3: Compute cross-sectional average turnover.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_turnover(
    df: pd.DataFrame,
    shares_scaling: float = 1000.0
) -> Tuple[pd.Series, pd.Series]:
    """
    Computes the daily cross-sectional average turnover (Volume / SharesOutstanding).
    Handles CRSP unit conventions (SHROUT in thousands).

    Parameters
    ----------
    df : pd.DataFrame
        Eligible daily panel.
    shares_scaling : float
        Scaling factor for SHROUT (default 1000.0).

    Returns
    -------
    Tuple[pd.Series, pd.Series]
        - Series of mean turnover indexed by DATE.
        - Series of valid turnover counts (N_turn) indexed by DATE.
    """
    # Filter valid inputs
    # SHROUT must be positive for division
    valid_mask = (
        df["VOL"].notna() & (df["VOL"] >= 0) &
        df["SHROUT"].notna() & (df["SHROUT"] > 0)
    )
    df_valid = df[valid_mask].copy()

    # Compute Turnover
    # Equation: Turn = Vol / (1000 * SHROUT)
    df_valid["turnover"] = df_valid["VOL"] / (shares_scaling * df_valid["SHROUT"])

    # Filter infinite values if any (though SHROUT > 0 prevents div/0)
    df_valid = df_valid[np.isfinite(df_valid["turnover"])]

    grouped = df_valid.groupby("DATE")["turnover"]

    mean_turn = grouped.mean().rename("mean_turnover")
    n_turn = grouped.count().rename("N_turn")

    return mean_turn, n_turn

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def compute_daily_trading_proxies(
    df_eligible: pd.DataFrame,
    study_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the computation of daily trading intensity proxies:
    Log Volume, Dollar Volume, and Turnover.

    Parameters
    ----------
    df_eligible : pd.DataFrame
        The eligible daily stock panel.
    study_config : Dict[str, Any]
        Study configuration dictionary.

    Returns
    -------
    pd.DataFrame
        DataFrame indexed by DATE containing the computed proxies and coverage counts.
    """
    # Extract Key Parameters
    feat_config = study_config["feature_engineering"]
    log_offset = feat_config.get("log_volume_offset", 1.0)
    shares_scaling = feat_config["turnover"].get("shares_out_scaling", 1000.0)

    # Step 1: Log Volume
    mean_log_vol, n_vol = _compute_log_volume(df_eligible, log_offset)

    # Step 2: Dollar Volume
    mean_dol_vol, n_dol = _compute_dollar_volume(df_eligible)

    # Step 3: Turnover
    mean_turn, n_turn = _compute_turnover(df_eligible, shares_scaling)

    # Combine
    final_proxies = pd.concat([
        mean_log_vol, n_vol,
        mean_dol_vol, n_dol,
        mean_turn, n_turn
    ], axis=1)

    return final_proxies


In [None]:
# Task 10: Aggregate daily cross-sectional statistics to monthly predictors X_t

# ==============================================================================
# Task 10: Aggregate daily cross-sectional statistics to monthly predictors X_t
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 1: Apply the paper's within-month averaging operator to every daily statistic.
# -------------------------------------------------------------------------------------------------------------------------------
def _aggregate_daily_to_monthly(
    daily_stats: pd.DataFrame,
    nd_series: pd.Series
) -> Tuple[pd.DataFrame, pd.Series]:
    """
    Aggregates daily statistics to monthly frequency by computing the within-month mean.

    Parameters
    ----------
    daily_stats : pd.DataFrame
        DataFrame of daily statistics indexed by DATE.
    nd_series : pd.Series
        Series of daily cross-section sizes (Nd) indexed by DATE.

    Returns
    -------
    Tuple[pd.DataFrame, pd.Series]
        - Monthly features DataFrame indexed by year_month.
        - Series of trading days per month (Dt).
    """
    # Merge Nd into stats for aggregation
    df_all = daily_stats.join(nd_series, how="inner")

    # Ensure year_month column exists (it was created in Task 4/6 but might be lost if index was DATE)
    # We need to recover it from the DATE index
    if "year_month" not in df_all.columns:
        # Assuming DATE index is datetime
        df_all["year_month"] = df_all.index.to_period("M")

    # Group by year_month
    grouped = df_all.groupby("year_month")

    # Compute monthly means (Z_t = 1/D_t * sum(z_d))
    monthly_stats = grouped.mean()

    # Compute D_t (number of trading days)
    dt_series = grouped.size().rename("n_trading_days")

    return monthly_stats, dt_series

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 2: Assemble the monthly feature vector X_t with canonical column names.
# -------------------------------------------------------------------------------------------------------------------------------
def _format_monthly_features(
    monthly_stats: pd.DataFrame,
    feature_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Renames and reorders columns to match the canonical feature set X_t.

    Parameters
    ----------
    monthly_stats : pd.DataFrame
        Raw monthly aggregated statistics.
    feature_config : Dict[str, Any]
        Configuration containing the final feature names list.

    Returns
    -------
    pd.DataFrame
        Canonical X_t DataFrame.
    """
    # Define mapping from daily stat names to monthly feature names
    # This mapping must align with the column names produced in Tasks 6-9
    # Daily Name -> Monthly Name
    rename_map = {
        "n_stocks": "n_stocks",
        "sigma_xs": "sigma_xs",
        "Skew_xs": "Skew_xs",
        "Kurt_xs": "Kurt_xs",
        "mean_abs_ret": "mean_abs_ret",
        "Frac_dn_5pct": "Frac_dn_5pct",
        "Frac_up_5pct": "Frac_up_5pct",
        "mean_log1p_vol": "mean_log1p_vol",
        "mean_dollar_vol": "mean_dollar_vol",
        "mean_turnover": "mean_turnover"
    }

    # Rename
    df_renamed = monthly_stats.rename(columns=rename_map)

    # Select and Reorder
    final_cols = feature_config["final_feature_names_monthly"]

    # Check for missing columns
    missing = [c for c in final_cols if c not in df_renamed.columns]
    if missing:
        raise ValueError(f"Missing columns for X_t: {missing}")

    return df_renamed[final_cols]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 3: Drop months with insufficient daily coverage.
# -------------------------------------------------------------------------------------------------------------------------------
def _filter_insufficient_months(
    xt: pd.DataFrame,
    dt_series: pd.Series,
    min_days: int = 15
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Drops months with fewer than `min_days` trading days to ensure robust monthly estimates.

    Parameters
    ----------
    xt : pd.DataFrame
        Monthly feature DataFrame.
    dt_series : pd.Series
        Series of trading days per month.
    min_days : int
        Minimum required trading days (default 15).

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, int]]
        - Filtered X_t DataFrame.
        - Drop statistics.
    """
    # Align indices
    dt_aligned = dt_series.reindex(xt.index)

    # Identify valid months
    valid_mask = dt_aligned >= min_days

    # Filter
    xt_filtered = xt[valid_mask].copy()
    dropped_count = len(xt) - len(xt_filtered)

    stats = {
        "months_dropped_insufficient_days": int(dropped_count),
        "min_trading_days_threshold": min_days
    }

    return xt_filtered, stats

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def aggregate_monthly_features(
    daily_stats_all: pd.DataFrame,
    nd_series: pd.Series,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the aggregation of daily statistics into the monthly feature matrix X_t.

    Parameters
    ----------
    daily_stats_all : pd.DataFrame
        Consolidated DataFrame of all daily statistics (moments, tails, proxies).
    nd_series : pd.Series
        Series of daily universe sizes.
    study_config : Dict[str, Any]
        Study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        - 'X_t': Monthly feature DataFrame.
        - 'audit_log': Aggregation statistics.
    """
    feat_config = study_config["feature_engineering"]
    # Default min days to 15 if not specified (safe for monthly stats)
    min_days = feat_config.get("min_trading_days_per_month", 15)

    # Step 1: Aggregate
    monthly_raw, dt_series = _aggregate_daily_to_monthly(daily_stats_all, nd_series)

    # Step 2: Format
    xt_formatted = _format_monthly_features(monthly_raw, feat_config)

    # Step 3: Filter Coverage
    xt_final, stats = _filter_insufficient_months(xt_formatted, dt_series, min_days)

    audit_log = {
        **stats,
        "final_month_count": len(xt_final)
    }

    return {
        "X_t": xt_final,
        "audit_log": audit_log
    }


In [None]:
# Task 11 — Construct monthly market aggregates \(R^{mkt}_t\) and \(\sigma^{mkt}_t\)

# ==============================================================================
# Task 11: Construct monthly market aggregates R^{mkt}_t and sigma^{mkt}_t
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 1: Compute monthly market return by compounding daily `vwretd`.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_monthly_market_return(
    df_index: pd.DataFrame,
    min_days: int = 15
) -> pd.Series:
    """
    Computes the monthly market return by compounding daily value-weighted returns.

    Parameters
    ----------
    df_index : pd.DataFrame
        Cleansed CRSP Index DataFrame.
    min_days : int
        Minimum trading days required to compute a valid monthly return.

    Returns
    -------
    pd.Series
        Series of monthly market returns indexed by year_month.
    """
    # Group by year_month
    grouped = df_index.groupby("year_month")["vwretd"]

    # Filter groups with sufficient days
    # We do this by computing count first
    counts = grouped.count()
    valid_months = counts[counts >= min_days].index

    # Filter original df to valid months for efficiency (or just apply and mask)
    # Applying directly is cleaner
    def compound_ret(x):
        if len(x) < min_days:
            return np.nan
        return np.prod(1 + x) - 1

    # Compute compounded return
    # Equation: R_t = prod(1 + r_d) - 1
    r_mkt = grouped.apply(compound_ret).rename("R_mkt")

    return r_mkt.dropna()

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 2: Compute monthly realized volatility (annualized).
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_monthly_realized_vol(
    df_index: pd.DataFrame,
    study_config: Dict[str, Any],
    min_days: int = 15
) -> pd.Series:
    """
    Computes the annualized monthly realized volatility.

    Parameters
    ----------
    df_index : pd.DataFrame
        Cleansed CRSP Index DataFrame.
    study_config : Dict[str, Any]
        Configuration containing annualization factor and ddof.
    min_days : int
        Minimum trading days required.

    Returns
    -------
    pd.Series
        Series of annualized realized volatility indexed by year_month.
    """
    agg_config = study_config["market_aggregates"]
    ann_factor = agg_config.get("annualization_factor", 252**0.5)
    ddof = agg_config.get("realized_vol_ddof", 1)

    grouped = df_index.groupby("year_month")["vwretd"]

    def realized_vol(x):
        if len(x) < min_days:
            return np.nan
        return x.std(ddof=ddof) * ann_factor

    # Equation: sigma_t = sqrt(252) * std(r_d)
    sigma_mkt = grouped.apply(realized_vol).rename("sigma_mkt")

    return sigma_mkt.dropna()

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 3: Align monthly market aggregates to the same year_month index as X_t.
# -------------------------------------------------------------------------------------------------------------------------------
def _align_market_and_features(
    xt: pd.DataFrame,
    r_mkt: pd.Series,
    sigma_mkt: pd.Series
) -> pd.DataFrame:
    """
    Aligns market aggregates with the feature matrix X_t, ensuring a common sample.

    Parameters
    ----------
    xt : pd.DataFrame
        Monthly feature matrix X_t.
    r_mkt : pd.Series
        Monthly market return.
    sigma_mkt : pd.Series
        Monthly realized volatility.

    Returns
    -------
    pd.DataFrame
        Merged DataFrame containing X_t, R_mkt, and sigma_mkt.
    """
    # Combine market vars
    market_df = pd.concat([r_mkt, sigma_mkt], axis=1)

    # Inner join with X_t to ensure common months
    # This drops months where either features or market data are missing/insufficient
    aligned_df = xt.join(market_df, how="inner")

    return aligned_df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def construct_market_aggregates(
    df_crsp_index_clean: pd.DataFrame,
    xt: pd.DataFrame,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the construction of monthly market aggregates and aligns them with features.

    Parameters
    ----------
    df_crsp_index_clean : pd.DataFrame
        Cleansed CRSP Index DataFrame.
    xt : pd.DataFrame
        Monthly feature matrix X_t.
    study_config : Dict[str, Any]
        Study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        - 'aligned_panel': DataFrame with features and market aggregates.
        - 'audit_log': Construction statistics.
    """
    min_days = study_config["feature_engineering"].get("min_trading_days_per_month", 15)

    # Step 1: Market Return
    r_mkt = _compute_monthly_market_return(df_crsp_index_clean, min_days)

    # Step 2: Realized Volatility
    sigma_mkt = _compute_monthly_realized_vol(df_crsp_index_clean, study_config, min_days)

    # Step 3: Alignment
    aligned_panel = _align_market_and_features(xt, r_mkt, sigma_mkt)

    audit_log = {
        "market_return_months": len(r_mkt),
        "realized_vol_months": len(sigma_mkt),
        "aligned_months": len(aligned_panel),
        "dropped_months_alignment": len(xt) - len(aligned_panel)
    }

    return {
        "aligned_panel": aligned_panel,
        "audit_log": audit_log
    }


In [None]:
# Task 12 — Construct real-time stress labels \(S_t\) and supervised targets \(Y_{t+1}\)

# ==============================================================================
# Task 12: Construct real-time stress labels S_t and supervised targets Y_{t+1}
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 1: Compute the expanding real-time volatility quantile \(q_{t-1}(\alpha)\).
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_expanding_vol_quantile(
    sigma_mkt: pd.Series,
    alpha: float = 0.90,
    min_history: int = 12,
    interpolation: str = "linear"
) -> pd.Series:
    """
    Computes the expanding real-time volatility quantile q_{t-1}(alpha).
    Strictly uses data up to t-1 to define the threshold for month t.

    Parameters
    ----------
    sigma_mkt : pd.Series
        Monthly realized volatility indexed by year_month.
    alpha : float
        Quantile level (e.g., 0.90).
    min_history : int
        Minimum months of history required to compute a quantile.
    interpolation : str
        Interpolation method for quantile.

    Returns
    -------
    pd.Series
        Series of quantiles indexed by year_month (aligned to the month t for which it serves as threshold).
    """
    # We need q_{t-1}.
    # Pandas expanding().quantile() at index t includes index t.
    # So we compute expanding quantile on the series, then shift by 1.
    # The shift(1) moves the quantile computed at t-1 to position t.
    quantiles = sigma_mkt.expanding(min_periods=min_history).quantile(
        quantile=alpha,
        interpolation=interpolation
    ).shift(1)

    quantiles.name = f"q_vol_{int(alpha*100)}"
    return quantiles

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 2: Apply the paper's stress definition.
# -------------------------------------------------------------------------------------------------------------------------------
def _define_stress_label(
    r_mkt: pd.Series,
    sigma_mkt: pd.Series,
    q_vol: pd.Series,
    c_r: float = -0.05
) -> pd.Series:
    """
    Constructs the binary stress label S_t based on market return and volatility.

    Parameters
    ----------
    r_mkt : pd.Series
        Monthly market return.
    sigma_mkt : pd.Series
        Monthly realized volatility.
    q_vol : pd.Series
        Expanding volatility quantile threshold (q_{t-1}).
    c_r : float
        Return cutoff (e.g., -0.05).

    Returns
    -------
    pd.Series
        Binary Series S_t (1 if stress, 0 if not, NaN if threshold undefined).
    """
    # Align all series
    df = pd.concat([r_mkt, sigma_mkt, q_vol], axis=1)
    df.columns = ["R_mkt", "sigma_mkt", "q_vol"]

    # Initialize as NaN (unknown stress status if history insufficient)
    s_t = pd.Series(np.nan, index=df.index, name="S_t")

    # Valid mask: where we have data and a valid threshold
    valid_mask = df["q_vol"].notna() & df["R_mkt"].notna() & df["sigma_mkt"].notna()

    # Apply definition on valid rows
    # S_t = 1 if (R_t <= c_R) OR (sigma_t >= q_{t-1})
    # Note: inequalities are inclusive per paper
    is_crash = df.loc[valid_mask, "R_mkt"] <= c_r
    is_vol_spike = df.loc[valid_mask, "sigma_mkt"] >= df.loc[valid_mask, "q_vol"]

    s_t[valid_mask] = (is_crash | is_vol_spike).astype(int)

    return s_t

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 3: Define the supervised target and enforce real-time feasibility assertion.
# -------------------------------------------------------------------------------------------------------------------------------
def _create_supervised_target(
    panel: pd.DataFrame,
    s_t: pd.Series
) -> pd.DataFrame:
    """
    Creates the supervised learning target Y_{t+1} = S_{t+1} and aligns it with predictors X_t.

    Parameters
    ----------
    panel : pd.DataFrame
        Panel containing features and contemporaneous market vars (X_t, R_t, sigma_t).
    s_t : pd.Series
        Stress label S_t.

    Returns
    -------
    pd.DataFrame
        Panel with added 'S_t' and 'Y_next' columns.
        Rows where Y_next is NaN (last month) are dropped or kept as NaN depending on policy.
        Here we keep them but they can't be used for training.
    """
    # Add S_t to panel
    panel = panel.copy()
    panel["S_t"] = s_t

    # Create Target Y_{t+1}
    # Shift S_t backwards by 1: The label at t+1 becomes the target for row t
    panel["Y_next"] = s_t.shift(-1)

    # Assert Real-Time Feasibility
    # We verify that Y_next for row t corresponds to S_{t+1}
    # (This is guaranteed by shift(-1) on a sorted index, but we assert index monotonicity)
    if not panel.index.is_monotonic_increasing:
        raise ValueError("Panel index must be sorted chronologically to create targets.")

    return panel

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def construct_stress_labels(
    aligned_panel: pd.DataFrame,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the construction of stress labels and supervised targets.

    Parameters
    ----------
    aligned_panel : pd.DataFrame
        Panel with X_t, R_mkt, sigma_mkt.
    study_config : Dict[str, Any]
        Study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        - 'modeling_panel': Final DataFrame with features, labels, and targets.
        - 'audit_log': Labeling statistics.
    """
    # Extract parameters
    target_config = study_config["target_definition"]
    alpha = target_config["vol_quantile_alpha"]
    c_r = target_config["return_cutoff_c_R"]
    min_hist = target_config.get("min_vol_history_months", 12)
    interp = target_config.get("vol_quantile_interpolation", "linear")

    # Step 1: Volatility Threshold
    q_vol = _compute_expanding_vol_quantile(
        aligned_panel["sigma_mkt"], alpha, min_hist, interp
    )

    # Step 2: Stress Label S_t
    s_t = _define_stress_label(
        aligned_panel["R_mkt"], aligned_panel["sigma_mkt"], q_vol, c_r
    )

    # Step 3: Target Y_{t+1}
    final_panel = _create_supervised_target(aligned_panel, s_t)

    # Audit
    valid_targets = final_panel["Y_next"].notna().sum()
    stress_rate = final_panel["Y_next"].mean()

    audit_log = {
        "months_with_valid_threshold": q_vol.notna().sum(),
        "months_with_valid_target": int(valid_targets),
        "unconditional_stress_rate": float(stress_rate) if valid_targets > 0 else 0.0
    }

    return {
        "modeling_panel": final_panel,
        "audit_log": audit_log
    }


In [None]:
# Task 13 — Define the expanding-window standardization callable

# ==============================================================================
# Task 13: Define the expanding-window standardization callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 1: Implement training-window mean and standard deviation computation.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_training_moments(
    x_train: pd.DataFrame,
    std_epsilon: float = 1e-8
) -> Tuple[pd.Series, pd.Series]:
    """
    Computes mean and population standard deviation on the training window.
    Enforces complete-case analysis (drops rows with NaNs) to ensure robust moments.

    Parameters
    ----------
    x_train : pd.DataFrame
        Feature matrix for the training window.
    std_epsilon : float
        Threshold for zero variance.

    Returns
    -------
    Tuple[pd.Series, pd.Series]
        - Means indexed by feature name.
        - Standard deviations indexed by feature name.

    Raises
    ------
    ValueError
        If a feature is entirely NaN in the training window.
    """
    # Enforce complete-case for moment estimation
    # This avoids bias from inconsistent sample sizes across features
    x_clean = x_train.dropna()

    if len(x_clean) == 0:
        # If dropping NaNs leaves no data, we can't standardize.
        # However, if x_train was not empty but had NaNs, this is a data quality issue.
        # We check if columns were all-NaN originally.
        all_nan_cols = x_train.columns[x_train.isna().all()].tolist()
        if all_nan_cols:
            raise ValueError(f"Features entirely NaN in training window: {all_nan_cols}")
        # If not all-NaN but rows disjointly missing, still an issue
        raise ValueError("Training window has 0 complete cases after dropping NaNs.")

    # Compute Moments
    mu = x_clean.mean()

    # Population std (ddof=0) per paper
    sigma = x_clean.std(ddof=0)

    return mu, sigma

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 2: Apply z-score transformation.
# -------------------------------------------------------------------------------------------------------------------------------
def _apply_zscore(
    x: Union[pd.DataFrame, pd.Series],
    mu: pd.Series,
    sigma: pd.Series,
    std_epsilon: float = 1e-8
) -> Union[pd.DataFrame, pd.Series]:
    """
    Applies z-score transformation using provided moments.
    Handles zero-variance features by setting standardized values to 0.

    Parameters
    ----------
    x : pd.DataFrame or pd.Series
        Data to standardize.
    mu : pd.Series
        Training means.
    sigma : pd.Series
        Training standard deviations.
    std_epsilon : float
        Threshold for zero variance.

    Returns
    -------
    Standardized data.
    """
    # Identify zero-variance features
    zero_var_mask = sigma <= std_epsilon

    # Avoid division by zero
    # Replace 0 sigma with 1 (so division does nothing), then mask result to 0
    safe_sigma = sigma.copy()
    safe_sigma[zero_var_mask] = 1.0

    # Standardize
    # (x - mu) / sigma
    # Pandas aligns on columns automatically
    z = (x - mu) / safe_sigma

    # Enforce zero-variance policy: set to 0
    if isinstance(z, pd.DataFrame):
        z.loc[:, zero_var_mask] = 0.0
    else:
        # Series case (single row)
        z[zero_var_mask] = 0.0

    return z

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def expanding_window_standardizer(
    x_panel: pd.DataFrame,
    train_end_idx: Any,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates expanding-window standardization.
    Computes moments on X[...:train_end_idx] and transforms both training and current data.

    Parameters
    ----------
    x_panel : pd.DataFrame
        Full feature panel indexed by year_month.
    train_end_idx : Any
        The label of the last month in the training set (inclusive).
        The 'current' month is assumed to be the one immediately following,
        or we can just return the scaler for external use.
        Here, we return the standardized training set and the scaler params.
    study_config : Dict[str, Any]
        Study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        - 'x_train_std': Standardized training DataFrame.
        - 'scaler_params': Dict with 'mu' and 'sigma'.
    """
    epsilon = study_config["feature_engineering"].get("std_epsilon", 1e-8)

    # Slice Training Data
    # Get integer location of train_end_idx to slice safely if index is not unique (though it should be)
    # Better: use boolean mask <= train_end_idx
    train_mask = x_panel.index <= train_end_idx
    x_train = x_panel.loc[train_mask]

    # Step 1: Compute Moments
    mu, sigma = _compute_training_moments(x_train, epsilon)

    # Step 2: Transform Training Data
    x_train_std = _apply_zscore(x_train, mu, sigma, epsilon)

    return {
        "x_train_std": x_train_std,
        "scaler_params": {"mu": mu, "sigma": sigma}
    }


In [None]:
# Task 14 — Tune hyperparameters via forward-chaining time-series cross-validation in the initial window

# ================================================================================
# Task 14: Tune hyperparameters via forward-chaining time-series cross-validation
# ================================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 1: Define the initial training window and forward-chaining split structure.
# -------------------------------------------------------------------------------------------------------------------------------
def _generate_cv_splits(
    panel_index: pd.Index,
    initial_window_months: int,
    n_splits: int = 5,
    val_size: int = 12,
    step_size: int = 12
) -> List[Tuple[np.ndarray, np.ndarray]]:
    """
    Generates forward-chaining time-series cross-validation splits within the initial window.

    Parameters
    ----------
    panel_index : pd.Index
        Index of the full modeling panel (year_month).
    initial_window_months : int
        Length of the initial training window (T0).
    n_splits : int
        Number of CV splits.
    val_size : int
        Size of validation set in months.
    step_size : int
        Step size between splits.

    Returns
    -------
    List[Tuple[np.ndarray, np.ndarray]]
        List of (train_indices, val_indices) tuples.
    """
    # Get unique sorted months in the initial window
    # We assume panel_index is sorted
    unique_months = panel_index.unique()
    if len(unique_months) < initial_window_months:
        raise ValueError(f"Data length ({len(unique_months)}) < initial window ({initial_window_months})")

    initial_months = unique_months[:initial_window_months]
    n_months = len(initial_months)

    splits = []
    # We work backwards from the end of the initial window to define splits
    # Or forwards. Standard forward chaining:
    # Split 1: Train [0...k], Val [k+1...k+v]
    # Split 2: Train [0...k+s], Val [k+s+1...k+s+v]
    # ...
    # Last split validation should end at initial_window_months - 1

    # Let's define the end of the last validation set as n_months - 1
    # Start of last val set = n_months - val_size
    # End of last train set = n_months - val_size - 1

    # We iterate backwards to find split points
    for i in range(n_splits):
        val_end = n_months - 1 - (i * step_size)
        val_start = val_end - val_size + 1
        train_end = val_start - 1

        if train_end < 0:
            raise ValueError("Configuration results in empty training set for early splits.")

        # Get month labels
        train_months = initial_months[:train_end+1]
        val_months = initial_months[val_start:val_end+1]

        # Map back to panel indices (boolean masks or integer positions)
        # We use boolean masks for safety
        train_mask = panel_index.isin(train_months)
        val_mask = panel_index.isin(val_months)

        # Convert to integer indices for sklearn compatibility if needed,
        # but here we return masks or the subsetted data directly in the loop.
        # Let's return the actual month labels for slicing.
        splits.append((train_months, val_months))

    # Reverse to be chronological (Split 1 is earliest)
    return splits[::-1]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 2: Tune lambda for lasso-logit and ridge penalty for benchmark.
# -------------------------------------------------------------------------------------------------------------------------------
def _tune_model_hyperparameters(
    panel: pd.DataFrame,
    splits: List[Tuple[pd.Index, pd.Index]],
    features: List[str],
    target: str,
    penalty_type: str,
    param_grid: List[float],
    standardizer_func: callable,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Tunes regularization parameter using time-series CV.

    Parameters
    ----------
    panel : pd.DataFrame
        Modeling panel.
    splits : List[Tuple[pd.Index, pd.Index]]
        CV splits (train_months, val_months).
    features : List[str]
        List of feature column names.
    target : str
        Target column name.
    penalty_type : str
        'l1' or 'l2'.
    param_grid : List[float]
        Grid of C values (inverse regularization).
    standardizer_func : callable
        Function to standardize data (Task 13).
    study_config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Dict[str, Any]
        Best hyperparameters and CV results.
    """
    results = []

    for C in param_grid:
        fold_scores = []

        for train_months, val_months in splits:
            # Slice Data
            train_data = panel[panel.index.isin(train_months)]
            val_data = panel[panel.index.isin(val_months)]

            # Standardize (Fit on Train, Transform Train & Val)
            # We use the standardizer from Task 13
            # Note: Task 13 standardizer takes full panel and train_end_idx.
            # Here we can just pass the train slice directly if we adapt or reuse logic.
            # Let's reuse logic: compute moments on train, apply to both.

            # Extract X and Y
            X_train_raw = train_data[features]
            y_train = train_data[target]
            X_val_raw = val_data[features]
            y_val = val_data[target]

            # Standardize
            # We manually call the internal logic of Task 13 for efficiency/clarity here
            # or assume standardizer_func returns scaler params
            std_result = standardizer_func(panel[features], train_months[-1], study_config)
            scaler = std_result["scaler_params"]

            # Apply z-score (Task 13 logic)
            # We need to import _apply_zscore or reimplement simple (x-mu)/sigma
            # Reimplementing for self-containment using scaler params
            epsilon = study_config["feature_engineering"].get("std_epsilon", 1e-8)

            def apply_z(x, mu, sigma):
                z = (x - mu) / sigma.replace(0, 1) # Handle 0 sigma
                z.loc[:, sigma <= epsilon] = 0.0
                return z

            X_train = apply_z(X_train_raw, scaler["mu"], scaler["sigma"])
            X_val = apply_z(X_val_raw, scaler["mu"], scaler["sigma"])

            # Fit Model
            # Solver: liblinear for small datasets supports l1/l2
            model = LogisticRegression(
                penalty=penalty_type,
                C=C,
                solver='liblinear', # Deterministic, supports l1/l2
                random_state=study_config["reproducibility"]["random_seed"],
                fit_intercept=True
            )
            model.fit(X_train, y_train)

            # Predict
            probs = model.predict_proba(X_val)[:, 1]

            # Score (Log Loss)
            eps = study_config["evaluation_inference"].get("proba_clip_epsilon", 1e-6)
            probs_clipped = np.clip(probs, eps, 1 - eps)
            score = log_loss(y_val, probs_clipped)
            fold_scores.append(score)

        avg_score = np.mean(fold_scores)
        results.append({"C": C, "score": avg_score})

    # Select Best
    best_result = min(results, key=lambda x: x["score"])

    return {
        "best_C": best_result["C"],
        "best_score": best_result["score"],
        "grid_results": results
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def tune_hyperparameters(
    modeling_panel: pd.DataFrame,
    standardizer_func: callable,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates hyperparameter tuning for MSPI (Lasso) and Benchmark (Ridge).

    Parameters
    ----------
    modeling_panel : pd.DataFrame
        Panel with features and targets.
    standardizer_func : callable
        Task 13 standardizer.
    study_config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Dict[str, Any]
        Frozen hyperparameters.
    """
    # Extract Parameters
    proto = study_config["learning_protocol"]
    tune_config = proto["hyperparameter_tuning"]

    # 1. Generate Splits
    splits = _generate_cv_splits(
        modeling_panel.index,
        proto["initial_training_window_months"],
        tune_config.get("cv_n_splits", 5),
        tune_config.get("cv_val_size_months", 12),
        tune_config.get("cv_step_size_months", 12)
    )

    # 2. Tune MSPI (Lasso)
    # Define Grid (C = 1/lambda)
    # Paper uses lambda, sklearn uses C. We tune C.
    # Grid: 10 values log-spaced
    c_grid = np.logspace(-4, 4, 10)

    mspi_feats = study_config["feature_engineering"]["final_feature_names_monthly"]
    mspi_res = _tune_model_hyperparameters(
        modeling_panel, splits, mspi_feats, "Y_next", "l1", c_grid, standardizer_func, study_config
    )

    # 3. Tune Benchmark (Ridge)
    bench_feats = ["R_mkt", "sigma_mkt"]
    bench_res = _tune_model_hyperparameters(
        modeling_panel, splits, bench_feats, "Y_next", "l2", c_grid, standardizer_func, study_config
    )

    frozen_params = {
        "mspi_lasso_C": mspi_res["best_C"],
        "benchmark_ridge_C": bench_res["best_C"],
        "tuning_audit": {
            "mspi": mspi_res,
            "benchmark": bench_res
        }
    }

    return frozen_params


In [None]:
# Task 15 — Fit and forecast MSPI under the expanding-window lasso-logit protocol

# ==============================================================================
# Task 15: Fit and forecast MSPI under the expanding-window lasso-logit protocol
# ==============================================================================

def _fit_predict_lasso_logit(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    X_test: pd.DataFrame,
    C: float,
    random_seed: int
) -> Tuple[float, float]:
    """
    Fits an L1-regularized logistic regression on training data and predicts
    probability for the test instance.

    Parameters
    ----------
    X_train : pd.DataFrame
        Standardized training features.
    y_train : pd.Series
        Training targets.
    X_test : pd.DataFrame
        Standardized test features (single row).
    C : float
        Inverse regularization strength.
    random_seed : int
        Seed for reproducibility.

    Returns
    -------
    Tuple[float, float]
        - Predicted probability (MSPI_t).
        - Raw score (linear predictor).
    """
    # Configure Lasso-Logit
    # Solver 'liblinear' is good for small datasets and supports L1
    model = LogisticRegression(
        penalty='l1',
        C=C,
        solver='liblinear',
        random_state=random_seed,
        fit_intercept=True
    )

    # Fit
    model.fit(X_train, y_train)

    # Predict Probability
    # predict_proba returns [prob_0, prob_1]
    prob = model.predict_proba(X_test)[0, 1]

    # Compute Raw Score (decision function)
    # decision_function returns raw score (z)
    score = model.decision_function(X_test)[0]

    return prob, score

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 3: Execute the expanding-window forecasting loop.
# -------------------------------------------------------------------------------------------------------------------------------
def _expanding_window_forecast_loop(
    panel: pd.DataFrame,
    features: List[str],
    target: str,
    frozen_C: float,
    standardizer_func: callable,
    study_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Executes the real-time expanding window forecasting loop.

    Parameters
    ----------
    panel : pd.DataFrame
        Full modeling panel indexed by year_month.
    features : List[str]
        List of feature names.
    target : str
        Target column name (Y_next).
    frozen_C : float
        Frozen hyperparameter C.
    standardizer_func : callable
        Task 13 standardizer.
    study_config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        Forecasts indexed by forecast month t.
    """
    # Define OOS Period
    # We start forecasting after the initial training window
    initial_window = study_config["learning_protocol"]["initial_training_window_months"]
    oos_start_idx = initial_window

    # Get all unique months
    all_months = panel.index.unique().sort_values()

    # We need at least initial_window months of history to start
    if len(all_months) <= initial_window:
        raise ValueError("Panel length insufficient for initial training window.")

    forecast_months = all_months[oos_start_idx:]

    results = []

    for t in forecast_months:
        # Define Training Set
        # At month t, we observe X_t. We want to predict Y_{t+1}.
        # The latest available label is S_t (which is Y_t).
        # So we can train on all pairs (X_tau, Y_{tau+1}) where tau+1 <= t.
        # This means the label month must be <= t.
        # In our panel, the target column 'Y_next' at row tau holds Y_{tau+1}.
        # So we filter rows where the index (tau) satisfies tau < t?
        # No, let's be precise.
        # Row tau has X_tau and Y_{tau+1}.
        # We can use row tau if Y_{tau+1} is known at time t.
        # Y_{tau+1} corresponds to S_{tau+1}.
        # S_{tau+1} is known at end of month tau+1.
        # So we need tau+1 <= t.
        # Thus tau <= t-1.

        # Training rows: index < t
        train_mask = panel.index < t
        # We also need to ensure we have valid targets
        train_data = panel.loc[train_mask].dropna(subset=[target])

        # Current row: index == t
        current_data = panel.loc[[t]]

        if len(train_data) < initial_window:
            # Should not happen given loop start, but safety check
            continue

        # Standardize
        # Fit on training features only
        # We use the standardizer_func which expects full panel and train_end_idx
        # But here we have sliced data. Let's adapt.
        # We can pass the train_data directly if we modify standardizer or use internal logic.
        # Let's use the internal logic we implemented in Task 14 helper for clarity.
        # Or better, call standardizer_func with the full panel and the label of the last training month.
        last_train_month = train_data.index[-1]

        # Note: standardizer_func (Task 13) computes moments on X[...:train_end_idx]
        # Here train_end_idx is last_train_month.
        # This matches our training set definition.
        std_result = standardizer_func(panel[features], last_train_month, study_config)
        scaler = std_result["scaler_params"]

        # Apply Standardization
        # Reimplement apply_z logic locally or import
        epsilon = study_config["feature_engineering"].get("std_epsilon", 1e-8)
        def apply_z(x, mu, sigma):
            z = (x - mu) / sigma.replace(0, 1)
            z.loc[:, sigma <= epsilon] = 0.0
            return z

        # Z-score Feature Sets and Extract Target
        X_train = apply_z(train_data[features], scaler["mu"], scaler["sigma"])
        X_test = apply_z(current_data[features], scaler["mu"], scaler["sigma"])
        y_train = train_data[target]

        # Fit and Predict
        prob, score = _fit_predict_lasso_logit(
            X_train, y_train, X_test, frozen_C,
            study_config["reproducibility"]["random_seed"]
        )

        # Store Result
        # We store the realized target Y_{t+1} (which is in current_data['Y_next']) for evaluation
        realized_target = current_data[target].iloc[0]

        results.append({
            "forecast_month": t,
            "MSPI": prob,
            "raw_score": score,
            "Y_next": realized_target
        })

    return pd.DataFrame(results).set_index("forecast_month")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def forecast_mspi(
    modeling_panel: pd.DataFrame,
    frozen_hyperparams: Dict[str, Any],
    standardizer_func: callable,
    study_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the generation of MSPI forecasts.

    Parameters
    ----------
    modeling_panel : pd.DataFrame
        Panel with features and targets.
    frozen_hyperparams : Dict[str, Any]
        Dictionary containing 'mspi_lasso_C'.
    standardizer_func : callable
        Task 13 standardizer.
    study_config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        Forecasts indexed by forecast month.
    """
    # Extract parameters
    features = study_config["feature_engineering"]["final_feature_names_monthly"]
    target = "Y_next"
    C = frozen_hyperparams["mspi_lasso_C"]

    # Make forecasts
    forecasts = _expanding_window_forecast_loop(
        modeling_panel, features, target, C, standardizer_func, study_config
    )

    return forecasts


In [None]:
# Task 16 — Fit and forecast the benchmark ridge-logit under the same protocol

# ==============================================================================
# Task 16: Fit and forecast the benchmark ridge-logit under the same protocol
# ==============================================================================

def _fit_predict_ridge_logit(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    X_test: pd.DataFrame,
    C: float,
    random_seed: int
) -> Tuple[float, float]:
    """
    Fits an L2-regularized logistic regression (Ridge) and predicts probability.

    Parameters
    ----------
    X_train : pd.DataFrame
        Standardized training features.
    y_train : pd.Series
        Training targets.
    X_test : pd.DataFrame
        Standardized test features.
    C : float
        Inverse regularization strength.
    random_seed : int
        Seed for reproducibility.

    Returns
    -------
    Tuple[float, float]
        - Predicted probability.
        - Raw score.
    """
    # Initialize Model
    model = LogisticRegression(
        penalty='l2',
        C=C,
        solver='liblinear', # Supports l2
        random_state=random_seed,
        fit_intercept=True
    )

    # Fit Model
    model.fit(X_train, y_train)
    prob = model.predict_proba(X_test)[0, 1]
    score = model.decision_function(X_test)[0]

    return prob, score

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 2: Apply expanding-window standardization and ridge-logit estimation.
# -------------------------------------------------------------------------------------------------------------------------------
def _expanding_window_benchmark_loop(
    panel: pd.DataFrame,
    target: str,
    frozen_C: float,
    standardizer_func: callable,
    study_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Executes the real-time expanding window forecasting loop for the benchmark.
    Features are fixed to ['R_mkt', 'sigma_mkt'].

    Parameters
    ----------
    panel : pd.DataFrame
        Full modeling panel.
    target : str
        Target column name.
    frozen_C : float
        Frozen hyperparameter C.
    standardizer_func : callable
        Task 13 standardizer.
    study_config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        Forecasts indexed by forecast month t.
    """
    # Extract Parameters
    features = ["R_mkt", "sigma_mkt"]
    initial_window = study_config["learning_protocol"]["initial_training_window_months"]
    oos_start_idx = initial_window

    # Sort and Slice Data Structures
    all_months = panel.index.unique().sort_values()
    forecast_months = all_months[oos_start_idx:]

    results = []

    for t in forecast_months:
        # Training rows: index < t (label available at t)
        train_mask = panel.index < t
        train_data = panel.loc[train_mask].dropna(subset=[target])
        current_data = panel.loc[[t]]

        if len(train_data) < initial_window:
            continue

        # Standardize
        last_train_month = train_data.index[-1]
        std_result = standardizer_func(panel[features], last_train_month, study_config)
        scaler = std_result["scaler_params"]

        epsilon = study_config["feature_engineering"].get("std_epsilon", 1e-8)
        def apply_z(x, mu, sigma):
            z = (x - mu) / sigma.replace(0, 1)
            z.loc[:, sigma <= epsilon] = 0.0
            return z

        X_train = apply_z(train_data[features], scaler["mu"], scaler["sigma"])
        X_test = apply_z(current_data[features], scaler["mu"], scaler["sigma"])
        y_train = train_data[target]

        # Fit and Predict
        prob, score = _fit_predict_ridge_logit(
            X_train, y_train, X_test, frozen_C,
            study_config["reproducibility"]["random_seed"]
        )

        realized_target = current_data[target].iloc[0]

        results.append({
            "forecast_month": t,
            "p_benchmark": prob,
            "raw_score_benchmark": score,
            "Y_next": realized_target
        })

    return pd.DataFrame(results).set_index("forecast_month")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def forecast_benchmark(
    modeling_panel: pd.DataFrame,
    frozen_hyperparams: Dict[str, Any],
    standardizer_func: callable,
    study_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the generation of Benchmark forecasts.

    Parameters
    ----------
    modeling_panel : pd.DataFrame
        Panel with features and targets.
    frozen_hyperparams : Dict[str, Any]
        Dictionary containing 'benchmark_ridge_C'.
    standardizer_func : callable
        Task 13 standardizer.
    study_config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        Forecasts indexed by forecast month.
    """
    # Extract Parameters
    target = "Y_next"
    C = frozen_hyperparams["benchmark_ridge_C"]

    # Make Forecasts
    forecasts = _expanding_window_benchmark_loop(
        modeling_panel, target, C, standardizer_func, study_config
    )

    return forecasts


In [None]:
# Task 17 — Create end-to-end orchestrator callable

# ==============================================================================
# Task 17: Create end-to-end orchestrator callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 1/3: Define orchestrator-level anti–look-ahead unit tests.
# -------------------------------------------------------------------------------------------------------------------------------
def _run_anti_look_ahead_checks(
    mspi_forecasts: pd.DataFrame,
    labels: pd.DataFrame,
    audit_log: Dict[str, Any]
) -> None:
    """
    Performs rigorous post-execution safety checks to ensure no look-ahead bias occurred
    during the forecasting process. Verifies alignment between forecasts and realized labels.

    Parameters
    ----------
    mspi_forecasts : pd.DataFrame
        DataFrame of MSPI forecasts indexed by forecast month t.
        Must contain 'Y_next' (the realized target for that forecast).
    labels : pd.DataFrame
        DataFrame of ground-truth labels indexed by month t.
        Must contain 'Y_next' (S_{t+1}).
    audit_log : Dict[str, Any]
        Accumulated audit logs from the pipeline.

    Raises
    ------
    ValueError
        If target alignment checks fail, indicating potential data leakage or shifting errors.
    """
    # Check 1: Target Alignment
    # The 'Y_next' column in the forecast dataframe represents the realized target
    # that the model was trying to predict at time t. This must match the
    # 'Y_next' column in the master label dataframe for the same month t.

    # Find common months (intersection of indices)
    common_months = mspi_forecasts.index.intersection(labels.index)

    if len(common_months) == 0:
        # This might happen if OOS period is disjoint from label period (should not happen)
        raise ValueError("CRITICAL: No overlapping months between forecasts and labels for validation.")

    forecast_targets = mspi_forecasts.loc[common_months, "Y_next"]
    label_targets = labels.loc[common_months, "Y_next"]

    # Check for equality (handling potential floating point types for binary labels)
    # We assume labels are 0/1.
    if not forecast_targets.equals(label_targets):
        # Detailed check for mismatches
        mismatches = (forecast_targets != label_targets)
        if mismatches.any():
            n_mismatch = mismatches.sum()
            example_mismatch = common_months[mismatches][0]
            raise ValueError(
                f"CRITICAL: Target misalignment detected. {n_mismatch} months have mismatched targets. "
                f"Example at {example_mismatch}: Forecast Y_next={forecast_targets[example_mismatch]}, "
                f"Label Y_next={label_targets[example_mismatch]}."
            )

    # Check 2: Forecast Availability
    # Ensure we didn't produce forecasts for months where inputs were missing
    # (This is implicitly handled by the loop, but good to verify coverage)
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def run_mspi_pipeline(
    df_crsp_daily: pd.DataFrame,
    df_crsp_index: pd.DataFrame,
    raw_study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Executes the complete Market Stress Probability Index (MSPI) research pipeline.

    This orchestrator enforces the strict sequential dependency graph required to
    reproduce the results of 'Algorithmic Monitoring' (Schmitt, 2026). It manages
    data flow from raw CRSP inputs through feature engineering, labeling,
    hyperparameter tuning, and out-of-sample forecasting.

    Parameters
    ----------
    df_crsp_daily : pd.DataFrame
        Raw CRSP Daily Stock File (micro-data).
    df_crsp_index : pd.DataFrame
        Raw CRSP Daily Index File (market aggregate data).
    raw_study_config : Dict[str, Any]
        Configuration dictionary specifying all study parameters.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing the following keys:
        - 'monthly_features': pd.DataFrame of X_t features.
        - 'market_aggregates': pd.DataFrame of R_mkt and sigma_mkt.
        - 'labels': pd.DataFrame of S_t and Y_next.
        - 'mspi_forecasts': pd.DataFrame of MSPI out-of-sample forecasts.
        - 'benchmark_forecasts': pd.DataFrame of Benchmark out-of-sample forecasts.
        - 'frozen_hyperparams': Dict of tuned hyperparameters.
        - 'audit_log': Dict of detailed execution logs and statistics.
    """
    # Initialize Audit Log
    audit_log = {}

    # -------------------------------------------------------------------------
    # Phase 1: Configuration and Validation
    # -------------------------------------------------------------------------
    print("Phase 1: Configuration and Validation")

    # Task 1: Validate Config
    config_res = validate_study_config(raw_study_config)
    study_config = config_res["validated_config"]
    audit_log["config_validation"] = config_res["validation_report"]

    # Task 2: Validate Micro Data
    val_daily = validate_crsp_daily(df_crsp_daily, study_config)
    audit_log["validation_daily"] = val_daily

    # Task 3: Validate Index Data
    val_index = validate_crsp_index(df_crsp_daily, df_crsp_index, study_config)
    audit_log["validation_index"] = val_index

    # -------------------------------------------------------------------------
    # Phase 2: Data Cleansing and Universe Construction
    # -------------------------------------------------------------------------
    print("Phase 2: Data Cleansing and Universe Construction")

    # Task 4: Cleanse Micro Data
    clean_daily_res = cleanse_crsp_daily(df_crsp_daily, study_config)
    df_daily_clean = clean_daily_res["df_clean"]
    audit_log["cleansing_daily"] = clean_daily_res["audit_log"]

    # Task 5: Cleanse Index Data
    clean_index_res = cleanse_crsp_index(df_daily_clean, df_crsp_index, study_config)
    df_index_clean = clean_index_res["df_index_clean"]
    audit_log["cleansing_index"] = clean_index_res["audit_log"]

    # Task 6: Construct Eligible Universe
    universe_res = construct_eligible_universe(df_daily_clean, study_config)
    df_eligible = universe_res["df_eligible"]
    nd_series = universe_res["nd_series"]
    audit_log["universe_construction"] = universe_res["audit_log"]

    # -------------------------------------------------------------------------
    # Phase 3: Feature Engineering (Daily Signals)
    # -------------------------------------------------------------------------
    print("Phase 3: Feature Engineering (Daily Signals)")

    # Task 7: Return Moments
    moments = compute_daily_return_moments(df_eligible, study_config)

    # Task 8: Tail Measures
    tails = compute_daily_tail_measures(df_eligible, study_config)

    # Task 9: Trading Proxies
    proxies = compute_daily_trading_proxies(df_eligible, study_config)

    # Merge Daily Statistics
    # We align on DATE index. All should have same index from df_eligible.
    daily_stats_all = pd.concat([moments, tails, proxies], axis=1)

    # -------------------------------------------------------------------------
    # Phase 4: Monthly Aggregation and Labeling
    # -------------------------------------------------------------------------
    print("Phase 4: Monthly Aggregation and Labeling")

    # Task 10: Aggregate Monthly Features
    agg_res = aggregate_monthly_features(daily_stats_all, nd_series, study_config)
    X_t = agg_res["X_t"]
    audit_log["monthly_aggregation"] = agg_res["audit_log"]

    # Task 11: Construct Market Aggregates
    market_res = construct_market_aggregates(df_index_clean, X_t, study_config)
    aligned_panel = market_res["aligned_panel"]
    audit_log["market_aggregates"] = market_res["audit_log"]

    # Task 12: Construct Stress Labels
    label_res = construct_stress_labels(aligned_panel, study_config)
    modeling_panel = label_res["modeling_panel"]
    audit_log["label_construction"] = label_res["audit_log"]

    # -------------------------------------------------------------------------
    # Phase 5: Model Training and Forecasting
    # -------------------------------------------------------------------------
    print("Phase 5: Model Training and Forecasting")

    # Task 14: Tune Hyperparameters
    # We pass the expanding_window_standardizer callable explicitly
    tune_res = tune_hyperparameters(
        modeling_panel,
        expanding_window_standardizer,
        study_config
    )
    frozen_params = tune_res
    audit_log["hyperparameter_tuning"] = tune_res["tuning_audit"]

    # Task 15: Forecast MSPI
    mspi_forecasts = forecast_mspi(
        modeling_panel,
        frozen_params,
        expanding_window_standardizer,
        study_config
    )

    # Task 16: Forecast Benchmark
    bench_forecasts = forecast_benchmark(
        modeling_panel,
        frozen_params,
        expanding_window_standardizer,
        study_config
    )

    # -------------------------------------------------------------------------
    # Phase 6: Finalization
    # -------------------------------------------------------------------------
    print("Phase 6: Finalization and Safety Checks")

    # Run Anti-Look-Ahead Checks
    _run_anti_look_ahead_checks(mspi_forecasts, modeling_panel, audit_log)

    # Compile Final Results
    results = {
        "monthly_features": X_t,
        "market_aggregates": aligned_panel[["R_mkt", "sigma_mkt"]],
        "labels": modeling_panel[["S_t", "Y_next"]],
        "mspi_forecasts": mspi_forecasts,
        "benchmark_forecasts": bench_forecasts,
        "frozen_hyperparams": frozen_params,
        "audit_log": audit_log
    }

    return results


In [None]:
# Task 18 — Fit RF and GB with real-time Platt calibration (robustness horse race)

# ==============================================================================
# Task 18: Fit RF and GB with real-time Platt calibration (robustness horse race)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 1: Random forest: fit, score, and calibrate in real time.
# -------------------------------------------------------------------------------------------------------------------------------
def _fit_predict_calibrate_model(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    X_test: pd.DataFrame,
    model_type: str,
    hyperparams: Dict[str, Any],
    random_seed: int
) -> Tuple[float, float]:
    """
    Fits a nonlinear model (RF or GB), computes raw scores, and applies real-time
    Platt calibration using the training set.

    Parameters
    ----------
    X_train : pd.DataFrame
        Standardized training features.
    y_train : pd.Series
        Training targets.
    X_test : pd.DataFrame
        Standardized test features.
    model_type : str
        'rf' or 'gb'.
    hyperparams : Dict[str, Any]
        Model hyperparameters.
    random_seed : int
        Seed for reproducibility.

    Returns
    -------
    Tuple[float, float]
        - Calibrated probability.
        - Raw score (uncalibrated).
    """
    # 1. Initialize Model
    # Initialize Random Forest Model
    if model_type == 'rf':
        model = RandomForestClassifier(
            random_state=random_seed,
            n_jobs=1, # Determinism
            **hyperparams
        )

    # Initialize Gradient Boosting Classifier
    elif model_type == 'gb':
        model = GradientBoostingClassifier(
            random_state=random_seed,
            **hyperparams
        )
    else:
        raise ValueError(f"Unknown model type: {model_type}")

    # 2. Fit Model
    model.fit(X_train, y_train)

    # 3. Get Raw Scores
    # For RF, raw score is mean predicted probability (fraction of trees)
    # For GB, raw score is decision function (log-odds)
    if model_type == 'rf':
        # RF "raw score" is the uncalibrated probability
        # We need scores for training set to fit calibration
        train_scores = model.predict_proba(X_train)[:, 1]
        test_score = model.predict_proba(X_test)[0, 1]

        # Platt scaling expects log-odds or similar, but can work on probs.
        # However, standard Platt is logistic regression on scores.
        # If scores are probs [0,1], we might need to logit transform them first
        # or just fit logistic regression on them directly (which learns a sigmoid on top of sigmoid-like).
        # The task says "random forest score is the ensemble average... apply calibration map".
        # We use the raw ensemble average as the feature for calibration.

    elif model_type == 'gb':
        # GB raw score is decision function
        train_scores = model.decision_function(X_train)
        test_score = model.decision_function(X_test)[0]

    # 4. Fit Calibration (Platt Scaling)
    # Logistic Regression of y_train on train_scores
    # We reshape scores to (n_samples, 1)
    calibrator = LogisticRegression(solver='liblinear', penalty='none')

    # Note: penalty='none' might fail if separation exists, use l2 with small C if needed.
    # Let's use default l2 with C=1.0 for stability as per standard Platt implementations.
    calibrator = LogisticRegression(solver='liblinear', C=1.0)

    calibrator.fit(train_scores.reshape(-1, 1), y_train)

    # 5. Apply Calibration
    calibrated_prob = calibrator.predict_proba(np.array([[test_score]]))[0, 1]

    return calibrated_prob, test_score

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 3: Store all raw scores and calibrated probabilities for evaluation.
# -------------------------------------------------------------------------------------------------------------------------------
def _expanding_window_nonlinear_loop(
    panel: pd.DataFrame,
    features: List[str],
    target: str,
    model_type: str,
    hyperparams: Dict[str, Any],
    standardizer_func: callable,
    study_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Executes the real-time expanding window loop for nonlinear models.

    Parameters
    ----------
    panel : pd.DataFrame
        Full modeling panel.
    features : List[str]
        Feature names.
    target : str
        Target column name.
    model_type : str
        'rf' or 'gb'.
    hyperparams : Dict[str, Any]
        Model hyperparameters.
    standardizer_func : callable
        Task 13 standardizer.
    study_config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        Forecasts indexed by forecast month t.
    """
    initial_window = study_config["learning_protocol"]["initial_training_window_months"]
    oos_start_idx = initial_window

    all_months = panel.index.unique().sort_values()
    forecast_months = all_months[oos_start_idx:]

    results = []

    for t in forecast_months:
        # Training rows: index < t
        train_mask = panel.index < t
        train_data = panel.loc[train_mask].dropna(subset=[target])
        current_data = panel.loc[[t]]

        if len(train_data) < initial_window:
            continue

        # Standardize (Optional for trees, but mandated by task text "using standardized X")
        last_train_month = train_data.index[-1]
        std_result = standardizer_func(panel[features], last_train_month, study_config)
        scaler = std_result["scaler_params"]

        epsilon = study_config["feature_engineering"].get("std_epsilon", 1e-8)
        def apply_z(x, mu, sigma):
            z = (x - mu) / sigma.replace(0, 1)
            z.loc[:, sigma <= epsilon] = 0.0
            return z

        X_train = apply_z(train_data[features], scaler["mu"], scaler["sigma"])
        X_test = apply_z(current_data[features], scaler["mu"], scaler["sigma"])
        y_train = train_data[target]

        # Fit, Predict, Calibrate
        prob, score = _fit_predict_calibrate_model(
            X_train, y_train, X_test, model_type, hyperparams,
            study_config["reproducibility"]["random_seed"]
        )

        realized_target = current_data[target].iloc[0]

        results.append({
            "forecast_month": t,
            f"p_{model_type}": prob,
            f"raw_score_{model_type}": score,
            "Y_next": realized_target
        })

    return pd.DataFrame(results).set_index("forecast_month")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def run_robustness_horse_race(
    modeling_panel: pd.DataFrame,
    mspi_forecasts: pd.DataFrame,
    benchmark_forecasts: pd.DataFrame,
    frozen_hyperparams: Dict[str, Any],
    standardizer_func: callable,
    study_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the nonlinear model horse race (RF and GB) and unifies results.

    Parameters
    ----------
    modeling_panel : pd.DataFrame
        Panel with features and targets.
    mspi_forecasts : pd.DataFrame
        MSPI forecasts.
    benchmark_forecasts : pd.DataFrame
        Benchmark forecasts.
    frozen_hyperparams : Dict[str, Any]
        Dictionary containing RF/GB params (if tuned, else defaults).
    standardizer_func : callable
        Task 13 standardizer.
    study_config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        Unified evaluation table with all models.
    """
    features = study_config["feature_engineering"]["final_feature_names_monthly"]
    target = "Y_next"

    # Get Hyperparams (or defaults if not tuned in Task 14)
    # In a full implementation, these should be tuned. Here we use defaults or config.
    rf_params = frozen_hyperparams.get("rf_params", {"n_estimators": 100, "max_depth": 5})
    gb_params = frozen_hyperparams.get("gb_params", {"n_estimators": 100, "learning_rate": 0.1, "max_depth": 3})

    # Forecast RF
    rf_forecasts = _expanding_window_nonlinear_loop(
        modeling_panel, features, target, "rf", rf_params, standardizer_func, study_config
    )

    # Forecast GB
    gb_forecasts = _expanding_window_nonlinear_loop(
        modeling_panel, features, target, "gb", gb_params, standardizer_func, study_config
    )

    # Unify Results
    # Join on forecast_month
    # We start with MSPI as base
    unified = mspi_forecasts.rename(columns={"MSPI": "p_mspi", "raw_score": "raw_score_mspi"})

    # Join Benchmark
    unified = unified.join(
        benchmark_forecasts[["p_benchmark", "raw_score_benchmark"]],
        how="inner"
    )

    # Join RF
    unified = unified.join(
        rf_forecasts[["p_rf", "raw_score_rf"]],
        how="inner"
    )

    # Join GB
    unified = unified.join(
        gb_forecasts[["p_gb", "raw_score_gb"]],
        how="inner"
    )

    return unified


In [None]:
# Task 19 — Compute out-of-sample discrimination metrics (raw scores)

# ==============================================================================
# Task 19: Compute out-of-sample discrimination metrics (raw scores)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 1 and 2: Compute ROC-AUC for each model / Compute PR-AUC for each model.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_auc_metrics(
    df: pd.DataFrame,
    target_col: str,
    score_cols: Dict[str, str]
) -> pd.DataFrame:
    """
    Computes ROC-AUC and PR-AUC for multiple models on a common evaluation set.

    Parameters
    ----------
    df : pd.DataFrame
        Evaluation DataFrame containing targets and raw scores.
    target_col : str
        Name of the target column (e.g., 'Y_next').
    score_cols : Dict[str, str]
        Dictionary mapping Model Name -> Raw Score Column Name.

    Returns
    -------
    pd.DataFrame
        DataFrame indexed by Model Name with columns 'AUC', 'PR-AUC'.
    """
    # Ensure no missing values in target or scores for fair comparison
    # We use the intersection of valid data
    cols_to_check = [target_col] + list(score_cols.values())
    df_clean = df.dropna(subset=cols_to_check)

    y_true = df_clean[target_col]

    # Check for degenerate labels
    if y_true.nunique() < 2:
        # Cannot compute AUC if only one class is present
        # Return NaNs or raise warning
        # We return NaNs to allow pipeline to proceed (e.g. if OOS is very short)
        results = []
        for model_name in score_cols.keys():
            results.append({
                "Model": model_name,
                "AUC": np.nan,
                "PR-AUC": np.nan
            })
        return pd.DataFrame(results).set_index("Model")

    results = []

    # Iterate through models and scores
    for model_name, score_col in score_cols.items():
        y_score = df_clean[score_col]

        # Compute ROC AUC and Precision Scores
        auc = roc_auc_score(y_true, y_score)
        pr_auc = average_precision_score(y_true, y_score)

        results.append({
            "Model": model_name,
            "AUC": auc,
            "PR-AUC": pr_auc
        })

    return pd.DataFrame(results).set_index("Model")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 3: Store discrimination results in a comparison table.
# -------------------------------------------------------------------------------------------------------------------------------
def _add_metadata_to_metrics(
    metrics_df: pd.DataFrame,
    df_eval: pd.DataFrame,
    target_col: str
) -> pd.DataFrame:
    """
    Adds sample size and event rate metadata to the metrics table.

    Parameters
    ----------
    metrics_df : pd.DataFrame
        DataFrame with AUC/PR-AUC.
    df_eval : pd.DataFrame
        Evaluation DataFrame.
    target_col : str
        Target column name.

    Returns
    -------
    pd.DataFrame
        Metrics DataFrame with added metadata columns.
    """
    # Compute metadata on the same cleaned set used for metrics
    # (Implicitly assumed df_eval is the one passed to _compute_auc_metrics or we re-clean)
    # For robustness, we re-clean based on target (scores assumed valid if passed)
    df_clean = df_eval.dropna(subset=[target_col])

    # Compute Samples and Event Rate
    n_samples = len(df_clean)
    event_rate = df_clean[target_col].mean()

    # Store Variables
    metrics_df["N"] = n_samples
    metrics_df["Event_Rate"] = event_rate

    return metrics_df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def compute_discrimination_metrics(
    unified_evaluation_panel: pd.DataFrame
) -> pd.DataFrame:
    """
    Orchestrates the computation of discrimination metrics (AUC, PR-AUC).

    Parameters
    ----------
    unified_evaluation_panel : pd.DataFrame
        DataFrame containing targets and raw scores for all models.
        Expected columns: 'Y_next', 'raw_score_mspi', 'raw_score_benchmark',
        'raw_score_rf', 'raw_score_gb'.

    Returns
    -------
    pd.DataFrame
        Comparison table of discrimination metrics.
    """
    # Initialize name of target
    target_col = "Y_next"

    # Define mapping of Model Name -> Raw Score Column
    # We check which columns exist to support partial runs (e.g. if RF/GB skipped)
    score_map = {
        "MSPI": "raw_score_mspi",
        "Benchmark": "raw_score_benchmark"
    }

    if "raw_score_rf" in unified_evaluation_panel.columns:
        score_map["Random Forest"] = "raw_score_rf"
    if "raw_score_gb" in unified_evaluation_panel.columns:
        score_map["Gradient Boosting"] = "raw_score_gb"

    # Compute Metrics
    metrics_df = _compute_auc_metrics(unified_evaluation_panel, target_col, score_map)

    # Add Metadata
    final_df = _add_metadata_to_metrics(metrics_df, unified_evaluation_panel, target_col)

    return final_df


In [None]:
# Task 20 — Compute probability accuracy metrics and calibration diagnostics

# ==============================================================================
# Task 20: Compute probability accuracy metrics and calibration diagnostics
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 1: Compute Brier score and log loss for each model on probability forecasts.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_prob_scores(
    df: pd.DataFrame,
    target_col: str,
    prob_cols: Dict[str, str],
    epsilon: float = 1e-6
) -> pd.DataFrame:
    """
    Computes Brier Score, Log Loss, and Mean Predicted Probability.

    Parameters
    ----------
    df : pd.DataFrame
        Evaluation DataFrame.
    target_col : str
        Target column name.
    prob_cols : Dict[str, str]
        Mapping Model Name -> Probability Column.
    epsilon : float
        Clipping epsilon for log loss.

    Returns
    -------
    pd.DataFrame
        Metrics DataFrame indexed by Model Name.
    """
    cols_to_check = [target_col] + list(prob_cols.values())
    df_clean = df.dropna(subset=cols_to_check)
    y_true = df_clean[target_col]

    results = []
    for model_name, prob_col in prob_cols.items():
        y_prob = df_clean[prob_col]

        # Brier Score
        brier = brier_score_loss(y_true, y_prob)

        # Log Loss (with clipping)
        y_prob_clipped = np.clip(y_prob, epsilon, 1 - epsilon)
        ll = log_loss(y_true, y_prob_clipped)

        # Mean Prob
        mean_prob = y_prob.mean()

        results.append({
            "Model": model_name,
            "Brier": brier,
            "LogLoss": ll,
            "Mean_Predicted_Prob": mean_prob
        })

    return pd.DataFrame(results).set_index("Model")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 2: Compute calibration curve and Expected Calibration Error (ECE).
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_ece(
    df: pd.DataFrame,
    target_col: str,
    prob_cols: Dict[str, str],
    n_bins: int = 10
) -> pd.DataFrame:
    """
    Computes Expected Calibration Error (ECE) using equal-mass quantile binning.

    Parameters
    ----------
    df : pd.DataFrame
        Evaluation DataFrame.
    target_col : str
        Target column name.
    prob_cols : Dict[str, str]
        Mapping Model Name -> Probability Column.
    n_bins : int
        Number of bins.

    Returns
    -------
    pd.DataFrame
        DataFrame with ECE per model.
    """
    cols_to_check = [target_col] + list(prob_cols.values())
    df_clean = df.dropna(subset=cols_to_check)
    y_true = df_clean[target_col]

    results = []
    for model_name, prob_col in prob_cols.items():
        y_prob = df_clean[prob_col]

        # Quantile Binning
        # duplicates='drop' handles ties by reducing number of bins
        try:
            bins = pd.qcut(y_prob, q=n_bins, duplicates='drop')
        except ValueError:
            # Fallback if too few unique values
            bins = pd.cut(y_prob, bins=n_bins)

        # Compute bin stats
        bin_df = pd.DataFrame({"y_true": y_true, "y_prob": y_prob, "bin": bins})
        bin_stats = bin_df.groupby("bin", observed=True).agg(
            mean_pred=("y_prob", "mean"),
            mean_obs=("y_true", "mean"),
            count=("y_true", "count")
        )

        # ECE = sum(weight * |mean_pred - mean_obs|)
        total_count = len(df_clean)
        bin_stats["weight"] = bin_stats["count"] / total_count
        bin_stats["abs_err"] = (bin_stats["mean_pred"] - bin_stats["mean_obs"]).abs()
        ece = (bin_stats["weight"] * bin_stats["abs_err"]).sum()

        results.append({
            "Model": model_name,
            "ECE": ece
        })

    return pd.DataFrame(results).set_index("Model")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def compute_probability_metrics(
    unified_evaluation_panel: pd.DataFrame,
    discrimination_metrics: pd.DataFrame,
    study_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the computation of probability accuracy metrics and combines them
    with discrimination metrics.

    Parameters
    ----------
    unified_evaluation_panel : pd.DataFrame
        DataFrame with targets and probabilities.
    discrimination_metrics : pd.DataFrame
        DataFrame from Task 19.
    study_config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        Complete evaluation table.
    """
    target_col = "Y_next"
    epsilon = study_config["evaluation_inference"].get("proba_clip_epsilon", 1e-6)
    n_bins = study_config["evaluation_inference"]["calibration"].get("ece_bins", 10)

    # Define mapping
    prob_map = {
        "MSPI": "p_mspi",
        "Benchmark": "p_benchmark"
    }
    if "p_rf" in unified_evaluation_panel.columns:
        prob_map["Random Forest"] = "p_rf"
    if "p_gb" in unified_evaluation_panel.columns:
        prob_map["Gradient Boosting"] = "p_gb"

    # Compute Prob Scores
    prob_scores = _compute_prob_scores(unified_evaluation_panel, target_col, prob_map, epsilon)

    # Compute ECE
    ece_scores = _compute_ece(unified_evaluation_panel, target_col, prob_map, n_bins)

    # Merge All
    # discrimination_metrics is base
    final_table = discrimination_metrics.join(prob_scores, how="outer")
    final_table = final_table.join(ece_scores, how="outer")

    return final_table


In [None]:
# Task 21 — Block-bootstrap inference for out-of-sample performance differences

# ==============================================================================
# Task 21: Block-bootstrap inference for out-of-sample performance differences
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 1: Configure the bootstrap per the paper's specification.
# -------------------------------------------------------------------------------------------------------------------------------
def _generate_block_bootstrap_indices(
    n_samples: int,
    block_length: int,
    n_replications: int,
    random_seed: int
) -> List[np.ndarray]:
    """
    Generates indices for moving block bootstrap.

    Parameters
    ----------
    n_samples : int
        Total number of time steps in OOS.
    block_length : int
        Length of each block (L).
    n_replications : int
        Number of bootstrap samples (B).
    random_seed : int
        Seed.

    Returns
    -------
    List[np.ndarray]
        List of index arrays, each of length n_samples.
    """
    rng = np.random.default_rng(random_seed)
    indices = []

    # Number of possible blocks
    n_blocks = n_samples - block_length + 1

    if n_blocks <= 0:
        raise ValueError(f"Block length {block_length} > sample size {n_samples}")

    # Iterate through replications
    for _ in range(n_replications):
        sample_indices = []
        while len(sample_indices) < n_samples:
            # Pick random start index
            start_idx = rng.integers(0, n_blocks)
            # Add block
            block = np.arange(start_idx, start_idx + block_length)
            sample_indices.extend(block)

        # Truncate to exact length
        indices.append(np.array(sample_indices[:n_samples]))

    return indices

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 2: Compute performance differences within each bootstrap resample.
# -------------------------------------------------------------------------------------------------------------------------------
def _compute_bootstrap_deltas(
    df: pd.DataFrame,
    indices_list: List[np.ndarray],
    target_col: str,
    score_cols: Dict[str, str],
    prob_cols: Dict[str, str],
    epsilon: float
) -> Dict[str, List[float]]:
    """
    Computes the difference in metrics (MSPI - Benchmark) for each bootstrap sample.

    Parameters
    ----------
    df : pd.DataFrame
        Evaluation DataFrame.
    indices_list : List[np.ndarray]
        Bootstrap indices.
    target_col : str
        Target column.
    score_cols : Dict[str, str]
        Raw score columns for AUC/PR-AUC.
    prob_cols : Dict[str, str]
        Prob columns for Brier/LogLoss.
    epsilon : float
        Log loss clipping.

    Returns
    -------
    Dict[str, List[float]]
        Dictionary of delta lists per metric.
    """
    deltas = {
        "AUC": [], "PR-AUC": [], "Brier": [], "LogLoss": []
    }

    # Extract arrays for speed
    y_true = df[target_col].values

    # MSPI
    s_mspi = df[score_cols["MSPI"]].values
    p_mspi = df[prob_cols["MSPI"]].values

    # Benchmark
    s_bench = df[score_cols["Benchmark"]].values
    p_bench = df[prob_cols["Benchmark"]].values

    for idx in indices_list:
        y_boot = y_true[idx]

        # Check for degeneracy
        if len(np.unique(y_boot)) < 2:
            continue # Skip degenerate samples

        # MSPI Metrics
        auc_m = roc_auc_score(y_boot, s_mspi[idx])
        pr_m = average_precision_score(y_boot, s_mspi[idx])
        br_m = brier_score_loss(y_boot, p_mspi[idx])
        ll_m = log_loss(y_boot, np.clip(p_mspi[idx], epsilon, 1-epsilon))

        # Bench Metrics
        auc_b = roc_auc_score(y_boot, s_bench[idx])
        pr_b = average_precision_score(y_boot, s_bench[idx])
        br_b = brier_score_loss(y_boot, p_bench[idx])
        ll_b = log_loss(y_boot, np.clip(p_bench[idx], epsilon, 1-epsilon))

        # Deltas (MSPI - Bench)
        deltas["AUC"].append(auc_m - auc_b)
        deltas["PR-AUC"].append(pr_m - pr_b)
        deltas["Brier"].append(br_m - br_b)
        deltas["LogLoss"].append(ll_m - ll_b)

    return deltas

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 3: Report bootstrap summaries (mean delta and confidence intervals).
# -------------------------------------------------------------------------------------------------------------------------------
def _summarize_bootstrap(
    deltas: Dict[str, List[float]]
) -> pd.DataFrame:
    """
    Computes mean and 95% CI for bootstrap deltas.

    Parameters
    ----------
    deltas : Dict[str, List[float]]
        Bootstrap deltas.

    Returns
    -------
    pd.DataFrame
        Inference table.
    """
    results = []

    # Iterate through metrics
    for metric, values in deltas.items():
        arr = np.array(values)
        n_valid = len(arr)

        if n_valid == 0:
            results.append({
                "Metric": metric,
                "Delta_Mean": np.nan,
                "CI_2.5": np.nan,
                "CI_97.5": np.nan,
                "Valid_Replications": 0
            })
            continue

        # Compute mean, low, high
        mean_delta = np.mean(arr)
        ci_low = np.percentile(arr, 2.5)
        ci_high = np.percentile(arr, 97.5)

        # Interpretation
        # AUC/PR-AUC: Positive is good
        # Brier/LogLoss: Negative is good
        if metric in ["AUC", "PR-AUC"]:
            sig = (ci_low > 0)
        else:
            sig = (ci_high < 0)

        results.append({
            "Metric": metric,
            "Delta_Mean": mean_delta,
            "CI_2.5": ci_low,
            "CI_97.5": ci_high,
            "Significant_95": sig,
            "Valid_Replications": n_valid
        })

    return pd.DataFrame(results).set_index("Metric")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def perform_bootstrap_inference(
    unified_evaluation_panel: pd.DataFrame,
    study_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates block-bootstrap inference for MSPI vs Benchmark.

    Parameters
    ----------
    unified_evaluation_panel : pd.DataFrame
        Evaluation data.
    study_config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        Inference table.
    """
    # Get Bootstrap Configuration
    boot_config = study_config["evaluation_inference"]["block_bootstrap"]
    if not boot_config.get("enabled", True):
        return pd.DataFrame()

    # Extract key parameters
    L = boot_config.get("block_length_months", 12)
    B = boot_config.get("replications", 2000)
    seed = study_config["reproducibility"]["random_seed"]
    epsilon = study_config["evaluation_inference"].get("proba_clip_epsilon", 1e-6)

    # Clean data (intersection)
    target_col = "Y_next"
    score_cols = {"MSPI": "raw_score_mspi", "Benchmark": "raw_score_benchmark"}
    prob_cols = {"MSPI": "p_mspi", "Benchmark": "p_benchmark"}

    cols = [target_col] + list(score_cols.values()) + list(prob_cols.values())
    df_clean = unified_evaluation_panel.dropna(subset=cols)

    # 1. Generate Indices
    indices = _generate_block_bootstrap_indices(len(df_clean), L, B, seed)

    # 2. Compute Deltas
    deltas = _compute_bootstrap_deltas(
        df_clean, indices, target_col, score_cols, prob_cols, epsilon
    )

    # 3. Summarize
    inference_table = _summarize_bootstrap(deltas)

    return inference_table


In [None]:
# Task 22 — Economic analysis: innovations and local projections

# ==============================================================================
# Task 22: Economic analysis: innovations and local projections
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 1: Estimate predictive regression for next-month realized volatility (paper's Eq. (4)).
# -------------------------------------------------------------------------------------------------------------------------------
def _estimate_volatility_prediction(
    mspi_series: pd.Series,
    market_df: pd.DataFrame,
    target_vol: pd.Series
) -> Dict[str, Any]:
    """
    Estimates the predictive regression of next-month realized volatility on MSPI.
    Equation: sigma_{t+1} = alpha + gamma * MSPI_t + phi' * Z_t + eta_{t+1}

    Parameters
    ----------
    mspi_series : pd.Series
        MSPI forecasts indexed by t.
    market_df : pd.DataFrame
        Controls Z_t (R_mkt, sigma_mkt) indexed by t.
    target_vol : pd.Series
        Realized volatility sigma_mkt indexed by t (will be shifted to t+1).

    Returns
    -------
    Dict[str, Any]
        Regression statistics (gamma, t-stat, R2).
    """
    # Align Data
    # LHS: sigma_{t+1} (shift target_vol backwards by 1 to align t+1 with t)
    # Note: target_vol index is time of realization.
    # If we shift(-1), the value at index t becomes the value from t+1.
    lhs = target_vol.shift(-1).rename("sigma_next")

    # RHS: MSPI_t, R_mkt_t, sigma_mkt_t
    rhs = pd.concat([mspi_series, market_df], axis=1)
    rhs = sm.add_constant(rhs)

    # Combine and Drop NaNs
    data = pd.concat([lhs, rhs], axis=1).dropna()

    # Fit OLS with HAC (Newey-West, lag=1)
    model = sm.OLS(data["sigma_next"], data.drop(columns=["sigma_next"]))
    results = model.fit(cov_type='HAC', cov_kwds={'maxlags': 1})

    # Extract MSPI coefficient (gamma)
    mspi_col = mspi_series.name
    gamma = results.params[mspi_col]
    t_stat = results.tvalues[mspi_col]
    p_val = results.pvalues[mspi_col]

    return {
        "gamma": gamma,
        "t_stat": t_stat,
        "p_value": p_val,
        "R2": results.rsquared,
        "N": int(results.nobs)
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 2: Construct stress-risk innovations.
# -------------------------------------------------------------------------------------------------------------------------------
def _construct_innovations(
    mspi_series: pd.Series,
    market_df: pd.DataFrame
) -> pd.Series:
    """
    Constructs stress-risk innovations u_t by orthogonalizing MSPI wrt lagged information.
    Equation: MSPI_t = delta_0 + delta_1 * MSPI_{t-1} + Delta' * Z_{t-1} + u_t

    Parameters
    ----------
    mspi_series : pd.Series
        MSPI forecasts indexed by t.
    market_df : pd.DataFrame
        Market variables indexed by t.

    Returns
    -------
    pd.Series
        Innovation series u_t indexed by t.
    """
    # Lagged Regressors
    # Shift forward by 1 to get t-1 values at index t
    lagged_mspi = mspi_series.shift(1).rename("MSPI_lag")
    lagged_market = market_df.shift(1).add_suffix("_lag")

    rhs = pd.concat([lagged_mspi, lagged_market], axis=1)
    rhs = sm.add_constant(rhs)

    # LHS: MSPI_t
    lhs = mspi_series

    # Align
    data = pd.concat([lhs, rhs], axis=1).dropna()

    # Fit OLS
    model = sm.OLS(data[mspi_series.name], data.drop(columns=[mspi_series.name]))
    results = model.fit()

    # Residuals are innovations
    u_t = results.resid.rename("stress_innovation")

    return u_t

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 3: Estimate local projections.
# -------------------------------------------------------------------------------------------------------------------------------
def _estimate_local_projections(
    innovations: pd.Series,
    outcome_series: pd.Series,
    controls: pd.DataFrame,
    horizons: int = 12
) -> pd.DataFrame:
    """
    Estimates local projections of outcome on stress innovations.
    Equation: y_{t+h} = a_h + b_h * u_t + Gamma_h' * W_{t-1} + eps_{t+h}

    Parameters
    ----------
    innovations : pd.Series
        Stress innovations u_t.
    outcome_series : pd.Series
        Outcome variable y_t (e.g., sigma_mkt).
    controls : pd.DataFrame
        Control variables W_{t-1} (already lagged? No, usually contemporaneous to innovation generation time t-1 info).
        The task says W_{t-1}. We assume controls passed are Z_{t-1} aligned at t.
    horizons : int
        Max horizon H.

    Returns
    -------
    pd.DataFrame
        IRF table with coefficients and CI.
    """
    irf_results = []

    # Align base data
    # We need u_t and W_{t-1} aligned at index t
    # If controls are Z_t, we shift them.
    # Let's assume controls passed are Z_t, so we shift(1).
    w_lag = controls.shift(1).add_suffix("_lag")

    rhs_base = pd.concat([innovations, w_lag], axis=1)
    rhs_base = sm.add_constant(rhs_base)

    for h in range(horizons + 1):
        # LHS: y_{t+h}
        # Shift outcome backwards by h to align t+h with t
        lhs = outcome_series.shift(-h).rename("outcome_h")

        # Align
        data = pd.concat([lhs, rhs_base], axis=1).dropna()

        if len(data) < 20: # Safety check
            continue

        # Fit OLS with HAC
        # Lag for HAC should be at least h
        nw_lag = h + 1
        model = sm.OLS(data["outcome_h"], data.drop(columns=["outcome_h"]))
        results = model.fit(cov_type='HAC', cov_kwds={'maxlags': nw_lag})

        # Extract coefficient on innovation
        u_name = innovations.name
        b_h = results.params[u_name]
        se_h = results.bse[u_name]
        ci = results.conf_int().loc[u_name]

        irf_results.append({
            "Horizon": h,
            "Response": b_h,
            "SE": se_h,
            "CI_Lower": ci[0],
            "CI_Upper": ci[1]
        })

    return pd.DataFrame(irf_results).set_index("Horizon")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def perform_economic_analysis(
    mspi_forecasts: pd.DataFrame,
    market_aggregates: pd.DataFrame,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the economic analysis: predictive regressions and local projections.

    Parameters
    ----------
    mspi_forecasts : pd.DataFrame
        MSPI forecasts (p_mspi).
    market_aggregates : pd.DataFrame
        Market variables (R_mkt, sigma_mkt).
    study_config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Dict[str, Any]
        Analysis results.
    """
    # Extract Parameters
    econ_config = study_config["economic_analysis"]
    horizons = econ_config["local_projections"].get("horizons_h", 12)

    # Prepare Data
    mspi_col = "MSPI" if "MSPI" in mspi_forecasts.columns else "p_mspi"
    mspi_series = mspi_forecasts[mspi_col]

    # Intersection of indices
    common_idx = mspi_series.index.intersection(market_aggregates.index)
    mspi_series = mspi_series.loc[common_idx]
    market_df = market_aggregates.loc[common_idx]

    # 1. Predictive Regression
    pred_vol_results = _estimate_volatility_prediction(
        mspi_series, market_df, market_df["sigma_mkt"]
    )

    # 2. Innovations
    innovations = _construct_innovations(mspi_series, market_df)

    # 3. Local Projections
    # Outcome: Realized Volatility
    lp_vol = _estimate_local_projections(
        innovations, market_df["sigma_mkt"], market_df, horizons
    )

    return {
        "predictive_regression_vol": pred_vol_results,
        "innovations": innovations,
        "irf_volatility": lp_vol
    }


In [None]:
# Top-Level Orchestor Callable

# ==============================================================================
# Final Top-Level Orchestrator Function
# ==============================================================================

def execute_complete_mspi_research_pipeline(
    df_crsp_daily: pd.DataFrame,
    df_crsp_index: pd.DataFrame,
    raw_study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Executes the complete Market Stress Probability Index (MSPI) research pipeline,
    from raw data ingestion to final economic analysis and inference.

    This orchestrator enforces the strict sequential dependency graph required to
    reproduce the results of 'Algorithmic Monitoring' (Schmitt, 2026). It manages
    data flow from raw CRSP inputs through feature engineering, labeling,
    hyperparameter tuning, out-of-sample forecasting, robustness checks,
    performance evaluation, and economic impact analysis.

    Parameters
    ----------
    df_crsp_daily : pd.DataFrame
        Raw CRSP Daily Stock File (micro-data).
    df_crsp_index : pd.DataFrame
        Raw CRSP Daily Index File (market aggregate data).
    raw_study_config : Dict[str, Any]
        Configuration dictionary specifying all study parameters.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing the following keys:
        - 'monthly_features': pd.DataFrame of X_t features.
        - 'market_aggregates': pd.DataFrame of R_mkt and sigma_mkt.
        - 'labels': pd.DataFrame of S_t and Y_next.
        - 'mspi_forecasts': pd.DataFrame of MSPI out-of-sample forecasts.
        - 'benchmark_forecasts': pd.DataFrame of Benchmark out-of-sample forecasts.
        - 'unified_evaluation_panel': pd.DataFrame of all model forecasts (MSPI, Bench, RF, GB).
        - 'discrimination_metrics': pd.DataFrame of AUC/PR-AUC.
        - 'probability_metrics': pd.DataFrame of Brier/LogLoss/ECE.
        - 'bootstrap_inference': pd.DataFrame of statistical significance tests.
        - 'economic_analysis': Dict of predictive regressions, innovations, and IRFs.
        - 'frozen_hyperparams': Dict of tuned hyperparameters.
        - 'audit_log': Dict of detailed execution logs and statistics.
    """
    # Initialize Audit Log
    audit_log = {}

    # -------------------------------------------------------------------------
    # Phase 1: Configuration and Validation
    # -------------------------------------------------------------------------
    print("Phase 1: Configuration and Validation")

    # Task 1: Validate Config
    config_res = validate_study_config(raw_study_config)
    study_config = config_res["validated_config"]
    audit_log["config_validation"] = config_res["validation_report"]

    # Task 2: Validate Micro Data
    val_daily = validate_crsp_daily(df_crsp_daily, study_config)
    audit_log["validation_daily"] = val_daily

    # Task 3: Validate Index Data
    val_index = validate_crsp_index(df_crsp_daily, df_crsp_index, study_config)
    audit_log["validation_index"] = val_index

    # -------------------------------------------------------------------------
    # Phase 2: Data Cleansing and Universe Construction
    # -------------------------------------------------------------------------
    print("Phase 2: Data Cleansing and Universe Construction")

    # Task 4: Cleanse Micro Data
    clean_daily_res = cleanse_crsp_daily(df_crsp_daily, study_config)
    df_daily_clean = clean_daily_res["df_clean"]
    audit_log["cleansing_daily"] = clean_daily_res["audit_log"]

    # Task 5: Cleanse Index Data
    clean_index_res = cleanse_crsp_index(df_daily_clean, df_crsp_index, study_config)
    df_index_clean = clean_index_res["df_index_clean"]
    audit_log["cleansing_index"] = clean_index_res["audit_log"]

    # Task 6: Construct Eligible Universe
    universe_res = construct_eligible_universe(df_daily_clean, study_config)
    df_eligible = universe_res["df_eligible"]
    nd_series = universe_res["nd_series"]
    audit_log["universe_construction"] = universe_res["audit_log"]

    # -------------------------------------------------------------------------
    # Phase 3: Feature Engineering (Daily Signals)
    # -------------------------------------------------------------------------
    print("Phase 3: Feature Engineering (Daily Signals)")

    # Task 7: Return Moments
    moments = compute_daily_return_moments(df_eligible, study_config)

    # Task 8: Tail Measures
    tails = compute_daily_tail_measures(df_eligible, study_config)

    # Task 9: Trading Proxies
    proxies = compute_daily_trading_proxies(df_eligible, study_config)

    # Merge Daily Statistics
    # We align on DATE index. All should have same index from df_eligible.
    daily_stats_all = pd.concat([moments, tails, proxies], axis=1)

    # -------------------------------------------------------------------------
    # Phase 4: Monthly Aggregation and Labeling
    # -------------------------------------------------------------------------
    print("Phase 4: Monthly Aggregation and Labeling")

    # Task 10: Aggregate Monthly Features
    agg_res = aggregate_monthly_features(daily_stats_all, nd_series, study_config)
    X_t = agg_res["X_t"]
    audit_log["monthly_aggregation"] = agg_res["audit_log"]

    # Task 11: Construct Market Aggregates
    market_res = construct_market_aggregates(df_index_clean, X_t, study_config)
    aligned_panel = market_res["aligned_panel"]
    audit_log["market_aggregates"] = market_res["audit_log"]

    # Task 12: Construct Stress Labels
    label_res = construct_stress_labels(aligned_panel, study_config)
    modeling_panel = label_res["modeling_panel"]
    audit_log["label_construction"] = label_res["audit_log"]

    # -------------------------------------------------------------------------
    # Phase 5: Model Training and Forecasting
    # -------------------------------------------------------------------------
    print("Phase 5: Model Training and Forecasting")

    # Task 14: Tune Hyperparameters
    # We pass the expanding_window_standardizer callable explicitly
    tune_res = tune_hyperparameters(
        modeling_panel,
        expanding_window_standardizer,
        study_config
    )
    frozen_params = tune_res
    audit_log["hyperparameter_tuning"] = tune_res["tuning_audit"]

    # Task 15: Forecast MSPI
    mspi_forecasts = forecast_mspi(
        modeling_panel,
        frozen_params,
        expanding_window_standardizer,
        study_config
    )

    # Task 16: Forecast Benchmark
    bench_forecasts = forecast_benchmark(
        modeling_panel,
        frozen_params,
        expanding_window_standardizer,
        study_config
    )

    # Task 18: Robustness Horse Race (RF/GB)
    unified_evaluation_panel = run_robustness_horse_race(
        modeling_panel,
        mspi_forecasts,
        bench_forecasts,
        frozen_params,
        expanding_window_standardizer,
        study_config
    )

    # -------------------------------------------------------------------------
    # Phase 6: Evaluation and Inference
    # -------------------------------------------------------------------------
    print("Phase 6: Evaluation and Inference")

    # Task 19: Discrimination Metrics
    discrim_metrics = compute_discrimination_metrics(unified_evaluation_panel)

    # Task 20: Probability Metrics
    prob_metrics = compute_probability_metrics(
        unified_evaluation_panel, discrim_metrics, study_config
    )

    # Task 21: Bootstrap Inference
    bootstrap_res = perform_bootstrap_inference(
        unified_evaluation_panel, study_config
    )

    # -------------------------------------------------------------------------
    # Phase 7: Economic Analysis
    # -------------------------------------------------------------------------
    print("Phase 7: Economic Analysis")

    # Task 22: Economic Analysis
    # We need market aggregates aligned with forecasts
    # modeling_panel has R_mkt, sigma_mkt aligned with X_t
    # We use the intersection of indices
    econ_res = perform_economic_analysis(
        mspi_forecasts,
        modeling_panel[["R_mkt", "sigma_mkt"]],
        study_config
    )

    # -------------------------------------------------------------------------
    # Phase 8: Finalization
    # -------------------------------------------------------------------------
    print("Phase 8: Finalization and Safety Checks")

    # Run Anti-Look-Ahead Checks
    _run_anti_look_ahead_checks(mspi_forecasts, modeling_panel, audit_log)

    # Compile Final Results
    results = {
        "monthly_features": X_t,
        "market_aggregates": aligned_panel[["R_mkt", "sigma_mkt"]],
        "labels": modeling_panel[["S_t", "Y_next"]],
        "mspi_forecasts": mspi_forecasts,
        "benchmark_forecasts": bench_forecasts,
        "unified_evaluation_panel": unified_evaluation_panel,
        "discrimination_metrics": discrim_metrics,
        "probability_metrics": prob_metrics,
        "bootstrap_inference": bootstrap_res,
        "economic_analysis": econ_res,
        "frozen_hyperparams": frozen_params,
        "audit_log": audit_log
    }

    return results