# README.md

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/downloads/)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Imports: isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336)](https://pycqa.github.io/isort/)
[![Type Checking: mypy](https://img.shields.io/badge/type_checking-mypy-blue)](http://mypy-lang.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%23025596?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![Statsmodels](https://img.shields.io/badge/Statsmodels-150458.svg?style=flat&logo=python&logoColor=white)](https://www.statsmodels.org/stable/index.html)
[![Joblib](https://img.shields.io/badge/Joblib-013243.svg?style=flat&logo=python&logoColor=white)](https://joblib.readthedocs.io/en/latest/)
[![Matplotlib](https://img.shields.io/badge/Matplotlib-%23ffffff.svg?style=flat&logo=Matplotlib&logoColor=black)](https://matplotlib.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2507.17599v1-b31b1b.svg)](https://arxiv.org/abs/2507.17599v1)
[![Research](https://img.shields.io/badge/Research-Empirical%20Asset%20Pricing-green)](https://github.com/chirindaopensource/randomized_test_alpha)
[![Discipline](https://img.shields.io/badge/Discipline-Econometrics-blue)](https://github.com/chirindaopensource/randomized_test_alpha)
[![Methodology](https://img.shields.io/badge/Methodology-Panel%20Data%20%7C%20Monte%20Carlo-orange)](https://github.com/chirindaopensource/randomized_test_alpha)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/randomized_test_alpha)

**Repository:** `https://github.com/chirindaopensource/randomized_test_alpha`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"A General Randomized Test for Alpha"** by:

*   Daniele Massacci
*   Lucio Sarno
*   Lorenzo Trapani
*   Pierluigi Vallarino

The project provides a complete, end-to-end pipeline for testing the joint null hypothesis of zero alpha in high-dimensional linear factor models. It replicates the paper's novel randomized testing procedure, which is robust to common violations of classical statistical assumptions such as non-Gaussianity, conditional heteroskedasticity, and strong cross-sectional dependence. The goal is to provide a transparent, robust, and computationally efficient "truth detector" for asset pricing models.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: run_full_study](#key-callable-run_full_study)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the methodologies presented in the 2025 paper "A General Randomized Test for Alpha." The core of this repository is the iPython Notebook `randomized_test_alpha_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings, from initial data validation and cleansing to the final generation of empirical results and Monte Carlo simulations.

The central question in empirical asset pricing is whether a proposed factor model can fully explain the cross-section of expected returns. A model is considered well-specified if the pricing errors, or "alphas," are jointly indistinguishable from zero. This project provides the tools to rigorously test this hypothesis in challenging, high-dimensional settings (where the number of assets `N` can be larger than the number of time periods `T`).

This codebase enables users to:
-   Rigorously validate, clean, and align large panels of asset and factor return data.
-   Apply the novel randomized alpha test to various factor model specifications in a rolling-window fashion.
-   Conduct comprehensive Monte Carlo simulations to verify the test's statistical properties (size and power) under various data generating processes.
-   Benchmark the test's performance against other methods from the literature.
-   Automatically generate publication-quality tables and visualizations to report the findings.

## Theoretical Background

The implemented methods are grounded in modern panel data econometrics and Extreme Value Theory.

**The Linear Factor Model:** The analysis begins with the standard time-series regression for a panel of `N` assets over `T` periods:
$$
y_{i,t} = \alpha_i + \beta'_i f_t + u_{i,t}
$$
The null hypothesis is that the model is correctly specified, meaning all pricing errors (`α_i`) are jointly zero:
$$
H_0: \max_{1 \le i \le N} |\alpha_i| = 0
$$

**Econometric Challenges:** Traditional tests like the GRS test fail in modern settings due to:
1.  **High Dimensionality:** When `N > T`, the `N x N` residual covariance matrix is singular and cannot be inverted.
2.  **Restrictive Assumptions:** Classical tests often assume normally distributed, homoskedastic, and serially uncorrelated errors, which are frequently violated by financial returns.

**The Randomized Test Methodology:** The paper's key innovation is a multi-step procedure that circumvents these issues:
1.  **OLS Estimation:** Estimate `α̂_i` and residuals `û_{i,t}` for each asset `i`.
2.  **Normalization:** Transform the alphas into a scale-free statistic `ψ_{i,NT}` that converges to zero under `H₀` but diverges under the alternative `Hₐ`.
    $$ \psi_{i,NT} = \left| \frac{T^{1/\nu} \hat{\alpha}_{i,T}}{\hat{s}_{NT}} \right|^{\nu/2} $$
3.  **Randomization:** Perturb the `ψ` statistics with i.i.d. standard normal shocks `ω_i` to create `z_{i,NT} = ψ_{i,NT} + ω_i`. Under `H₀`, the `z` statistics behave like a standard normal sample.
4.  **Test Statistic:** The final test statistic is the maximum of the perturbed series, `Z_{N,T} = max(z_{i,NT})`. By Extreme Value Theory, the distribution of `Z_{N,T}` under `H₀` is asymptotically Gumbel, providing a known distribution for inference without estimating any covariance matrices.
5.  **De-randomization:** To ensure a deterministic result, the randomization is repeated `B` times, and the final decision is based on the fraction of times `Z_{N,T}` falls below its asymptotic critical value.

## Features

The provided iPython Notebook (`randomized_test_alpha_draft.ipynb`) implements the full research pipeline, including:

-   **Data Pipeline:** A robust validation and cleansing module for preparing large panel datasets.
-   **Empirical Analysis Engine:** A high-level orchestrator that automates the rolling-window analysis for multiple user-defined factor models.
-   **Core Test Implementation:** A precise, numerically stable implementation of the complete randomized testing procedure, from OLS estimation to the final de-randomized decision.
-   **Monte Carlo Framework:** A powerful, parallelized simulation engine to evaluate the test's size and power under various DGPs (Gaussian, Student's t, GARCH errors; weak, semi-strong, and strong factors).
-   **Comparative Analysis:** A framework for benchmarking the test against other methods from the literature (e.g., FLY, AS tests).
-   **Automated Reporting:** Functions to automatically generate publication-quality summary tables and visualizations for both empirical and simulation results.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Data Preparation (Tasks 1-2):** The pipeline ingests raw asset and factor returns, validates their structure, handles missing values via constrained imputation, treats outliers with cross-sectional winsorization, and aligns the datasets.
2.  **Empirical Analysis (Tasks 3-9):** It sets up a rolling-window schedule and, for each window and each model, executes the full testing procedure:
    -   Vectorized OLS estimation (Task 4).
    -   `ψ` statistic computation (Task 5).
    -   Randomization and `Z` statistic computation (Task 6).
    -   De-randomization and `Q` statistic computation (Task 7).
    -   Application of the final decision rule (Task 8).
3.  **Monte Carlo Analysis (Tasks 13-17):** It sets up a grid of simulation experiments, generates data from complex DGPs (Task 14), and runs the test `M` times to compute empirical size and power (Task 15), managed by a high-level orchestrator (Task 16).
4.  **Reporting (Tasks 11, 12, 18, 19):** It synthesizes all quantitative outputs into summary tables, power curve plots, and a structured textual interpretation of the findings.

## Core Components (Notebook Structure)

The `randomized_test_alpha_draft.ipynb` notebook is structured as a logical pipeline with modular functions for each task:

-   **Tasks 1-3:** Data Validation, Cleansing, and Empirical Setup.
-   **Tasks 4-8:** The core testing pipeline for a single window (Estimation, Statistic Computation, Randomization, De-randomization, Decision).
-   **Task 9:** `EmpiricalAnalysisOrchestrator` class to run the empirical study.
-   **Tasks 10-12:** Empirical Robustness, Compilation, and Visualization.
-   **Tasks 13-16:** Monte Carlo Setup, DGP, Simulation Engine, and Orchestrator.
-   **Tasks 17-19:** Monte Carlo Robustness, Compilation, and Interpretation.
-   **Task 20:** `run_full_study` master function to execute the entire project.

## Key Callable: run_full_study

The central function in this project is `run_full_study`. It orchestrates the entire analytical workflow from raw data to a final, comprehensive report object.

```python
def run_full_study(
    asset_returns: pd.DataFrame,
    factor_returns: pd.DataFrame,
    replication_config: Dict[str, Any],
    run_empirical: bool = True,
    run_monte_carlo: bool = True,
    n_jobs: int = -1,
    seed: int = 42
) -> Dict[str, Any]:
    """
    Executes the complete end-to-end research pipeline for the randomized alpha test.
    """
    # ... (implementation is in the notebook)
```

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `scipy`, `statsmodels`, `matplotlib`, `joblib`, `tqdm`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/randomized_test_alpha.git
    cd randomized_test_alpha
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy scipy statsmodels matplotlib joblib tqdm
    ```

## Input Data Structure

The pipeline requires three inputs passed to the `run_full_study` function:

1.  **`asset_returns`**: A `pandas.DataFrame` where the index is a monthly `DatetimeIndex` and columns are individual asset returns.
2.  **`factor_returns`**: A `pandas.DataFrame` with the same monthly `DatetimeIndex` and columns for the factor returns (e.g., 'Mkt-RF', 'SMB', etc.).
3.  **`replication_config`**: A nested Python dictionary that controls every aspect of the empirical and Monte Carlo analyses. A fully specified example is provided in the notebook.

## Usage

The `randomized_test_alpha_draft.ipynb` notebook provides a complete, step-by-step guide. The core workflow is:

1.  **Prepare Inputs:** Load your asset and factor returns into DataFrames and define the `replication_config` dictionary.
2.  **Execute Pipeline:** Call the master orchestrator function:
    ```python
    full_study_results = run_full_study(
        asset_returns=my_asset_returns_df,
        factor_returns=my_factor_returns_df,
        replication_config=my_config
    )
    ```
3.  **Inspect Outputs:** Programmatically access any result from the returned `full_study_results` dictionary. For example, to view the main empirical summary table:
    ```python
    # The output is a pandas Styler object, access the data with .data
    empirical_table = full_study_results['empirical_summary_table'].data
    print(empirical_table)
    ```

## Output Structure

The `run_full_study` function returns a single, comprehensive dictionary with the following top-level keys:

-   `empirical_timeseries_results`: A `pd.DataFrame` with the detailed, window-by-window results of the empirical analysis.
-   `empirical_summary_table`: A styled `pd.DataFrame` summarizing the empirical rejection rates.
-   `empirical_q_statistic_plot`: A tuple containing the `matplotlib` Figure and Axes objects for the main empirical plot.
-   `monte_carlo_raw_results`: A tidy `pd.DataFrame` with the results from every Monte Carlo experimental cell.
-   `monte_carlo_summary_tables`: A dictionary of styled `pd.DataFrame`s, one for each simulation scenario.
-   `final_quantitative_summary`: A nested dictionary containing a quantitative interpretation of the key findings.

## Project Structure

```
randomized_test_alpha/
│
├── randomized_test_alpha_draft.ipynb  # Main implementation notebook
├── requirements.txt                     # Python package dependencies
├── LICENSE                              # MIT license file
└── README.md                            # This documentation file
```

## Customization

The pipeline is highly customizable via the master `replication_config` dictionary. Users can easily modify:
-   The `model_specifications` to test different factor models.
-   The `rolling_window_size_months` and date ranges for the empirical study.
-   The `N_grid`, `T_grid`, `scenarios`, and all DGP parameters for the Monte Carlo simulations.
-   The parameters of the test itself, such as `nu` and `tau`.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{massacci2025general,
  title={A general randomized test for Alpha},
  author={Massacci, Daniele and Sarno, Lucio and Trapani, Lorenzo and Vallarino, Pierluigi},
  journal={arXiv preprint arXiv:2507.17599v1},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A Python Implementation of "A general randomized test for Alpha".
GitHub repository: https://github.com/chirindaopensource/randomized_test_alpha
```

## Acknowledgments

-   Credit to Daniele Massacci, Lucio Sarno, Lorenzo Trapani, and Pierluigi Vallarino for their innovative and rigorous research.
-   Thanks to the developers of the scientific Python ecosystem (`numpy`, `pandas`, `scipy`, `statsmodels`, `matplotlib`, `joblib`) that makes this work possible.

--

*This README was generated based on the structure and content of `randomized_test_alpha_draft.ipynb` and follows best practices for research software documentation.*

# Paper

Title: "*A general randomized test for Alpha*"

Authors: Daniele Massacci, Lucio Sarno, Lorenzo Trapani, Pierluigi Vallarino

E-Journal Submission Date: 23 July 2025

Link: https://arxiv.org/abs/2507.17599v1

Abstract:

We propose a methodology to construct tests for the null hypothesis that the pricing errors of a panel of asset returns are jointly equal to zero in a linear factor asset pricing model -- that is, the null of "zero alpha". We consider, as a leading example, a model with observable, tradable factors, but we also develop extensions to accommodate for non-tradable and latent factors. The test is based on equation-by-equation estimation, using a randomized version of the estimated alphas, which only requires rates of convergence. The distinct features of the proposed methodology are that it does not require the estimation of any covariance matrix, and that it allows for both N and T to pass to infinity, with the former possibly faster than the latter. Further, unlike extant approaches, the procedure can accommodate conditional heteroskedasticity, non-Gaussianity, and even strong cross-sectional dependence in the error terms. We also propose a de-randomized decision rule to choose in favor or against the correct specification of a linear factor pricing model. Monte Carlo simulations show that the test has satisfactory properties and it compares favorably to several existing tests. The usefulness of the testing procedure is illustrated through an application of linear factor pricing models to price the constituents of the S&P 500.

# Summary

The authors propose a new, highly robust statistical test to determine if a set of assets has "alpha" – that is, returns that cannot be explained by a given linear factor model. Their method is designed for the modern, high-dimensional world of finance (many assets, `N`, relative to time periods, `T`).

The key innovation is a **randomization technique** that circumvents the single biggest hurdle in this area: the need to estimate and invert a massive, often ill-conditioned, `N x N` covariance matrix of asset returns. Their test works well even with non-normal returns, time-varying volatility (like GARCH), and strong cross-asset correlations. They also provide a "de-randomized" procedure to ensure results are reproducible, a critical feature for practical application.

--

### **Step-by-Step Academic Breakdown**

#### **Step 1: The Core Problem in Asset Pricing**

The fundamental question is whether a linear factor model, like the CAPM or the Fama-French models, can fully explain the expected returns of a large panel of assets. The model is typically specified as:

*   **Model:** `y_i,t = α_i + β_i' * f_t + u_i,t`
    *   `y_i,t`: Excess return of asset `i` at time `t`.
    *   `α_i`: The "alpha" or pricing error for asset `i`. This is the part of the return not explained by the factors.
    *   `f_t`: A vector of common risk factors (e.g., market return, size, value).
    *   `β_i`: The factor loadings (sensitivity) of asset `i` to the factors.
    *   `u_i,t`: The idiosyncratic error term.

*   **The Null Hypothesis (H₀):** The model is correctly specified. This means all pricing errors are jointly zero: `H₀: α₁ = α₂ = ... = α_N = 0`. Equivalently, `max|α_i| = 0`.

*   **The Challenge:** Testing this hypothesis is notoriously difficult. Classic tests like the Gibbons-Ross-Shanken (GRS) test require `N < T` and make strong assumptions (e.g., Gaussian, i.i.d. errors) that are routinely violated by real financial data. More modern tests that allow for large `N` still struggle with estimating the `N x N` error covariance matrix and often impose restrictive assumptions on the dependence structure of the data.

#### **Step 2: The Proposed Solution - A Randomized Maximum Test**

The authors build their test in a clever, multi-stage process that avoids the covariance matrix problem entirely.

1.  **Equation-by-Equation Estimation:** For each of the `N` assets, they run a simple time-series OLS regression to get an estimate of its alpha, `α̂_i`. This is computationally trivial and perfectly parallelizable.

2.  **Transformation:** They create a new statistic, `ψ_i,NT`, by scaling the absolute value of the estimated alpha, `|α̂_i|`. This scaling is designed so that:
    *   Under the null hypothesis (`α_i = 0`), `ψ_i,NT` converges to zero.
    *   Under the alternative hypothesis (`α_i ≠ 0`), `ψ_i,NT` diverges to positive infinity.

3.  **The "Magic" of Randomization:** They perturb each `ψ_i,NT` by adding an independent, standard normal random shock, `ω_i ~ N(0,1)`. This creates a new set of statistics: `z_i,NT = ψ_i,NT + ω_i`.
    *   **Why?** This step is crucial. It effectively "washes out" the complex and unknown cross-sectional dependence structure of the original error terms. Under the null hypothesis, since `ψ_i,NT` goes to zero, the `z_i,NT` behave like an i.i.d. sequence of standard normal variables, regardless of the original correlation structure.

4.  **The Test Statistic:** The final test statistic, `Z_N,T`, is the **maximum** of these randomized statistics: `Z_N,T = max(z_i,NT)`.

#### **Step 3: The Asymptotic Theory**

The choice of the maximum statistic leads to a result from **Extreme Value Theory**.

*   **Under the Null (H₀):** The distribution of the (appropriately centered and scaled) `Z_N,T` converges to a **Gumbel distribution**. This is a standard, known distribution, so critical values can be easily calculated.
*   **Under the Alternative (Hₐ):** At least one `α_i` is non-zero, causing the corresponding `ψ_i,NT` to diverge. This "spike" ensures that the maximum statistic `Z_N,T` also diverges to infinity, guaranteeing the test has power to detect mispricing.

#### **Step 4: De-Randomization for Practical Use**

A clear drawback of the test in Step 2 is that the result depends on the specific random shocks (`ω_i`) drawn. Two researchers with the same data would get different p-values. To solve this, the authors propose a de-randomization procedure:

1.  **Replication:** Run the entire test `B` times (e.g., `B = 1000`), each time with a new set of random shocks `ω_i`.
2.  **Aggregation:** Calculate the statistic `Q_N,T,B(τ)`, which is simply the *fraction of times* the null hypothesis was *not* rejected at a given significance level `τ` (e.g., 5%).
3.  **Decision Rule:**
    *   Under H₀, `Q` will converge to `1 - τ` (e.g., 0.95).
    *   Under Hₐ, `Q` will converge to 0.
    *   The researcher makes a final, deterministic decision by comparing `Q` to a threshold close to `1 - τ`. This result is stable and reproducible.

#### **Step 5: Generality and Extensions**

The authors demonstrate the flexibility of their framework by extending it beyond the simple case of observable, tradable factors. The core randomization machinery works as long as a consistent estimator for alpha exists. They show it can be adapted to:

*   **Non-Tradable Factors:** Using the two-pass Fama-MacBeth estimation procedure.
*   **Latent (Unobservable) Factors:** Using Principal Component Analysis (PCA) to first estimate the factors from the asset returns themselves.

This significantly broadens the applicability of the test to a wider range of modern asset pricing models.

#### **Step 6: Validation and Empirical Findings**

*   **Monte Carlo Simulations:** The authors rigorously test their procedure against simulated data with known properties (Gaussian, fat-tailed Student's t, and GARCH errors). Their test consistently shows correct **size** (it doesn't reject the null too often when it's true), a property where many competing tests fail. It also exhibits excellent **power** (it correctly rejects the null when it's false).

*   **Empirical Application (S&P 500):** They apply the test to constituents of the S&P 500 from 1985-2024, using 5-year rolling windows.
    *   **Key Finding:** No single factor model (from CAPM to the Fama-French 6-factor model) is correctly specified across all time periods.
    *   **Crisis Performance:** All models perform particularly poorly during periods of market turmoil (e.g., the 2008 Global Financial Crisis, the COVID-19 pandemic), consistently failing to price assets correctly.
    *   **Post-2008:** The performance of all models seems to improve after the GFC, suggesting a possible structural shift in markets.

### **Conclusion**

This paper makes a significant methodological contribution to the field of empirical asset pricing. It provides a powerful, robust, and computationally feasible tool for a foundational question in finance. By cleverly using randomization to bypass the estimation of the covariance matrix, the authors have developed a test that is well-suited for the high-dimensional and complex nature of modern financial data. The de-randomization step makes it a practical tool for academic and industry research.

# Import Essential Modules

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ==============================================================================#
#
#  A General Randomized Test for Alpha in High-Dimensional Factor Models
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "A general randomized test for Alpha"
#  by Massacci, Sarno, Trapani, and Vallarino (2025). It delivers a robust
#  and computationally sound "truth detector" for any proposed linear factor
#  model, allowing practitioners and researchers to rigorously validate whether
#  a given model fully captures all systematic drivers of return for a large
#  universe of assets, even when returns violate classical statistical assumptions.
#
#  Core Methodological Components:
#   • Test for the joint null hypothesis of zero alphas (pricing errors).
#   • A novel randomization procedure that avoids covariance matrix estimation.
#   • A de-randomized decision rule for a deterministic, reproducible outcome.
#   • Asymptotic theory based on Extreme Value Theory (Gumbel distribution).
#   • Robustness to conditional heteroskedasticity, non-Gaussianity, and strong
#     cross-sectional dependence in residuals.
#   • Applicability to high-dimensional panels where N (assets) can be > T (time).
#
#  Technical Implementation Features:
#   • End-to-end pipeline from data cleansing to final report generation.
#   • High-performance, vectorized OLS estimation for large panels.
#   • Rolling-window analysis framework for empirical applications.
#   • Comprehensive Monte Carlo simulation engine for evaluating test properties
#     (size, power) under various data generating processes (DGP).
#   • Modular and extensible design for both empirical and simulation studies.
#   • Professional-grade visualization for time-series and cross-sectional results.
#
#  Paper Reference:
#  Massacci, D., Sarno, L., Trapani, L., & Vallarino, P. (2025). A general
#  randomized test for Alpha. arXiv preprint arXiv:2507.17599v1.
#  https://arxiv.org/abs/2507.17599v1
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# =============================================================================

# =============================================================================
# Standard Library Imports
# =============================================================================

import copy
import logging
import math
from typing import (
    Any,
    Callable,
    Dict,
    Iterator,
    List,
    NamedTuple,
    Optional,
    Tuple,
    Union
)

# =============================================================================
# Third-Party Library Imports
# =============================================================================

# Core numerical and data manipulation libraries
import numpy as np
import pandas as pd

# Statistical modeling and hypothesis testing
from scipy.stats import chi2, cauchy, t
from statsmodels.stats.proportion import proportion_confint

# Parallel processing and progress tracking
from joblib import Parallel, delayed
from tqdm.auto import tqdm

# Visualization
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

# =============================================================================
#  Logging Configuration
# =============================================================================

# A single, consistent logging configuration for the entire project.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# =============================================================================
# Matplotlib Style Configuration
# =============================================================================

# Set a professional and consistent style for all plots.
plt.style.use('seaborn-v0_8-whitegrid')


# Implementation

## Draft 1

### Analysis of the Inputs, Processes and Outputs (IPO) of Key Callables

### **Analysis of All Final Callables**

#### **Task 1: Data and Parameter Validation**

*   **Callable:** `validate_inputs`
    *   **Inputs:** `asset_returns` (raw `pd.DataFrame`), `factor_returns` (raw `pd.DataFrame`), `replication_config` (`Dict`).
    *   **Process:** This function acts as a master gateway. It does not transform data but performs a series of checks by calling its private helpers. It verifies that the DataFrames have the correct structure (`DatetimeIndex`, monthly frequency), are not empty, and contain numeric data. It checks for excessive missing values and flags potential outliers. It recursively validates the structure and key parameter values of the `replication_config` dictionary against a predefined schema.
    *   **Outputs:** A boolean `True` upon successful validation of all inputs. Its primary output is a side effect: it raises a specific `TypeError` or `ValueError` if any check fails, halting the pipeline before any computation occurs.
    *   **Role in Research Pipeline:** This callable implements the foundational step of **input sanitation and verification**. It ensures that the raw data and parameters are suitable for the analysis, preventing a "garbage in, garbage out" scenario and ensuring the preconditions for the subsequent econometric procedures are met.

--

#### **Task 2: Data Cleansing**

*   **Callable:** `clean_and_align_data`
    *   **Inputs:** `asset_returns` (raw `pd.DataFrame`), `factor_returns` (raw `pd.DataFrame`), and several scalar configuration parameters (`min_obs_frac_col`, `winsorize_quantiles`, etc.).
    *   **Process:** This function executes a sequential data transformation pipeline.
        1.  **Missing Value Handling:** It first filters assets and time periods with excessive `NaN`s. It then applies a constrained forward-fill to impute small, consecutive gaps in the asset returns. Finally, it prunes any assets that still contain `NaN`s.
        2.  **Outlier Treatment:** It performs cross-sectional Winsorization on the imputed asset returns. For each time period, it calculates specified quantiles (e.g., 1st and 99th) and caps all returns in that period to fall within this range.
        3.  **Alignment:** It determines the common `DatetimeIndex` between the treated asset returns and the factor returns. It slices both DataFrames to this common index and performs a final filter on the assets to ensure each has a sufficient number of observations for the subsequent rolling window analysis.
    *   **Outputs:** A tuple containing:
        1.  `final_assets` (`pd.DataFrame`): The cleansed, treated, and aligned asset returns.
        2.  `aligned_factors` (`pd.DataFrame`): The aligned factor returns.
        3.  `full_log` (`Dict`): A detailed dictionary auditing every action taken (assets dropped, values imputed, outliers winsorized).
    *   **Role in Research Pipeline:** This callable implements the crucial **data preparation** step. High-quality empirical research requires clean, well-behaved data. This function ensures the dataset is dense, free of extreme outliers that could bias OLS, and perfectly aligned, making it ready for the econometric analysis.

--

#### **Task 3: Empirical Analysis Setup**

*   **Callable:** `setup_empirical_analysis_windows`
    *   **Inputs:** `aligned_data_index` (`pd.DatetimeIndex`), `replication_config` (`Dict`).
    *   **Process:** The function parses the `empirical:run_controls` section of the configuration to get the `window_size`, `start_date`, and `end_date`. It determines the *effective* date range by finding the intersection of the configured range and the actual range of the provided data index. It then programmatically generates a list of all possible full-sized rolling windows within this effective range. Each window is represented as a `(start_date, end_date)` tuple.
    *   **Outputs:** A `List[Tuple[pd.Timestamp, pd.Timestamp]]` representing the complete schedule for the rolling window analysis.
    *   **Role in Research Pipeline:** This callable implements the **experimental design** for the empirical study. It translates the high-level parameters of the study into a concrete, iterable plan of execution for the main analysis orchestrator.

--

#### **Task 4: Model Estimation**

*   **Callable:** `estimate_factor_model_params`
    *   **Inputs:** `asset_returns_window` (`pd.DataFrame`), `factor_returns_window` (`pd.DataFrame`).
    *   **Process:** This function performs a vectorized OLS estimation for all assets in the provided window simultaneously. It constructs the regressor matrix `X` (factors plus an intercept) and the dependent variable matrix `Y` (asset returns). It then solves the system of normal equations `(X'X)B = X'Y` for the `(K+1) x N` coefficient matrix `B` using the numerically stable `np.linalg.solve`. From `B`, it extracts the vector of alphas and the matrix of betas. Finally, it calculates the `T x N` matrix of residuals `U = Y - XB`.
    *   **Outputs:** A `FactorModelEstimationResult` `NamedTuple` containing the estimated `alphas` (`pd.Series`), `betas` (`pd.DataFrame`), `residuals` (`pd.DataFrame`), and the `condition_number` of the regressor matrix.
    *   **Role in Research Pipeline:** This callable is the workhorse of the empirical analysis, implementing the **parameter estimation** step. It directly estimates the key components of the linear factor model for each asset `i`:
        $$ y_{i,t} = \alpha_i + \beta'_i f_t + u_{i,t} $$
        The estimated `α̂_i` and `û_{i,t}` are the critical inputs for the subsequent test statistic calculation.

--

#### **Task 5: Test Statistic Computation**

*   **Callable:** `compute_psi_statistic`
    *   **Inputs:** `estimation_result` (`FactorModelEstimationResult`), `nu` (`float`).
    *   **Process:** The function first calls its helper, `_calculate_scaling_factor`, which computes the scaling factor `s_NT` from the input residuals. It then applies the main non-linear transformation to the estimated alphas.
    *   **Outputs:** A `pd.Series` of the `ψ_i,NT` statistics, indexed by asset.
    *   **Role in Research Pipeline:** This callable implements the **core transformation** of the test statistic. It takes the raw alpha estimates and converts them into a normalized, unit-free quantity that has the desired asymptotic properties under the null and alternative hypotheses. It implements:
        *   **Equation (3.3):** $$ \hat{s}_{NT} = \sqrt{\frac{1}{NT} \sum_{i=1}^{N} \sum_{t=1}^{T} \hat{u}_{i,t}^2} $$
        *   **Equation (3.2):** $$ \psi_{i,NT} = \left| \frac{T^{1/\nu} \hat{\alpha}_{i,T}}{\hat{s}_{NT}} \right|^{\nu/2} $$

--

#### **Task 6: Randomization and Test Statistic**

*   **Callable:** `compute_randomized_test_statistic`
    *   **Inputs:** `psi_values` (`pd.Series`), `rng` (`np.random.Generator`).
    *   **Process:** The function first generates `N` i.i.d. standard normal shocks, `ω_i`. It then adds these shocks element-wise to the input `ψ_i,NT` values to create the perturbed statistics `z_i,NT`. Finally, it finds the maximum value of this new series.
    *   **Outputs:** A single scalar `float`, `Z_N,T`.
    *   **Role in Research Pipeline:** This callable implements the **randomization procedure**, which is the central innovation of the paper. It transforms the deterministic (but with unknown distribution) `ψ` statistics into a new statistic `Z_N,T` whose asymptotic distribution under the null is a known Gumbel distribution. It implements:
        *   $$ z_{i,NT} = \psi_{i,NT} + \omega_i, \quad \text{where } \omega_i \sim \mathcal{N}(0, 1) $$
        *   $$ Z_{N,T} = \max_{1 \le i \le N} z_{i,NT} $$

--

#### **Task 7: De-randomization**

*   **Callable:** `perform_de_randomization`
    *   **Inputs:** `psi_values` (`pd.Series`), `tau` (`float`), `b_function` (`Callable`), `rng` (`np.random.Generator`).
    *   **Process:** The function first computes the asymptotic Gumbel critical value, `c_τ`. It then enters a loop that runs `B` times (where `B` is determined by `b_function`). In each iteration, it calls `compute_randomized_test_statistic` to get one realization of `Z_N,T`. After the loop, it calculates the proportion of these realizations that were less than or equal to `c_τ`.
    *   **Outputs:** A `DeRandomizationResult` `NamedTuple` containing the final `q_statistic`, the `critical_value` used, the number of `b_replications`, and `tau`.
    *   **Role in Research Pipeline:** This callable implements the **de-randomization procedure** from Section 3.1. It removes the dependence of the final p-value on a single random draw by integrating over the distribution of the randomized statistic. It produces the key decision variable, `Q_N,T,B(τ)`. It implements:
        *   **Equation (3.6):** $$ c_{\tau} = b_N - a_N \log(-\log(1-\tau)) $$
        *   **Equation (3.7):** $$ Q_{N,T,B}(\tau) = B^{-1} \sum_{b=1}^{B} I(Z_{N,T}^{(b)} \le c_{\tau}) $$

--

#### **Task 8: Decision Rule**

*   **Callable:** `make_test_decision`
    *   **Inputs:** `de_randomization_result` (`DeRandomizationResult`), `f_b_function` (`Callable`).
    *   **Process:** The function calculates the decision threshold by evaluating `(1 - tau) - f(B)`. It then compares the input `q_statistic` to this threshold.
    *   **Outputs:** A `TestDecisionResult` `NamedTuple` containing the boolean decision (`fail_to_reject_h0`), a human-readable `outcome` string, and the inputs to the decision for auditability.
    *   **Role in Research Pipeline:** This callable implements the final **hypothesis testing decision**. It applies the practical decision rule proposed in the paper to make a deterministic choice between the null and alternative hypotheses. It implements:
        *   **Decision Rule from Equation (3.12):** Fail to Reject H₀ if $$ Q_{N,T,B}(\tau) \ge (1 - \tau) - f(B) $$

--

#### **Task 9: Create Orchestrator Function**

*   **Callable:** `EmpiricalAnalysisOrchestrator`
    *   **Inputs:** `asset_returns` (`pd.DataFrame`), `factor_returns` (`pd.DataFrame`), `replication_config` (`Dict`), `seed` (`int`).
    *   **Process:** This class acts as the master controller for the entire empirical study. Its `__init__` method orchestrates the calls to the validation, cleaning, and setup functions. Its `run_analysis` method implements the nested loops over models and rolling windows, calling the full testing pipeline (Tasks 4-8) for each combination and collecting the results.
    *   **Outputs:** The `run_analysis` method returns a single, tidy, multi-indexed `pd.DataFrame` containing the detailed results of every test performed.
    *   **Role in Research Pipeline:** This is the **empirical study orchestrator**. It automates the entire process of applying the test to real data, managing the complexity of the rolling window analysis and the comparison of multiple factor models.

--

#### **Task 17 (Revised): Monte Carlo Robustness Analysis**

*   **Callable:** `analyze_power_by_sparsity`
    *   **Inputs:** `base_experiment` (`SimulationExperiment`), `n_jobs` (`int`).
    *   **Process:** The function iterates through a list of sparsity levels defined in the configuration. For each level, it modifies a copy of the base experiment's configuration to set the new sparsity, then calls the full `run_monte_carlo_simulation` engine (Task 15) for the alternative hypothesis. It collects the resulting power and confidence interval for each level.
    *   **Outputs:** A `pd.DataFrame` indexed by sparsity level, containing the empirical power and its confidence interval, suitable for plotting a power curve.
    *   **Role in Research Pipeline:** This callable performs a **robustness check on the test's power**. It systematically investigates how the test's ability to detect false nulls changes as the signal (the pervasiveness of mispricing) weakens.

*   **Callable:** `run_comparison_simulation`
    *   **Inputs:** `experiment` (`SimulationExperiment`), `n_jobs` (`int`), `seed` (`int`).
    *   **Process:** This function orchestrates a simulation for a single experimental cell, but for multiple statistical tests. Its helper, `_run_single_replication_for_comparison`, generates one dataset and then applies "OurTest", the "FLY" test, and the "AS" test to that same dataset, returning a dictionary of their decisions. The main function runs this `M` times (in parallel) for both the null and alternative hypotheses and aggregates the rejection rates for each test.
    *   **Outputs:** A `pd.DataFrame` with rows 'size' and 'power' and columns for each test ('OurTest', 'FLY', 'AS'), allowing for direct comparison of their statistical properties.
    *   **Role in Research Pipeline:** This callable performs a **comparative performance analysis**. It benchmarks the paper's proposed test against other methods from the literature under controlled, simulated conditions, which is a cornerstone of methodological research.

--

#### **Task 18: Results Compilation and Visualization (Monte Carlo)**

*   **Callable:** `create_mc_results_table`
    *   **Inputs:** `mc_results_df` (`pd.DataFrame`), `scenario_filter` (`str`).
    *   **Process:** The function takes the long-format "tidy" data from the Monte Carlo orchestrator. It filters for a specific scenario, then pivots the data twice: once for size and once for power, creating tables with `N` as the index and `T` as the columns. These two tables are then concatenated side-by-side and styled for publication.
    *   **Outputs:** A `pandas.Styler` object representing a formatted, publication-quality table.
    *   **Role in Research Pipeline:** This is a **reporting and summarization** tool. It transforms the raw simulation output into the standard tabular format used in econometrics papers (like Tables 5.1-5.6) for easy interpretation of size and power across different panel dimensions.

*   **Callable:** `plot_power_curves`
    *   **Inputs:** `power_curve_data` (`Dict[str, pd.DataFrame]`), `title` (`str`), `figsize` (`Tuple`).
    *   **Process:** The function takes one or more power curve DataFrames (as generated by `analyze_power_by_sparsity`). It plots the empirical power against the sparsity level for each test, adding a shaded confidence interval around the primary test's curve.
    *   **Outputs:** `matplotlib.figure.Figure` and `matplotlib.axes.Axes` objects.
    *   **Role in Research Pipeline:** This is a **visualization and reporting** tool. It creates a plot analogous to Figure 5.1, providing a clear visual representation of the test's power and how it compares to other methods as the alternative hypothesis changes.

--

#### **Task 19: Interpretation and Discussion**

*   **Callable:** `generate_results_summary`
    *   **Inputs:** `empirical_results` (`pd.DataFrame`), `mc_results` (`pd.DataFrame`), `replication_config` (`Dict`).
    *   **Process:** This function acts as a final analytical layer. It takes the summary tables and raw results from the previous stages and performs a series of quantitative comparisons. It ranks models, identifies best/worst performers in different regimes, programmatically checks for size distortions in the simulations, and identifies areas of low power.
    *   **Outputs:** A nested `Dict` containing a structured, quantitative summary of the key findings from the entire study.
    *   **Role in Research Pipeline:** This callable automates the **first-pass interpretation of results**. It distills the vast amount of numerical output from the study into a set of key, data-driven statements, guiding the researcher's attention to the most significant findings and ensuring a consistent, objective initial analysis.

--

#### **Task 20: Master Orchestrator**

*   **Callable:** `run_full_study`
    *   **Inputs:** `asset_returns` (`pd.DataFrame`), `factor_returns` (`pd.DataFrame`), `replication_config` (`Dict`), and control flags.
    *   **Process:** This is the top-level function. It sequentially calls the main orchestrators and compilation functions: `EmpiricalAnalysisOrchestrator`, `compile_empirical_results`, `plot_q_statistic_time_series`, `MonteCarloOrchestrator`, `create_mc_results_table`, and `generate_results_summary`. It manages the entire workflow from start to finish, with robust error handling for each major stage.
    *   **Outputs:** A single, comprehensive `Dict` where keys are descriptive names (e.g., `'empirical_summary_table'`) and values are the major artifacts produced during the study (DataFrames, plots, summary dicts).
    *   **Role in Research Pipeline:** This is the **master controller or main entry point**. It encapsulates the entire research project into a single, reproducible function call, ensuring that the study can be re-run from raw data to final interpretation with a single command.


### Usage Example

### **Example Usage: End-to-End Alpha Test Pipeline**

This example demonstrates how a researcher would use the complete software suite to replicate the study from "A general randomized test for Alpha". It covers the setup of input data, the definition of the configuration, the execution of the main pipeline, and the interpretation of the structured output.

#### **Step 1: Prepare Input Data**

The first step is to load and prepare the two required datasets: asset returns and factor returns. For this example, we will generate synthetic data that mimics the structure of real-world financial data.

```python
import pandas as pd
import numpy as np

# --- 1a: Generate Synthetic Asset Returns ---
# In a real application, this data would be loaded from a source like CRSP or Bloomberg.
# We simulate a panel of 500 assets over 40 years (480 months).
N_assets = 500
T_periods = 480
# Create a realistic monthly DatetimeIndex ending in the present.
dates = pd.to_datetime(pd.date_range(end='2024-12-31', periods=T_periods, freq='M'))
# Generate random return data (e.g., from a normal distribution).
asset_data = np.random.normal(loc=0.008, scale=0.05, size=(T_periods, N_assets))
# Create the DataFrame with appropriate column names.
asset_returns_df = pd.DataFrame(asset_data, index=dates, columns=[f'ASSET_{i+1}' for i in range(N_assets)])
# Introduce some missing data to simulate real-world conditions.
# This will be handled by our cleansing pipeline.
for col in asset_returns_df.columns[:50]:
    missing_indices = np.random.choice(T_periods, size=int(T_periods * 0.1), replace=False)
    asset_returns_df.iloc[missing_indices, asset_returns_df.columns.get_loc(col)] = np.nan

print("--- Sample of Asset Returns DataFrame ---")
print(asset_returns_df.head())
print("\n")

# --- 1b: Generate Synthetic Factor Returns ---
# In a real application, this would be downloaded from the Kenneth French Data Library.
# We need the 5 Fama-French factors plus the Momentum factor.
factor_names = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM']
# Generate random factor data.
factor_data = np.random.normal(loc=0.005, scale=0.03, size=(T_periods, len(factor_names)))
# Create the DataFrame.
factor_returns_df = pd.DataFrame(factor_data, index=dates, columns=factor_names)

print("--- Sample of Factor Returns DataFrame ---")
print(factor_returns_df.head())
print("\n")
```

#### **Step 2: Define the Study Configuration**

The second step is to define the `replication_config` dictionary. This object controls every aspect of the study. We will use the default configuration provided in the original prompt, which is a complete specification for replicating the paper's empirical and simulation studies.

```python
# This single object contains all parameters for both the empirical analysis
# and the Monte Carlo simulations.

replication_config = {
    # =========================================================================
    # PART I: MONTE CARLO SIMULATION PARAMETERS (SECTION 5 & APPENDIX A)
    # =========================================================================
    'monte_carlo': {
        'run_controls': {
            'M_replications': 100, # Reduced for a quick example run
            'N_grid': [100, 200],   # Reduced for a quick example run
            'T_grid': [100, 200],   # Reduced for a quick example run
            'scenarios': ['main', 'no_persistence'] # Reduced for a quick example
        },
        'dgp_base_params': {
            'K_factors': 3,
            'factor_mean': [0.53, 0.19, 0.19],
            'factor_var_matrix': {'type': 'diag', 'values': [-0.1, 0.2, -0.2]},
            'omitted_factor_loading_dist': {'type': 'uniform', 'args': (0.7, 0.9)},
            'factor_loading_dist': [
                {'type': 'uniform', 'args': (0.3, 1.8)},
                {'type': 'uniform', 'args': (-1.0, 1.0)},
                {'type': 'uniform', 'args': (-0.6, 0.9)},
            ],
            'error_dgp_types': {
                'gaussian': {'type': 'normal', 'args': (0, 1)}
            }
        },
        'hypothesis_params': {
            'null': {'type': 'zero'},
            'alternative': {
                'type': 'sparse_normal',
                'sparsity_levels': [0.05, 0.10, 0.15], # Reduced for example
                'default_sparsity': 0.05,
                'dist_args': (0, 1)
            }
        },
        'test_config': {
            'our_test': {
                'nu': 5.0,
                'tau': 0.05,
                'de_randomization': {
                    'B_function': lambda n: int(np.floor(np.log(n)**2)),
                    'f_B_function': lambda b: b**(-0.25)
                }
            }
        }
    },
    # =======================================================================
    # PART II: EMPIRICAL APPLICATION PARAMETERS (SECTION 6)             
    # =======================================================================
    'empirical': {
        'run_controls': {
            'rolling_window_size_months': 60,
            'start_date': '1985-01-01',
            'end_date': '2024-12-31',
        },
        'model_specifications': {
            'CAPM': ['Mkt-RF'],
            'FF3':  ['Mkt-RF', 'SMB', 'HML'],
            'FF5':  ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA'],
            'FF6':  ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA', 'MOM']
        },
        'test_config': {
            'our_test': {
                'nu': 4.0,
                'tau': 0.05,
                'de_randomization': {
                    'B_function': lambda n: int(np.floor(np.log(n)**2)),
                    'f_B_function': lambda b: b**(-0.25)
                }
            }
        },
        'reporting': {
            'crisis_periods': {
                'Global_Financial_Crisis':('2007-12-01', '2009-06-30'),
                'COVID_19_Pandemic':      ('2020-01-01', '2021-05-31')
            }
        }
    }
}

print("--- Replication Configuration Defined ---")
print("The study is now fully configured and ready to run.")
print("\n")
```

#### **Step 3: Execute the End-to-End Pipeline**

With the data and configuration prepared, we execute the entire study with a single call to the `run_full_study` function. We can use the control flags to run only the empirical part for this example to ensure a timely result.

```python
# Assume all the previously defined functions (run_full_study, etc.) are imported
# from the project's modules.

# Execute the full study. For this example, we will only run the empirical
# part to get a result quickly. Setting `run_monte_carlo=False` skips the
# most time-consuming part.
# `n_jobs=-1` would use all available CPU cores for parallelizable tasks.
# `seed=42` ensures the entire study is reproducible.

full_study_results = run_full_study(
    asset_returns=asset_returns_df,
    factor_returns=factor_returns_df,
    replication_config=replication_config,
    run_empirical=True,
    run_monte_carlo=False, # Set to False for a quick demonstration
    n_jobs=-1,
    seed=42
)

print("--- End-to-End Pipeline Execution Complete ---")
print("The 'full_study_results' dictionary now contains all artifacts of the study.")
print("\n")
```

#### **Step 4: Inspect and Use the Outputs**

The `run_full_study` function returns a dictionary containing all the major outputs. We can now easily access and analyze these artifacts.

```python
# --- 4a: Inspect the Empirical Results Summary Table ---
# The summary table is a styled pandas DataFrame, ideal for display in notebooks.
print("--- Empirical Results Summary Table ---")
# In a Jupyter environment, this would render a beautifully formatted table.
# In a script, we access the underlying data.
empirical_summary_table = full_study_results['empirical_summary_table']
display(empirical_summary_table) # Use display() in a notebook
# print(empirical_summary_table.data) # Or print the raw data in a script
print("\n")


# --- 4b: Inspect the Time Series Plot ---
# The output dictionary contains the matplotlib figure and axes objects.
print("--- Visualizing the Q-Statistic Time Series ---")
fig, ax = full_study_results['empirical_q_statistic_plot']
# This would display the plot in a Jupyter notebook or an interactive session.
plt.show()
# You can also save the figure to a file for a publication.
# fig.savefig("q_statistic_time_series.pdf", bbox_inches='tight')
print("\n")


# --- 4c: Inspect the Quantitative Interpretation ---
# The final summary provides a quantitative, text-based analysis.
print("--- Quantitative Summary of Findings ---")
final_summary = full_study_results['final_quantitative_summary']

# Print the empirical findings.
print("Empirical Findings:")
empirical_findings = final_summary['empirical_findings']
print(f"  - Best Overall Model: {empirical_findings['full_sample']['best_performing_model']}")
print(f"  - Worst Overall Model: {empirical_findings['full_sample']['worst_performing_model']}")
for period, finding in empirical_findings['crisis_analysis'].items():
    if isinstance(finding, str):
        print(f"  - {period.replace('_', ' ')}: {finding}")

# If we had run the Monte Carlo part, we could inspect those results too:
# if 'monte_carlo_findings' in final_summary:
#     mc_findings = final_summary['monte_carlo_findings']
#     print("\nMonte Carlo Findings:")
#     print(f"  - Size Control: {mc_findings['size_control']['summary']}")
#     print(f"  - Average Power: {mc_findings['power_analysis']['average_power']}")
```

This example provides a complete walkthrough of how to use the professional-grade pipeline. It demonstrates the simplicity of the final user interface (`run_full_study`), which abstracts away the immense complexity of the underlying econometric and computational machinery, delivering a robust, reproducible, and insightful analysis with a single function call.



In [None]:
# Task 1: Data and Parameter Validation

def _validate_dataframe_structure(
    df: pd.DataFrame,
    df_name: str,
    min_numeric_cols: int = 1
) -> None:
    """
    Validates the fundamental structure of a pandas DataFrame.

    Args:
        df (pd.DataFrame): The DataFrame to validate.
        df_name (str): The name of the DataFrame for logging and error messages.
        min_numeric_cols (int): The minimum number of numeric columns required.

    Raises:
        TypeError: If the input is not a pandas DataFrame.
        ValueError: If the DataFrame is empty, lacks a DatetimeIndex, has an
                    incorrect frequency, or contains insufficient numeric data.
    """
    # Check if the input is a pandas DataFrame.
    if not isinstance(df, pd.DataFrame):
        raise TypeError(f"Input '{df_name}' must be a pandas DataFrame, but got {type(df)}.")

    # Check if the DataFrame is empty.
    if df.empty:
        raise ValueError(f"Input DataFrame '{df_name}' cannot be empty.")

    # Validate that the index is a DatetimeIndex.
    if not pd.api.types.is_datetime64_any_dtype(df.index):
        raise ValueError(f"Index of '{df_name}' must be a DatetimeIndex.")

    # Infer the frequency of the index.
    freq = pd.infer_freq(df.index)
    # Check if the frequency is monthly (Month-End).
    if freq not in ['M', 'ME']:
        logging.warning(
            f"Could not infer monthly frequency for '{df_name}'. "
            f"Inferred frequency: {freq}. Proceeding with caution."
        )

    # Ensure the DataFrame contains a sufficient number of numeric columns.
    numeric_cols = df.select_dtypes(include=np.number).columns
    if len(numeric_cols) < min_numeric_cols:
        raise ValueError(
            f"'{df_name}' must contain at least {min_numeric_cols} numeric "
            f"column(s), but found {len(numeric_cols)}."
        )

    logging.info(f"Basic structure validation passed for '{df_name}'.")


def _validate_returns_data(
    df: pd.DataFrame,
    df_name: str,
    max_nan_col_frac: float = 0.2,
    max_nan_total_frac: float = 0.5,
    outlier_z_threshold: float = 5.0
) -> None:
    """
    Performs specific validation for a returns DataFrame.

    Args:
        df (pd.DataFrame): The returns DataFrame to validate.
        df_name (str): The name of the DataFrame for logging.
        max_nan_col_frac (float): Maximum allowable fraction of NaNs per column.
        max_nan_total_frac (float): Maximum allowable fraction of NaNs in total.
        outlier_z_threshold (float): Z-score threshold to flag potential outliers.

    Raises:
        ValueError: If NaN or outlier checks fail.
    """
    # Calculate the total fraction of missing values.
    total_nan_frac = df.isna().sum().sum() / df.size
    # Check if the total missing value fraction exceeds the threshold.
    if total_nan_frac > max_nan_total_frac:
        raise ValueError(
            f"Total fraction of missing values in '{df_name}' is "
            f"{total_nan_frac:.2%}, which exceeds the threshold of {max_nan_total_frac:.2%}."
        )

    # Calculate the fraction of missing values for each column.
    nan_col_frac = df.isna().mean()
    # Check if any column's missing value fraction exceeds the threshold.
    if (nan_col_frac > max_nan_col_frac).any():
        problematic_cols = nan_col_frac[nan_col_frac > max_nan_col_frac].index.tolist()
        raise ValueError(
            f"Columns in '{df_name}' exceed the NaN threshold of {max_nan_col_frac:.2%}: "
            f"{problematic_cols}"
        )

    # Perform a preliminary check for extreme outliers using Z-scores.
    numeric_df = df.select_dtypes(include=np.number)
    # Calculate Z-scores, handling potential division by zero for constant columns.
    z_scores = np.abs(
        (numeric_df - numeric_df.mean()) / numeric_df.std().replace(0, np.nan)
    )
    # Count the number of outliers.
    outlier_count = (z_scores > outlier_z_threshold).sum().sum()
    # Issue a warning if a significant number of outliers are detected.
    if outlier_count > 0:
        logging.warning(
            f"Detected {outlier_count} potential outliers in '{df_name}' "
            f"(Z-score > {outlier_z_threshold}). Consider reviewing the data."
        )

    logging.info(f"Returns-specific data validation passed for '{df_name}'.")


def _validate_factor_data(
    factor_df: pd.DataFrame,
    required_cols: List[str]
) -> None:
    """
    Performs specific validation for a factor returns DataFrame.

    Args:
        factor_df (pd.DataFrame): The factor DataFrame to validate.
        required_cols (List[str]): A list of column names required to be in the DataFrame.

    Raises:
        ValueError: If required columns are missing or if factors are near-constant.
    """
    # Check for the presence of all required factor columns.
    missing_cols = set(required_cols) - set(factor_df.columns)
    if missing_cols:
        raise ValueError(
            f"Factor DataFrame is missing required columns: {sorted(list(missing_cols))}"
        )

    # Check for near-constant factors, which can cause issues in regression.
    factor_variances = factor_df[required_cols].var()
    # Identify columns with variance below a small threshold.
    near_constant_factors = factor_variances[factor_variances < 1e-12].index.tolist()
    if near_constant_factors:
        raise ValueError(
            f"The following factors are near-constant (variance < 1e-12) and may "
            f"cause numerical instability: {near_constant_factors}"
        )

    logging.info("Factor data validation passed.")


def _validate_config_dict(
    config: Dict[str, Any],
    schema: Dict[str, Any]
) -> None:
    """
    Recursively validates a nested dictionary against a schema.

    Args:
        config (Dict[str, Any]): The configuration dictionary to validate.
        schema (Dict[str, Any]): The schema to validate against. The schema uses
                                 types or nested dictionaries to define the expected structure.

    Raises:
        TypeError: If the config is not a dictionary or if a value has an incorrect type.
        ValueError: If a required key is missing or a parameter value is out of bounds.
    """
    # Check if the config object is a dictionary.
    if not isinstance(config, dict):
        raise TypeError(f"Configuration must be a dict, but got {type(config)}.")

    # Iterate through the keys and values of the schema.
    for key, expected_type in schema.items():
        # Check if a required key is missing from the config.
        if key not in config:
            raise ValueError(f"Missing required configuration key: '{key}'.")

        # Get the actual value from the config.
        actual_value = config[key]

        # If the expected type is a nested dictionary, recurse.
        if isinstance(expected_type, dict):
            _validate_config_dict(actual_value, expected_type)
        # If the expected type is a list, validate the type of its elements.
        elif isinstance(expected_type, list):
            if not isinstance(actual_value, list):
                raise TypeError(f"Expected list for key '{key}', but got {type(actual_value)}.")
            # This is a simplified check; a more robust implementation might check each element.
        # If the expected type is a callable (e.g., a lambda function).
        elif expected_type is Callable:
            if not callable(actual_value):
                raise TypeError(f"Expected a callable function for key '{key}', but got {type(actual_value)}.")
        # Otherwise, check the type directly.
        elif not isinstance(actual_value, expected_type):
            raise TypeError(
                f"Invalid type for key '{key}'. Expected {expected_type}, but got {type(actual_value)}."
            )

    # Perform specific value-based checks after structural validation.
    # This part can be expanded based on the full schema.
    if 'empirical' in config and 'test_config' in config['empirical']:
        nu = config['empirical']['test_config']['our_test']['nu']
        if not (isinstance(nu, (int, float)) and nu > 4):
            raise ValueError(f"Parameter 'nu' must be a number > 4, but got {nu}.")

        tau = config['empirical']['test_config']['our_test']['tau']
        if not (isinstance(tau, float) and 0 < tau < 1):
            raise ValueError(f"Parameter 'tau' must be a float between 0 and 1, but got {tau}.")

    logging.info("Configuration dictionary validation passed.")


def validate_inputs(
    asset_returns: pd.DataFrame,
    factor_returns: pd.DataFrame,
    replication_config: Dict[str, Any]
) -> bool:
    """
    Orchestrates the validation of all inputs for the Alpha Test Framework.

    This function serves as the main entry point for input validation. It checks
    the structural and statistical integrity of the asset and factor returns
    DataFrames, and validates the structure and parameter values of the
    configuration dictionary.

    Args:
        asset_returns (pd.DataFrame): A datetime-indexed DataFrame of monthly
                                      asset returns. Columns are asset tickers.
        factor_returns (pd.DataFrame): A datetime-indexed DataFrame of monthly
                                       factor returns (e.g., Fama-French).
        replication_config (Dict[str, Any]): A nested dictionary containing all
                                             parameters for the study.

    Returns:
        bool: True if all validations pass successfully.

    Raises:
        TypeError: If inputs are of the wrong type.
        ValueError: If inputs fail structural, content, or parameter checks.
    """
    try:
        # --- Step 1: Validate Asset Returns DataFrame ---
        logging.info("--- Starting Asset Returns Validation ---")
        # Perform basic structural checks on the asset returns DataFrame.
        _validate_dataframe_structure(asset_returns, 'asset_returns')
        # Perform data-specific checks for returns data.
        _validate_returns_data(asset_returns, 'asset_returns')
        logging.info("--- Asset Returns Validation Complete ---")

        # --- Step 2: Validate Factor Returns DataFrame ---
        logging.info("--- Starting Factor Returns Validation ---")
        # Perform basic structural checks on the factor returns DataFrame.
        _validate_dataframe_structure(factor_returns, 'factor_returns')
        # Define the list of required columns for the Fama-French 5-factor model.
        required_factors = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA']
        # Perform factor-specific data checks.
        _validate_factor_data(factor_returns, required_factors)
        logging.info("--- Factor Returns Validation Complete ---")

        # --- Step 3: Validate Configuration Dictionary ---
        logging.info("--- Starting Configuration Dictionary Validation ---")
        # Define a simplified schema for the configuration dictionary.
        # A full implementation would have a much more detailed schema.
        config_schema = {
            'monte_carlo': dict,
            'empirical': {
                'run_controls': dict,
                'model_specifications': dict,
                'test_config': {
                    'our_test': {
                        'nu': (int, float),
                        'tau': float,
                        'de_randomization': {
                            'B_function': Callable,
                            'f_B_function': Callable
                        }
                    }
                },
                'reporting': dict
            }
        }
        # Validate the configuration dictionary against the schema.
        _validate_config_dict(replication_config, config_schema)
        logging.info("--- Configuration Dictionary Validation Complete ---")

    except (TypeError, ValueError) as e:
        # Log the specific error that occurred during validation.
        logging.error(f"Input validation failed: {e}")
        # Re-raise the exception to halt execution.
        raise

    # If all checks pass, log a success message and return True.
    logging.info("All input validations passed successfully.")

    return True


In [None]:
# Task 2: Data Cleansing

def _handle_missing_returns(
    returns_df: pd.DataFrame,
    min_obs_frac_col: float,
    min_obs_frac_row: float,
    max_consecutive_fill: int
) -> Tuple[pd.DataFrame, Dict[str, List]]:
    """
    Handles missing values in the asset returns DataFrame with high rigor.

    The process involves:
    1. Filtering out assets with excessive missing data.
    2. Filtering out time periods with insufficient asset data.
    3. Applying a constrained forward-fill for small, isolated gaps.
    4. A final pruning of any assets that still contain NaNs.

    Args:
        returns_df (pd.DataFrame): The input DataFrame of asset returns.
        min_obs_frac_col (float): The minimum fraction of non-NaN observations
                                  required for an asset (column) to be kept.
        min_obs_frac_row (float): The minimum fraction of non-NaN assets
                                  (columns) required for a time period (row)
                                  to be kept.
        max_consecutive_fill (int): The maximum number of consecutive NaNs to
                                    forward-fill.

    Returns:
        Tuple[pd.DataFrame, Dict[str, List]]: A tuple containing:
            - The cleansed DataFrame.
            - A log dictionary detailing the cleansing actions.
    """
    # Work on a deep copy to avoid modifying the original DataFrame.
    df = returns_df.copy(deep=True)
    original_assets = set(df.columns)
    original_dates = set(df.index)

    # Initialize the log.
    log = {
        'assets_dropped_initial_filter': [],
        'dates_dropped_initial_filter': [],
        'imputations': []
    }

    # 1. Filter columns (assets) with too many missing values.
    min_obs_col = int(min_obs_frac_col * len(df))
    df.dropna(axis=1, thresh=min_obs_col, inplace=True)
    assets_after_col_filter = set(df.columns)
    log['assets_dropped_initial_filter'] = sorted(list(original_assets - assets_after_col_filter))
    logging.info(f"Dropped {len(log['assets_dropped_initial_filter'])} assets due to excessive NaNs.")

    # 2. Filter rows (dates) with too many missing values.
    min_obs_row = int(min_obs_frac_row * len(df.columns))
    df.dropna(axis=0, thresh=min_obs_row, inplace=True)
    dates_after_row_filter = set(df.index)
    log['dates_dropped_initial_filter'] = sorted(list(original_dates - dates_after_row_filter))
    logging.info(f"Dropped {len(log['dates_dropped_initial_filter'])} dates due to insufficient asset data.")

    # 3. Apply constrained forward-fill.
    for col in df.columns:
        # Identify consecutive NaN blocks.
        nan_blocks = df[col].isna().astype(int).groupby(df[col].notna().astype(int).cumsum()).sum()
        # Iterate through blocks that are small enough to fill.
        for block_start_idx, block_size in nan_blocks[nan_blocks <= max_consecutive_fill].items():
            if block_size > 0:
                # Find the actual start and end index of the NaN block in the DataFrame.
                start_loc = df.index.get_loc(df[col].iloc[block_start_idx - 1:block_start_idx + block_size].index[0])
                # Apply ffill only to this specific slice.
                df.iloc[start_loc:start_loc + block_size, df.columns.get_loc(col)] = df.iloc[start_loc-1:start_loc, df.columns.get_loc(col)].values[0]
                # Log the imputation.
                for i in range(int(block_size)):
                    log['imputations'].append({
                        'asset': col,
                        'date': df.index[start_loc + i],
                        'imputed_value': df.iloc[start_loc + i, df.columns.get_loc(col)]
                    })

    logging.info(f"Performed {len(log['imputations'])} constrained forward-fill imputations.")

    # 4. Final pruning: remove any columns that still have NaNs.
    assets_before_final_prune = set(df.columns)
    df.dropna(axis=1, how='any', inplace=True)
    assets_after_final_prune = set(df.columns)
    log['assets_dropped_final_prune'] = sorted(list(assets_before_final_prune - assets_after_final_prune))
    logging.info(f"Dropped {len(log['assets_dropped_final_prune'])} assets with remaining NaNs after imputation.")

    return df, log


def _treat_outliers_by_winsorization(
    returns_df: pd.DataFrame,
    lower_quantile: float,
    upper_quantile: float
) -> Tuple[pd.DataFrame, Dict[str, List]]:
    """
    Treats outliers by cross-sectional Winsorization for each time period.

    Args:
        returns_df (pd.DataFrame): The input DataFrame of asset returns.
        lower_quantile (float): The lower percentile for capping (e.g., 0.01).
        upper_quantile (float): The upper percentile for capping (e.g., 0.99).

    Returns:
        Tuple[pd.DataFrame, Dict[str, List]]: A tuple containing:
            - The DataFrame with outliers treated.
            - A log of all modifications.
    """
    # Work on a deep copy.
    df = returns_df.copy(deep=True)
    log = {'winsorizations': []}

    # Calculate cross-sectional quantiles for each time period (row).
    lower_bounds = df.quantile(q=lower_quantile, axis=1)
    upper_bounds = df.quantile(q=upper_quantile, axis=1)

    # Create a boolean DataFrame indicating where clipping will occur.
    is_clipped = (df.lt(lower_bounds, axis=0)) | (df.gt(upper_bounds, axis=0))

    # Log the changes before applying them.
    for date, row in is_clipped.iterrows():
        clipped_assets = row[row].index
        for asset in clipped_assets:
            original_value = df.loc[date, asset]
            # Determine the new value after clipping.
            new_value = np.clip(original_value, lower_bounds[date], upper_bounds[date])
            log['winsorizations'].append({
                'asset': asset,
                'date': date,
                'original_value': original_value,
                'new_value': new_value
            })

    # Apply the clipping (Winsorization) efficiently.
    df_winsorized = df.clip(lower=lower_bounds, upper=upper_bounds, axis=0)

    logging.info(f"Performed {len(log['winsorizations'])} outlier treatments via Winsorization.")

    return df_winsorized, log


def clean_and_align_data(
    asset_returns: pd.DataFrame,
    factor_returns: pd.DataFrame,
    min_obs_frac_col: float = 0.8,
    min_obs_frac_row: float = 0.5,
    max_consecutive_fill: int = 2,
    winsorize_quantiles: Tuple[float, float] = (0.01, 0.99),
    min_obs_for_analysis: int = 60
) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrates the data cleansing and alignment pipeline.

    This function takes raw asset and factor returns, handles missing values,
    treats outliers, and produces two perfectly aligned DataFrames ready for
    econometric analysis, along with a comprehensive log of all actions taken.

    Args:
        asset_returns (pd.DataFrame): Raw DataFrame of asset returns.
        factor_returns (pd.DataFrame): Raw DataFrame of factor returns.
        min_obs_frac_col (float): Min fraction of data for an asset to be kept.
        min_obs_frac_row (float): Min fraction of assets for a date to be kept.
        max_consecutive_fill (int): Max number of consecutive NaNs to fill.
        winsorize_quantiles (Tuple[float, float]): Lower and upper quantiles for
                                                   outlier treatment.
        min_obs_for_analysis (int): Minimum number of observations required for
                                    an asset to be included in the final universe.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, Dict[str, Any]]: A tuple containing:
            - The clean, aligned asset returns DataFrame.
            - The clean, aligned factor returns DataFrame.
            - A comprehensive log dictionary.
    """
    # --- Step 1: Handle Missing Values in Asset Returns ---
    logging.info("--- Starting Missing Value Handling ---")
    # Apply the rigorous missing value handling process.
    clean_returns, missing_log = _handle_missing_returns(
        asset_returns,
        min_obs_frac_col,
        min_obs_frac_row,
        max_consecutive_fill
    )

    # --- Step 2: Treat Outliers in Asset Returns ---
    logging.info("--- Starting Outlier Treatment ---")
    # Apply cross-sectional Winsorization to the cleaned returns.
    winsorized_returns, outlier_log = _treat_outliers_by_winsorization(
        clean_returns,
        winsorize_quantiles[0],
        winsorize_quantiles[1]
    )

    # --- Step 3: Align DataFrames and Final Filtering ---
    logging.info("--- Starting DataFrame Alignment and Final Filtering ---")
    # Determine the common date range by intersecting the indices.
    common_index = winsorized_returns.index.intersection(factor_returns.index)
    if common_index.empty:
        raise ValueError("No common dates found between asset and factor returns.")

    # Slice both DataFrames to the common date range.
    aligned_assets = winsorized_returns.loc[common_index].copy()
    aligned_factors = factor_returns.loc[common_index].copy()

    # Final asset filtering: ensure each asset has enough data for rolling analysis.
    obs_counts = aligned_assets.notna().sum()
    # Identify assets that meet the minimum observation requirement.
    assets_to_keep = obs_counts[obs_counts >= min_obs_for_analysis].index
    # Filter the asset returns DataFrame to keep only these viable assets.
    final_assets = aligned_assets[assets_to_keep]

    # Log the final filtering step.
    assets_dropped_final = sorted(list(set(aligned_assets.columns) - set(final_assets.columns)))
    logging.info(
        f"Dropped {len(assets_dropped_final)} assets with insufficient observations "
        f"for rolling analysis (min_obs_for_analysis={min_obs_for_analysis})."
    )
    logging.info(
        f"Final aligned dataset contains {len(final_assets.columns)} assets "
        f"and {len(final_assets)} time periods."
    )

    # Compile the comprehensive log.
    full_log = {
        'missing_value_handling': missing_log,
        'outlier_treatment': outlier_log,
        'alignment_and_filtering': {
            'common_start_date': common_index.min(),
            'common_end_date': common_index.max(),
            'n_common_periods': len(common_index),
            'assets_dropped_insufficient_history': assets_dropped_final,
            'final_n_assets': final_assets.shape[1],
            'final_n_periods': final_assets.shape[0]
        }
    }

    return final_assets, aligned_factors, full_log


In [None]:
# Task 3: Empirical Analysis Setup

def setup_empirical_analysis_windows(
    aligned_data_index: pd.DatetimeIndex,
    replication_config: Dict[str, Any]
) -> List[Tuple[pd.Timestamp, pd.Timestamp]]:
    """
    Sets up the rolling window schedule for the empirical analysis.

    This function reads the empirical analysis parameters from the configuration,
    validates them, determines the effective date range based on available data,
    and generates a list of (start_date, end_date) tuples for each rolling
    window.

    Args:
        aligned_data_index (pd.DatetimeIndex): The datetime index from the
                                               aligned and cleansed data, which
                                               defines the universe of available dates.
        replication_config (Dict[str, Any]): The study's configuration dictionary.

    Returns:
        List[Tuple[pd.Timestamp, pd.Timestamp]]: A list where each tuple contains
                                                 the start and end Timestamps for
                                                 one rolling window.

    Raises:
        ValueError: If configuration parameters are invalid, if the specified
                    date range has no overlap with the available data, or if
                    no full windows can be constructed.
    """
    # --- Step 1: Define and Validate Rolling Window Parameters ---
    try:
        # Extract the empirical run controls from the configuration.
        controls = replication_config['empirical']['run_controls']
        # Extract the window size in months.
        window_size = controls['rolling_window_size_months']
        # Extract the user-specified start date string.
        config_start_str = controls['start_date']
        # Extract the user-specified end date string.
        config_end_str = controls['end_date']
    except KeyError as e:
        # Handle cases where essential keys are missing in the config.
        logging.error(f"Missing key in replication_config['empirical']['run_controls']: {e}")
        raise ValueError(f"Configuration is missing required key for empirical setup: {e}")

    # Validate the type and value of the window size.
    if not isinstance(window_size, int) or window_size < 36:
        raise ValueError(
            f"rolling_window_size_months must be an integer >= 36, but got {window_size}."
        )

    # Parse date strings into pandas Timestamp objects for robust handling.
    try:
        config_start_date = pd.to_datetime(config_start_str)
        config_end_date = pd.to_datetime(config_end_str)
    except Exception as e:
        # Handle invalid date formats.
        logging.error(f"Could not parse dates from configuration: {e}")
        raise ValueError("Invalid date format in configuration. Please use 'YYYY-MM-DD'.")

    # Validate the chronological order of the configured dates.
    if config_start_date >= config_end_date:
        raise ValueError("Configuration 'start_date' must be before 'end_date'.")

    # Determine the effective date range by intersecting config range with available data.
    data_start_date = aligned_data_index.min()
    data_end_date = aligned_data_index.max()

    # The effective start is the later of the two start dates.
    effective_start_date = max(config_start_date, data_start_date)
    # The effective end is the earlier of the two end dates.
    effective_end_date = min(config_end_date, data_end_date)

    # Check if there is a valid overlapping period for the analysis.
    if effective_start_date >= effective_end_date:
        raise ValueError(
            "No overlapping date range between configured dates and available data. "
            f"Config Range: {config_start_date.date()} to {config_end_date.date()}. "
            f"Data Range: {data_start_date.date()} to {data_end_date.date()}."
        )

    logging.info(f"Empirical analysis parameters validated. Effective date range: "
                 f"{effective_start_date.date()} to {effective_end_date.date()}.")

    # --- Step 2: Generate the List of Rolling Windows ---

    # The first possible end date for a full window.
    first_window_end = effective_start_date + pd.DateOffset(months=window_size - 1)

    # Check if the data range is long enough to form at least one window.
    if first_window_end > effective_end_date:
        raise ValueError(
            f"The effective data range ({effective_start_date.date()} to "
            f"{effective_end_date.date()}) is shorter than the window size "
            f"of {window_size} months. No full windows can be constructed."
        )

    # Generate all possible month-end dates that can serve as the end of a window.
    # We use the aligned data index to ensure our window dates are actual data points.
    possible_end_dates = aligned_data_index[
        (aligned_data_index >= first_window_end) &
        (aligned_data_index <= effective_end_date)
    ]

    # If no valid end dates are found, no windows can be created.
    if possible_end_dates.empty:
        raise ValueError(
            "Could not form any valid rolling windows. Check alignment of data "
            "and configured date range."
        )

    # Construct the list of (start_date, end_date) tuples.
    windows = []
    for end_date in possible_end_dates:
        # Calculate the start date for the current window.
        # Note: This start date might not be an exact data point, but defines the slice.
        start_date = end_date - pd.DateOffset(months=window_size - 1)
        # Append the window tuple to the list.
        windows.append((start_date, end_date))

    logging.info(f"Successfully generated {len(windows)} rolling windows of "
                 f"{window_size} months each.")

    return windows


In [None]:
# Task 4: Model Estimation

class FactorModelEstimationResult(NamedTuple):
    """
    A structured container for the results of factor model estimation.

    Attributes:
        alphas (pd.Series): A series of estimated alphas (α̂), indexed by asset.
        betas (pd.DataFrame): A DataFrame of estimated betas (β̂), with assets
                              as the index and factors as columns.
        residuals (pd.DataFrame): A DataFrame of residuals (û), with the same
                                  dimensions and indices as the input asset returns.
        condition_number (float): The condition number of the regressor matrix,
                                  used to diagnose multicollinearity.
    """
    alphas: pd.Series
    betas: pd.DataFrame
    residuals: pd.DataFrame
    condition_number: float

def estimate_factor_model_params(
    asset_returns_window: pd.DataFrame,
    factor_returns_window: pd.DataFrame
) -> FactorModelEstimationResult:
    """
    Estimates alpha, beta, and residuals for a panel of assets using OLS.

    This function performs a time-series regression for each asset against the
    provided factors for a specific time window. It is highly optimized to
    estimate parameters for all assets simultaneously using matrix algebra.

    The regression model for each asset `i` is:
    y_i,t = α_i + β'_i * f_t + u_i,t

    Args:
        asset_returns_window (pd.DataFrame): A (T x N) DataFrame of asset
                                             returns for the estimation window.
                                             Index is datetime, columns are assets.
        factor_returns_window (pd.DataFrame): A (T x K) DataFrame of factor
                                              returns for the same window.
                                              Index is datetime, columns are factors.

    Returns:
        FactorModelEstimationResult: A NamedTuple containing the estimated alphas,
                                     betas, residuals, and the regressor matrix
                                     condition number.

    Raises:
        ValueError: If input DataFrames are not aligned, empty, or contain NaNs.
        np.linalg.LinAlgError: If the regression fails due to a singular matrix.
    """
    # --- Input Validation ---
    # Check for empty inputs.
    if asset_returns_window.empty or factor_returns_window.empty:
        raise ValueError("Input DataFrames for estimation cannot be empty.")

    # Check for perfect temporal alignment.
    if not asset_returns_window.index.equals(factor_returns_window.index):
        raise ValueError("Asset and factor returns indices are not aligned.")

    # Ensure no missing values are present in the estimation window.
    if asset_returns_window.isna().any().any() or factor_returns_window.isna().any().any():
        raise ValueError("Input DataFrames for estimation must not contain NaN values.")

    # --- Step 1: Data Preparation and Matrix Construction ---

    # Extract dimensions for clarity and validation.
    T, N = asset_returns_window.shape
    _T, K = factor_returns_window.shape

    # Assert that the time dimension T is consistent.
    assert T == _T, "Mismatch in number of time periods (rows)."

    # Convert pandas DataFrames to NumPy arrays for performance, ensuring float64.
    Y = asset_returns_window.to_numpy(dtype=np.float64)
    F = factor_returns_window.to_numpy(dtype=np.float64)

    # Create the regressor matrix X by adding an intercept column of ones.
    # X will have dimensions T x (K+1).
    X = np.hstack([np.ones((T, 1), dtype=np.float64), F])

    # --- Step 2: OLS Estimation using Numerically Stable Solver ---

    # Check for potential multicollinearity before solving.
    # This is a crucial step for numerical stability.
    xtx = X.T @ X
    condition_number = np.linalg.cond(xtx)

    # Log a warning if the matrix is ill-conditioned.
    if condition_number > 1e3: # A common threshold for moderate multicollinearity
        logging.warning(
            f"High condition number detected: {condition_number:.2e}. "
            "OLS estimates may be unstable due to multicollinearity."
        )

    try:
        # Solve the normal equations (X'X)B = X'Y for B.
        # This is numerically superior to computing the inverse of X'X.
        # B is the (K+1) x N matrix of coefficients.
        # Equation: B̂ = (X'X)^-1 * X'Y
        coefficients = np.linalg.solve(xtx, X.T @ Y)
    except np.linalg.LinAlgError as e:
        # Handle cases where the matrix is singular and cannot be solved.
        logging.error(f"Linear algebra error during OLS estimation: {e}")
        raise

    # --- Step 3: Extract Parameters and Calculate Residuals ---

    # The first row of the coefficient matrix contains the alphas.
    # Equation: α̂_i = ȳ_i - β̂'_i * f̄ (This is implicitly calculated by the regression)
    alphas_arr = coefficients[0, :]

    # The remaining rows contain the betas.
    betas_arr = coefficients[1:, :]

    # Calculate the predicted values (Y_hat) for all assets.
    # Equation: Ŷ = X * B̂
    Y_hat = X @ coefficients

    # Calculate the residuals (û).
    # Equation: û = Y - Ŷ
    residuals_arr = Y - Y_hat

    # --- Step 4: Format and Return Results ---

    # Convert the NumPy array results back to labeled pandas objects.
    alphas_series = pd.Series(
        alphas_arr,
        index=asset_returns_window.columns,
        name='alpha'
    )
    betas_df = pd.DataFrame(
        betas_arr.T, # Transpose to get N x K shape
        index=asset_returns_window.columns,
        columns=factor_returns_window.columns
    )
    residuals_df = pd.DataFrame(
        residuals_arr,
        index=asset_returns_window.index,
        columns=asset_returns_window.columns
    )

    # Package the results into the structured NamedTuple.
    result = FactorModelEstimationResult(
        alphas=alphas_series,
        betas=betas_df,
        residuals=residuals_df,
        condition_number=condition_number
    )

    logging.info(f"Successfully estimated parameters for {N} assets and {K} factors over {T} periods.")

    return result


In [None]:
# Task 5: Test Statistic Computation

def _calculate_scaling_factor(
    residuals: pd.DataFrame
) -> float:
    """
    Calculates the scaling factor s_NT based on squared OLS residuals.

    This function implements Equation (3.3) from the paper.
    Equation (3.3): s_NT = sqrt((1/NT) * Σ_{i=1 to N} Σ_{t=1 to T} û²_i,t)

    Args:
        residuals (pd.DataFrame): A (T x N) DataFrame of model residuals (û).

    Returns:
        float: The scalar scaling factor s_NT.
    """
    # Convert the residuals DataFrame to a NumPy array for efficient computation.
    residuals_arr = residuals.to_numpy(dtype=np.float64)

    # Square all residual values element-wise.
    # Corresponds to: û²_i,t
    squared_residuals = np.square(residuals_arr)

    # Calculate the mean of all squared residuals.
    # This is equivalent to (1/NT) * ΣΣ û²_i,t
    mean_squared_residuals = np.mean(squared_residuals)

    # Take the square root to get the final scaling factor.
    s_nt = np.sqrt(mean_squared_residuals)

    # Ensure the scaling factor is strictly positive to prevent division by zero.
    # This adds a "nugget" for numerical stability.
    s_nt_stable = np.fmax(s_nt, 1e-12)

    return float(s_nt_stable)


def compute_psi_statistic(
    estimation_result: FactorModelEstimationResult,
    nu: float
) -> pd.Series:
    """
    Computes the transformed psi (ψ) statistic for each asset.

    This function implements the core transformation of the test, as specified
    in Equation (3.2) of the paper. It takes the estimated alphas and residuals,
    calculates the necessary scaling factors, and applies the non-linear
    transformation.

    Equation (3.2): ψ_i,NT = (|T^(1/ν) * α̂_i,T| / s_NT)^(ν/2)

    Args:
        estimation_result (FactorModelEstimationResult): A NamedTuple containing
            the outputs from the `estimate_factor_model_params` function.
        nu (float): The moment parameter ν, which must be greater than 4 as per
                    the paper's theoretical requirements.

    Returns:
        pd.Series: A pandas Series containing the calculated ψ_i,NT value for
                   each asset, indexed by the asset identifiers.

    Raises:
        ValueError: If the parameter nu is not > 4.
    """
    # --- Input Validation ---
    # Validate the moment parameter nu, a critical assumption of the test.
    if not (isinstance(nu, (int, float)) and nu > 4):
        raise ValueError(f"The moment parameter 'nu' (ν) must be a number strictly greater than 4, but got {nu}.")

    # Extract necessary components from the estimation result.
    alphas = estimation_result.alphas
    residuals = estimation_result.residuals

    # Get the number of time periods (T) from the residuals DataFrame.
    T = residuals.shape[0]

    # --- Step 1: Calculate the Scaling Factor s_NT ---
    # This helper function implements Equation (3.3).
    s_nt = _calculate_scaling_factor(residuals)

    # --- Step 2: Apply the Psi Transformation ---

    # Calculate the time-scaling component: T^(1/ν)
    time_scaler = np.power(T, 1.0 / nu, dtype=np.float64)

    # Calculate the numerator of the base: |T^(1/ν) * α̂_i,T|
    # We use .abs() for the absolute value of the alphas.
    numerator = time_scaler * alphas.abs()

    # Calculate the base of the exponentiation: numerator / s_NT
    base = numerator / s_nt

    # Check for potential overflow before final exponentiation.
    if base.max() > 1e10:
        logging.warning(
            f"Large base value ({base.max():.2e}) detected before final "
            "exponentiation. Results for psi may be inf."
        )

    # Calculate the final psi statistic by raising the base to the power of ν/2.
    # This implements the full Equation (3.2).
    psi_values = np.power(base, nu / 2.0, dtype=np.float64)

    # Ensure the final result is a pandas Series with the correct name and index.
    psi_values.name = 'psi_statistic'

    logging.info(f"Successfully computed psi statistics for {len(psi_values)} assets.")

    return psi_values


In [None]:
# Task 6: Randomization and Test Statistic

def _generate_omega_shocks(
    n: int,
    rng: np.random.Generator
) -> np.ndarray:
    """
    Generates N i.i.d. standard normal random shocks.

    Args:
        n (int): The number of shocks to generate (should equal the number of assets, N).
        rng (np.random.Generator): The random number generator instance to ensure
                                   reproducibility and control over the random state.

    Returns:
        np.ndarray: An array of shape (n,) containing the random shocks.
    """
    # Generate n random variates from the standard normal distribution.
    # Using the Generator instance is the modern, preferred NumPy API.
    return rng.standard_normal(size=n, dtype=np.float64)


def compute_randomized_test_statistic(
    psi_values: pd.Series,
    rng: np.random.Generator
) -> float:
    """
    Computes one realization of the randomized test statistic, Z_N,T.

    This function executes the core randomization step of the testing procedure.
    It takes the deterministic ψ statistics, adds a random standard normal shock
    to each, and then finds the maximum value, which is the test statistic.

    The procedure implements the following equations from the paper:
    1. z_i,NT = ψ_i,NT + ω_i, where ω_i ~ i.i.d. N(0, 1)
    2. Z_N,T = max_{1≤i≤N} z_i,NT

    Args:
        psi_values (pd.Series): A pandas Series of the calculated ψ_i,NT values,
                                indexed by asset identifiers.
        rng (np.random.Generator): The random number generator instance to use for
                                   generating the ω shocks. This is critical for
                                   reproducibility, especially within the
                                   de-randomization loop.

    Returns:
        float: The scalar value of the final test statistic, Z_N,T.

    Raises:
        ValueError: If the input psi_values Series is empty or contains NaNs.
    """
    # --- Input Validation ---
    # Check if the input Series is empty.
    if psi_values.empty:
        raise ValueError("Input 'psi_values' Series cannot be empty.")

    # The psi statistics must be finite and non-negative. NaNs are not allowed.
    if not np.all(np.isfinite(psi_values)) or (psi_values < 0).any():
        raise ValueError("Input 'psi_values' must contain only finite, non-negative numbers.")

    # Get the number of assets (N) from the length of the Series.
    N = len(psi_values)

    # --- Step 1: Generate Random Shocks (ω_i) ---
    # Generate N i.i.d. standard normal shocks using the provided generator.
    omega_shocks = _generate_omega_shocks(n=N, rng=rng)

    # --- Step 2: Compute Perturbed Statistics (z_i,NT) ---
    # Convert psi Series to NumPy array for efficient, non-indexed addition.
    psi_array = psi_values.to_numpy(dtype=np.float64)

    # Add the random shocks to the psi statistics element-wise.
    # Equation: z_i,NT = ψ_i,NT + ω_i
    z_array = psi_array + omega_shocks

    # --- Step 3: Compute Final Test Statistic (Z_N,T) ---
    # Find the maximum value among all z statistics.
    # Equation: Z_N,T = max_{1≤i≤N} z_i,NT
    z_nt_statistic = np.max(z_array)

    # The result must be a standard Python float.
    return float(z_nt_statistic)


In [None]:
# Task 7: De-randomization

class DeRandomizationResult(NamedTuple):
    """
    Structured container for the results of the de-randomization procedure.

    Attributes:
        q_statistic (float): The final Q_N,T,B(τ) statistic.
        critical_value (float): The asymptotic critical value c_τ used.
        b_replications (int): The number of replications (B) performed.
        tau (float): The significance level (τ) of the test.
    """
    q_statistic: float
    critical_value: float
    b_replications: int
    tau: float

# Configure a basic logger for the de-randomization module
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def _compute_asymptotic_critical_value(N: int, tau: float) -> float:
    """
    Computes the asymptotic critical value c_τ from the Gumbel distribution.

    This function implements the calculation of normalization constants a_N and b_N,
    and the final critical value c_τ as specified in Section 3 of the paper,
    leading up to Equation (3.6).

    Equation for b_N: b_N = sqrt(2logN) - (log(logN) + log(4π)) / (2*sqrt(2logN))
    Equation for a_N: a_N = b_N / (1 + b_N^2)
    Equation (3.6): c_τ = b_N - a_N * log(-log(1 - τ))

    Args:
        N (int): The number of assets (cross-sectional dimension).
        tau (float): The significance level of the test (e.g., 0.05).

    Returns:
        float: The scalar asymptotic critical value, c_τ.
    """
    # Validate inputs for theoretical soundness.
    if N < 3:
        raise ValueError(f"N must be >= 3 for asymptotic constants to be well-defined, but got {N}.")
    if not (0 < tau < 1):
        raise ValueError(f"Significance level tau must be in (0, 1), but got {tau}.")

    # Use float64 for all intermediate calculations to maintain precision.
    N_f = float(N)

    # Calculate log(N) and log(log(N)).
    log_N = np.log(N_f)
    log_log_N = np.log(log_N)
    sqrt_2_log_N = np.sqrt(2 * log_N)

    # Calculate the b_N normalization constant.
    b_N = sqrt_2_log_N - (log_log_N + np.log(4 * math.pi)) / (2 * sqrt_2_log_N)

    # Calculate the a_N normalization constant.
    a_N = b_N / (1 + np.square(b_N))

    # Calculate the final critical value c_τ using the Gumbel quantile function.
    c_tau = b_N - a_N * np.log(-np.log(1 - tau))

    return float(c_tau)


def perform_de_randomization(
    psi_values: pd.Series,
    tau: float,
    b_function: Callable[[int], int],
    seed: Optional[int] = None
) -> DeRandomizationResult:
    """
    Performs the de-randomization procedure to obtain a deterministic test outcome.

    This function orchestrates the entire de-randomization process:
    1. Calculates the number of replications, B.
    2. Computes the asymptotic critical value, c_τ.
    3. Runs a loop B times, each time computing a new randomized test statistic Z_N,T.
    4. Computes the Q statistic, which is the fraction of times Z_N,T <= c_τ.

    Args:
        psi_values (pd.Series): A pandas Series of the calculated ψ_i,NT values.
        tau (float): The significance level of the test (e.g., 0.05).
        b_function (Callable[[int], int]): A function that takes N and returns B,
                                           the number of replications.
        seed (Optional[int]): A seed for the random number generator to ensure
                              reproducibility.

    Returns:
        DeRandomizationResult: A NamedTuple containing the Q statistic and other
                               relevant parameters of the procedure.
    """
    # --- Input Validation ---
    if not isinstance(psi_values, pd.Series):
        raise TypeError(f"psi_values must be a pandas Series, but got {type(psi_values)}.")
    if psi_values.empty:
        raise ValueError("psi_values cannot be empty.")

    # Get the number of assets, N.
    N = len(psi_values)

    # --- Step 1: Calculate B and the Critical Value c_τ ---

    # Calculate the number of replications B using the provided function.
    # As per Theorem 3.2, B should be at least O(log^2 N).
    B = b_function(N)
    if not isinstance(B, int) or B <= 0:
        raise ValueError("b_function must return a positive integer.")

    # Compute the asymptotic critical value.
    critical_value = _compute_asymptotic_critical_value(N=N, tau=tau)

    logging.info(f"Starting de-randomization with N={N}, B={B}, τ={tau}, c_τ={critical_value:.4f}.")

    # --- Step 2: B-Replication Loop ---

    # Initialize a modern random number generator with the provided seed.
    # This is crucial for reproducibility.
    rng = np.random.default_rng(seed)

    # Pre-allocate a NumPy array to store the B realizations of the test statistic.
    z_statistics = np.empty(B, dtype=np.float64)

    # Loop B times to generate the distribution of the randomized statistic.
    for b in range(B):
        # In each iteration, compute one realization of Z_N,T.
        # The same rng object is used, ensuring its state evolves and
        # produces independent shocks for each replication.
        z_statistics[b] = compute_randomized_test_statistic(psi_values, rng)

    # --- Step 3: Compute the Q Statistic ---

    # Compare each Z statistic to the critical value to get a boolean array.
    # This implements the indicator function I(Z_N,T^(b) <= c_τ).
    indicator_results = (z_statistics <= critical_value)

    # Calculate the mean of the boolean array to get the Q statistic.
    # This implements Equation (3.7): Q_N,T,B(τ) = (1/B) * Σ I(...)
    q_statistic = np.mean(indicator_results)

    logging.info(f"De-randomization complete. Q-statistic: {q_statistic:.4f}.")

    # Package and return the results in a structured format.
    return DeRandomizationResult(
        q_statistic=float(q_statistic),
        critical_value=critical_value,
        b_replications=B,
        tau=tau
    )


In [None]:
# Task 8: Decision Rule

class TestDecisionResult(NamedTuple):
    """
    Structured container for the final hypothesis test decision.

    Attributes:
        q_statistic (float): The calculated Q_N,T,B(τ) statistic.
        decision_threshold (float): The threshold against which the Q statistic
                                    was compared. Calculated as (1-τ) - f(B).
        fail_to_reject_h0 (bool): The outcome of the test. True if the null
                                  hypothesis is not rejected, False if it is.
        outcome (str): A human-readable string describing the test result.
        de_randomization_result (DeRandomizationResult): The full result object
            from the de-randomization step for complete auditability.
    """
    q_statistic: float
    decision_threshold: float
    fail_to_reject_h0: bool
    outcome: str
    de_randomization_result: DeRandomizationResult

def make_test_decision(
    de_randomization_result: DeRandomizationResult,
    f_b_function: Callable[[int], float]
) -> TestDecisionResult:
    """
    Makes a final, deterministic decision on the null hypothesis of zero alpha.

    This function applies the decision rule from Section 3.1 of the paper,
    specifically the practical rule suggested in Equation (3.12). It compares
    the de-randomized Q statistic against a threshold to determine whether to
    reject or fail to reject the null hypothesis.

    Decision Rule (from Eq. 3.12):
    Fail to Reject H₀ if Q_N,T,B(τ) ≥ (1 - τ) - f(B)

    Args:
        de_randomization_result (DeRandomizationResult): The output from the
            `perform_de_randomization` function.
        f_b_function (Callable[[int], float]): A user-specified, non-increasing
            function of B that defines the tolerance for the decision rule.
            The paper suggests f(B) = B**(-0.25).

    Returns:
        TestDecisionResult: A NamedTuple containing the binary decision, the
                            threshold used, and other relevant statistics for
                            a complete and auditable result.

    Raises:
        TypeError: If f_b_function is not a callable or returns a non-numeric type.
        ValueError: If the result from f_b_function is not a small positive number.
    """
    # --- Input Validation ---
    # Validate that the f_b_function is a valid callable.
    if not callable(f_b_function):
        raise TypeError(f"f_b_function must be a callable, but got {type(f_b_function)}.")

    # Extract necessary values from the input result object for clarity.
    q_statistic = de_randomization_result.q_statistic
    B = de_randomization_result.b_replications
    tau = de_randomization_result.tau

    # --- Step 1: Evaluate the f(B) Function ---
    try:
        # Calculate the tolerance factor f(B).
        f_b_value = f_b_function(B)
    except Exception as e:
        # Catch potential errors during the execution of the provided lambda.
        logging.error(f"The provided f_b_function failed with error: {e}")
        raise TypeError("f_b_function could not be executed.")

    # Validate the output of the f(B) function.
    if not isinstance(f_b_value, (int, float)) or not (0 < f_b_value < 1):
        raise ValueError(
            f"f_b_function must return a small positive float, but returned {f_b_value}."
        )

    # --- Step 2: Calculate Threshold and Make Decision ---

    # Calculate the decision threshold based on the paper's rule.
    # Equation: threshold = (1 - τ) - f(B)
    decision_threshold = (1 - tau) - f_b_value

    # Apply the decision rule by comparing the Q statistic to the threshold.
    # Equation: Q_N,T,B(τ) ≥ (1 - τ) - f(B)
    fail_to_reject_h0 = (q_statistic >= decision_threshold)

    # Create a human-readable outcome string for clarity in reporting.
    outcome = "Fail to Reject H₀" if fail_to_reject_h0 else "Reject H₀"

    logging.info(
        f"Test Decision: Q-stat={q_statistic:.4f}, Threshold={decision_threshold:.4f} -> {outcome}"
    )

    # Package the final, comprehensive result into the TestDecisionResult NamedTuple.
    return TestDecisionResult(
        q_statistic=q_statistic,
        decision_threshold=decision_threshold,
        fail_to_reject_h0=fail_to_reject_h0,
        outcome=outcome,
        de_randomization_result=de_randomization_result
    )


In [None]:
# Task 9: Orchestrator Function

class EmpiricalAnalysisOrchestrator:
    """
    Orchestrates the end-to-end empirical analysis for the randomized alpha test.

    This class provides a professional-grade, robust pipeline for executing the
    entire testing procedure as described in Massacci, Sarno, Trapani, and
    Vallarino. It manages the workflow from raw data ingestion to the final
    results compilation, ensuring methodological rigor, reproducibility, and
    transparent logging at every step.

    The workflow is as follows:
    1. Initialization: Validates all inputs, cleans and aligns data, and
       pre-computes the rolling window schedule.
    2. Execution: The `run_analysis` method iterates through all specified
       factor models and rolling windows, applying the full test procedure.
    3. Output: Produces a clean, multi-indexed pandas DataFrame containing
       the detailed results for every test performed.
    """

    def __init__(
        self,
        asset_returns: pd.DataFrame,
        factor_returns: pd.DataFrame,
        replication_config: Dict[str, Any],
        seed: Optional[int] = 42
    ):
        """
        Initializes the orchestrator, validates, cleans, and prepares data.

        Args:
            asset_returns (pd.DataFrame): Raw DataFrame of asset returns.
            factor_returns (pd.DataFrame): Raw DataFrame of factor returns.
            replication_config (Dict[str, Any]): The study's configuration dict.
            seed (Optional[int]): A global seed for all random operations to
                                  ensure full reproducibility of the analysis.
        """
        # --- Initialization and State Setup ---
        logger.info("Initializing EmpiricalAnalysisOrchestrator...")

        # Create a master random number generator from the seed and store it.
        # This ensures that the entire analysis, including all de-randomization
        # loops, is fully reproducible.
        self.rng = np.random.default_rng(seed)

        # Store a deep copy of the configuration to prevent external modifications.
        self.config = copy.deepcopy(replication_config)

        # --- Data Validation, Cleansing, and Setup ---
        # Note: In a real package, the functions would be imported. Here we assume
        # they are available in the execution context.

        # Step 1: Validate raw inputs. This is a critical first gate.
        logger.info("Validating raw input data and configuration...")
        validate_inputs(asset_returns, factor_returns, self.config)

        # Step 2: Clean and align the data.
        logger.info("Cleaning and aligning input data...")
        # The clean_and_align_data function handles missing values, outliers,
        # and temporal alignment, returning the analysis-ready datasets.
        self.asset_returns, self.factor_returns, self.cleansing_log = \
            clean_and_align_data(
                asset_returns,
                factor_returns,
                # These parameters would ideally be in the config, but we use
                # robust defaults as specified in the task description.
                min_obs_frac_col=0.8,
                min_obs_frac_row=0.5,
                max_consecutive_fill=2,
                winsorize_quantiles=(0.01, 0.99),
                min_obs_for_analysis=self.config['empirical']['run_controls']['rolling_window_size_months']
            )

        # Step 3: Set up the rolling window schedule.
        logger.info("Setting up rolling window analysis schedule...")
        # This function generates the list of (start, end) date tuples.
        self.windows = setup_empirical_analysis_windows(
            self.asset_returns.index,
            self.config
        )

        logger.info(
            f"Orchestrator initialized successfully. "
            f"Data shape: {self.asset_returns.shape}. "
            f"Number of windows: {len(self.windows)}."
        )

    def _run_test_for_single_window(
        self,
        model_name: str,
        start_date: pd.Timestamp,
        end_date: pd.Timestamp
    ) -> Optional[Tuple[TestDecisionResult, int, int]]:
        """
        Executes the full test procedure for a single model in a single window.

        This is the core computational unit, performing estimation, statistic
        computation, de-randomization, and decision making.

        Args:
            model_name (str): The name of the factor model to test.
            start_date (pd.Timestamp): The start date of the window.
            end_date (pd.Timestamp): The end date of the window.

        Returns:
            An optional tuple containing the TestDecisionResult and the
            dimensions (N, T) used in the test. Returns None if the window
            data is insufficient for testing.
        """
        # Extract the list of factor names for the specified model.
        factor_names = self.config['empirical']['model_specifications'][model_name]

        # Slice the master dataframes for the current window.
        asset_window_raw = self.asset_returns.loc[start_date:end_date]
        factor_window = self.factor_returns.loc[start_date:end_date, factor_names]

        # CRITICAL STEP: Drop assets with any missing data *within this specific window*.
        # This ensures a balanced panel for the regression.
        asset_window = asset_window_raw.dropna(axis=1, how='any')

        # Get the final dimensions (T, N, K) for this specific test.
        T, N = asset_window.shape
        _T, K = factor_window.shape

        # Check if the remaining data is sufficient for OLS.
        # We need more observations (T) than regressors (K+1) and at least one asset.
        if N == 0 or T < K + 2:
            logger.warning(
                f"Skipping window {end_date.date()} for model {model_name} due to "
                f"insufficient data after window-specific cleaning (N={N}, T={T}, K={K})."
            )
            return None

        # Extract the specific test parameters from the configuration.
        test_params = self.config['empirical']['test_config']['our_test']
        nu = test_params['nu']
        tau = test_params['de_randomization']['tau']
        b_function = test_params['de_randomization']['B_function']
        f_b_function = test_params['de_randomization']['f_B_function']

        # --- Execute the Full Testing Pipeline (Tasks 4-8) ---

        # Task 4: Estimate model parameters.
        estimation_result = estimate_factor_model_params(asset_window, factor_window)

        # Task 5: Compute the psi statistic.
        psi_values = compute_psi_statistic(estimation_result, nu=nu)

        # Task 7: Perform the de-randomization procedure.
        de_rand_result = perform_de_randomization(
            psi_values, tau=tau, b_function=b_function, rng=self.rng
        )

        # Task 8: Make the final test decision.
        decision = make_test_decision(de_rand_result, f_b_function)

        return decision, N, T

    def run_analysis(self) -> pd.DataFrame:
        """
        Executes the full empirical analysis across all models and windows.

        This method iterates through all model specifications and all rolling
        windows, calling the testing procedure for each combination and
        compiling the results into a final, structured DataFrame.

        Returns:
            pd.DataFrame: A multi-indexed DataFrame containing the detailed
                          results of every test performed. The index levels
                          are 'model' and 'window_end_date'.
        """
        # Retrieve model specifications from the configuration.
        model_specs = self.config['empirical']['model_specifications']
        # Initialize an empty list to store the results dictionaries.
        results_list = []

        logger.info("--- Starting Full Empirical Analysis ---")
        # Outer loop: Iterate through each factor model specification with a progress bar.
        for model_name in tqdm(model_specs.keys(), desc="Processing Models"):

            # Inner loop: Iterate through each rolling window with a nested progress bar.
            for start_date, end_date in tqdm(self.windows, desc=f"  - Windows for {model_name}", leave=False):
                try:
                    # Run the full test procedure for this specific model and window.
                    result_tuple = self._run_test_for_single_window(
                        model_name, start_date, end_date
                    )

                    # If the window was skipped due to insufficient data, continue.
                    if result_tuple is None:
                        continue

                    # Unpack the results.
                    decision_result, N, T = result_tuple

                    # Compile a dictionary of results for this run.
                    results_list.append({
                        'model': model_name,
                        'window_end_date': end_date,
                        'N': N,
                        'T': T,
                        'q_statistic': decision_result.q_statistic,
                        'decision_threshold': decision_result.decision_threshold,
                        'reject_h0': not decision_result.fail_to_reject_h0,
                        'outcome': decision_result.outcome,
                        'critical_value': decision_result.de_randomization_result.critical_value,
                        'B': decision_result.de_randomization_result.b_replications
                    })

                except Exception as e:
                    # Log any unexpected errors that occur for a specific window and continue.
                    # This makes the entire analysis robust to failures in isolated windows.
                    logger.error(
                        f"Failed processing window {end_date.date()} for model {model_name}: {e}",
                        exc_info=True # Include traceback for debugging
                    )
                    continue

        # Check if any results were successfully generated.
        if not results_list:
            logger.warning("Analysis completed but produced no results. Check data and configuration.")
            return pd.DataFrame()

        # Convert the list of dictionaries into a structured DataFrame.
        results_df = pd.DataFrame(results_list)
        # Set a multi-index for easy slicing and analysis.
        results_df.set_index(['model', 'window_end_date'], inplace=True)

        logger.info(f"--- Full Empirical Analysis Complete. Generated {len(results_df)} results. ---")

        # Return the final, sorted results DataFrame.
        return results_df.sort_index()


In [None]:
# Task 10: Robustness Analysis

def run_robustness_analysis(
    asset_returns: pd.DataFrame,
    factor_returns: pd.DataFrame,
    base_config: Dict[str, Any],
    parameter_grid: Dict[str, List[Any]]
) -> Dict[str, pd.DataFrame]:
    """
    Performs a robustness analysis by re-running the empirical test with
    different parameter configurations.

    This function systematically varies key test parameters (like nu and f(B)),
    runs the full analysis for each combination, and then computes metrics to
    assess the stability of the results.

    Args:
        asset_returns (pd.DataFrame): Raw DataFrame of asset returns.
        factor_returns (pd.DataFrame): Raw DataFrame of factor returns.
        base_config (Dict[str, Any]): The baseline configuration dictionary.
        parameter_grid (Dict[str, List[Any]]): A dictionary defining the
            parameter variations to test. Keys are parameter names (e.g., 'nu')
            and values are lists of alternative values.

    Returns:
        A dictionary containing:
            'rejection_rates_df': A DataFrame comparing model rejection rates
                                  across all tested parameter sets.
            'correlation_matrix': A DataFrame showing the Pearson correlation
                                  of rejection rates between parameter sets.
    """
    # --- Step 1: Parameter Sensitivity Testing ---
    logger.info("--- Starting Robustness Analysis ---")

    # Generate all combinations of parameters from the provided grid.
    param_keys = parameter_grid.keys()
    param_values = parameter_grid.values()
    param_combinations = list(itertools.product(*param_values))

    # Dictionary to store the results DataFrame from each run.
    all_results = {}

    # Loop through each unique combination of parameters.
    for combo in param_combinations:
        # Create a descriptive name for this parameter set.
        param_set_name = ", ".join([f"{key}={val}" for key, val in zip(param_keys, combo)])

        # For callables, use the function's name or a description.
        param_set_name = param_set_name.replace('<lambda>', 'lambda')
        logger.info(f"Running analysis for parameter set: {param_set_name}")

        # Create a deep copy of the base configuration to modify.
        run_config = copy.deepcopy(base_config)

        # Update the configuration with the current parameter combination.
        # This uses a simplified path update; a real implementation might need
        # a more robust nested dictionary update function.
        test_config_path = run_config['empirical']['test_config']['our_test']
        for key, value in zip(param_keys, combo):
            if key == 'f_b_function_str': # Special handling for function strings
                test_config_path['de_randomization']['f_B_function'] = eval(value)
            else:
                test_config_path[key] = value

        try:
            # Instantiate and run the orchestrator with the modified config.
            # A unique seed is used for each run to ensure that while the overall
            # analysis is reproducible, the robustness check isn't biased by
            # using the exact same random numbers for different parameters.
            orchestrator = EmpiricalAnalysisOrchestrator(
                asset_returns=asset_returns,
                factor_returns=factor_returns,
                replication_config=run_config,
                seed=hash(param_set_name) % (2**32) # Create a deterministic seed
            )
            results_df = orchestrator.run_analysis()

            # Store the results if the analysis was successful.
            if not results_df.empty:
                all_results[param_set_name] = results_df
            else:
                logger.warning(f"Analysis for '{param_set_name}' produced no results.")

        except Exception as e:
            logger.error(
                f"Robustness run failed for parameter set '{param_set_name}': {e}",
                exc_info=True
            )
            continue

    if not all_results:
        raise RuntimeError("Robustness analysis failed to produce any valid results.")

    # --- Step 2: Robustness Metric Calculation ---
    logger.info("--- Calculating Robustness Metrics ---")

    # Calculate the overall rejection rate for each model under each param set.
    rejection_rates = {}
    for param_set_name, results_df in all_results.items():
        # Group by model and calculate the mean of the boolean 'reject_h0' column.
        rejection_rates[param_set_name] = results_df.groupby('model')['reject_h0'].mean()

    # Combine all rejection rate Series into a single DataFrame.
    rejection_rates_df = pd.DataFrame(rejection_rates)

    # Calculate the Pearson correlation matrix of the rejection rates.
    # High correlation indicates robust conclusions.
    correlation_matrix = rejection_rates_df.corr(method='pearson')

    # Calculate the standard deviation of rejection rates across parameter sets for each model.
    # Low std dev indicates stable absolute rejection rates.
    rejection_rates_df['std_dev'] = rejection_rates_df.std(axis=1)

    logger.info("--- Robustness Analysis Complete ---")

    return {
        'rejection_rates_df': rejection_rates_df,
        'correlation_matrix': correlation_matrix
    }


In [None]:
# Task 11: Results Compilation

def compile_empirical_results(
    results_df: pd.DataFrame,
    replication_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Compiles the raw, window-by-window results into a summary table.

    This function calculates the rejection frequencies for the null hypothesis
    of zero alpha for each factor model. It computes the overall rejection
    rate across the full sample period and also provides a breakdown for
    specific, economically significant subperiods (e.g., financial crises)
    as defined in the configuration. The output format is designed to mirror
    Table 6.1 in the source paper.

    Args:
        results_df (pd.DataFrame): The multi-indexed DataFrame of detailed
            results produced by the EmpiricalAnalysisOrchestrator.
        replication_config (Dict[str, Any]): The study's configuration
            dictionary, used to retrieve the crisis period definitions.

    Returns:
        pd.DataFrame: A summary DataFrame where rows represent different sample
                      periods (Full Sample and crises) and columns represent
                      the factor models. Cell values are the rejection rates.

    Raises:
        ValueError: If the results_df is empty or lacks the required structure.
        KeyError: If the configuration dictionary is missing the required
                  'crisis_periods' section.
    """
    # --- Input Validation ---
    # Check if the input DataFrame is empty.
    if results_df.empty:
        logger.warning("Input results_df is empty. Returning an empty DataFrame.")
        return pd.DataFrame()

    # Verify the required index levels and columns are present.
    required_index = ['model', 'window_end_date']
    required_cols = ['reject_h0']
    if not all(level in results_df.index.names for level in required_index):
        raise ValueError(f"results_df must have index levels: {required_index}")
    if not all(col in results_df.columns for col in required_cols):
        raise ValueError(f"results_df must have columns: {required_cols}")

    # --- Step 1: Calculate Overall Rejection Rates ---
    logger.info("Calculating overall rejection rates for the full sample.")

    # Group by the 'model' index level and calculate the mean of the boolean
    # 'reject_h0' column. This gives the rejection frequency.
    full_sample_rates = results_df.groupby(level='model')['reject_h0'].mean()

    # Store the full sample results in a dictionary to build the final table.
    summary_results = {'Full Sample': full_sample_rates}

    # --- Step 2: Calculate Rejection Rates for Crisis Periods ---
    try:
        # Extract the crisis period definitions from the configuration.
        crisis_periods = replication_config['empirical']['reporting']['crisis_periods']
    except KeyError:
        logger.error("Missing 'crisis_periods' in replication_config['empirical']['reporting'].")
        raise KeyError("Configuration must contain crisis period definitions.")

    logger.info(f"Calculating rejection rates for {len(crisis_periods)} defined subperiods.")

    # Get the 'window_end_date' from the index for efficient slicing.
    window_end_dates = results_df.index.get_level_values('window_end_date')

    # Iterate through each defined crisis period.
    for period_name, (start_str, end_str) in crisis_periods.items():
        # Parse the date strings into Timestamp objects.
        start_date = pd.to_datetime(start_str)
        end_date = pd.to_datetime(end_str)

        # Create a boolean mask to select windows ending within this period.
        period_mask = (window_end_dates >= start_date) & (window_end_dates <= end_date)

        # Slice the results DataFrame using the mask.
        period_results_df = results_df[period_mask]

        # Check if any windows fall within this period.
        if period_results_df.empty:
            logger.warning(f"No windows found for the period '{period_name}'. "
                           f"Results for this period will be NaN.")
            # Create a Series of NaNs to ensure consistent table structure.
            period_rates = pd.Series(np.nan, index=full_sample_rates.index)
        else:
            # If data exists, calculate rejection rates for this subperiod.
            period_rates = period_results_df.groupby(level='model')['reject_h0'].mean()

        # Add the results for this period to the summary dictionary.
        summary_results[period_name] = period_rates

    # --- Step 3: Assemble the Final Summary DataFrame ---

    # Convert the dictionary of results into a single DataFrame.
    # The `orient='index'` argument makes the dictionary keys the rows.
    summary_df = pd.DataFrame(summary_results).T

    # Ensure the column order matches the model specifications in the config.
    model_order = list(replication_config['empirical']['model_specifications'].keys())
    # Reorder columns, dropping any that might not be in the results (if a model failed completely).
    summary_df = summary_df.reindex(columns=[m for m in model_order if m in summary_df.columns])

    logger.info("Successfully compiled summary results table.")

    return summary_df


In [None]:
# Task 12: Visualization

def plot_q_statistic_time_series(
    results_df: pd.DataFrame,
    replication_config: Dict[str, Any],
    title: str = "Time Series of De-Randomized Q-Statistics",
    figsize: Tuple[int, int] = (15, 8)
) -> Tuple[plt.Figure, plt.Axes]:
    """
    Creates a publication-quality plot of the Q-statistic time series.

    This function visualizes the results of the empirical analysis, replicating
    the style of Figure 6.1 from the source paper. It plots the time series
    of the Q-statistic for each factor model, includes the decision threshold
    as a horizontal line, and shades the specified crisis periods.

    Args:
        results_df (pd.DataFrame): The multi-indexed DataFrame of detailed
            results from the EmpiricalAnalysisOrchestrator.
        replication_config (Dict[str, Any]): The study's configuration
            dictionary, used for model ordering and crisis period definitions.
        title (str): The title for the plot.
        figsize (Tuple[int, int]): The size of the figure in inches.

    Returns:
        A tuple containing the matplotlib Figure and Axes objects, allowing for
        further customization or saving.

    Raises:
        ValueError: If the results_df is empty or lacks the required structure.
    """
    # --- Input Validation ---
    # Check if the input DataFrame is empty.
    if results_df.empty:
        raise ValueError("Cannot generate plot from an empty results DataFrame.")
    # Verify the required index levels and columns are present.
    required_index = ['model', 'window_end_date']
    required_cols = ['q_statistic', 'decision_threshold']
    if not all(level in results_df.index.names for level in required_index):
        raise ValueError(f"results_df must have index levels: {required_index}")
    if not all(col in results_df.columns for col in required_cols):
        raise ValueError(f"results_df must have columns: {required_cols}")

    # --- Step 1: Reshape Data for Plotting ---
    logger.info("Reshaping results data for visualization.")

    # Select the q_statistic column for plotting.
    q_stats = results_df['q_statistic']

    # Unstack the 'model' level to create a wide-format DataFrame where
    # columns are models and the index is the date.
    q_stats_wide = q_stats.unstack(level='model')

    # Ensure the plotting order of models is consistent with the configuration.
    model_order = list(replication_config['empirical']['model_specifications'].keys())
    q_stats_wide = q_stats_wide.reindex(columns=[m for m in model_order if m in q_stats_wide.columns])

    # --- Step 2: Create the Plot ---
    logger.info("Generating the Q-statistic time series plot.")

    # Initialize the matplotlib figure and axes with a professional style.
    plt.style.use('seaborn-v0_8-whitegrid')
    fig, ax = plt.subplots(figsize=figsize)

    # Plot the time series of the Q-statistic for each model.
    q_stats_wide.plot(ax=ax, lw=1.5)

    # Calculate the average decision threshold to plot as a single line.
    # This is a reasonable simplification as N varies only slightly across windows.
    avg_threshold = results_df['decision_threshold'].mean()
    # Plot the decision threshold as a horizontal dashed line.
    ax.axhline(
        y=avg_threshold,
        color='black',
        linestyle='--',
        linewidth=1.0,
        label=f'Decision Threshold (Avg ≈ {avg_threshold:.3f})'
    )

    # --- Step 3: Add Contextual Shading for Crisis Periods ---
    # Extract crisis period definitions from the configuration.
    crisis_periods = replication_config['empirical']['reporting']['crisis_periods']
    # Use a semi-transparent grey for shading.
    shade_color = 'grey'
    shade_alpha = 0.2

    # Iterate through each crisis period and add a shaded region to the plot.
    for period_name, (start_str, end_str) in crisis_periods.items():
        # Convert date strings to Timestamp objects.
        start_date = pd.to_datetime(start_str)
        end_date = pd.to_datetime(end_str)
        # Add the vertical shaded span to the axes.
        ax.axvspan(start_date, end_date, color=shade_color, alpha=shade_alpha, zorder=0)

    # --- Step 4: Final Formatting for Publication Quality ---

    # Set the title and axis labels.
    ax.set_title(title, fontsize=16, fontweight='bold')
    ax.set_ylabel("Q-Statistic", fontsize=12)
    ax.set_xlabel(None) # The date axis is self-explanatory.

    # Set the y-axis limits to be between 0 and 1, as Q is a proportion.
    ax.set_ylim(0, 1.05)

    # Format the x-axis to display years clearly.
    ax.xaxis.set_major_locator(mdates.YearLocator(5)) # Ticks every 5 years
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
    plt.setp(ax.get_xticklabels(), rotation=0, ha='center')

    # Configure the legend.
    ax.legend(title='Factor Model', loc='upper left', bbox_to_anchor=(1.02, 1))

    # Adjust layout to prevent the legend from being cut off.
    fig.tight_layout(rect=[0, 0, 0.9, 1]) # Make space for legend on the right

    logger.info("Plot generation complete.")

    return fig, ax


In [None]:
# Task 13: Monte Carlo Simulation Setup

class SimulationExperiment(NamedTuple):
    """
    A structured container for a single Monte Carlo experiment's configuration.

    This object holds all necessary parameters to run one specific simulation
    cell (e.g., a given scenario with a specific N and T). Using a NamedTuple
    ensures immutability and provides clear attribute access.

    Attributes:
        experiment_id (str): A unique identifier for the experiment.
        scenario (str): The name of the simulation scenario.
        N (int): The number of assets (cross-sectional dimension).
        T (int): The number of time periods.
        M_replications (int): The number of Monte Carlo replications to run.
        dgp_params (Dict[str, Any]): The fully specified parameters for the
                                     Data Generation Process.
        hypothesis_params (Dict[str, Any]): Parameters defining the null and
                                             alternative hypotheses.
        test_config (Dict[str, Any]): Configuration for the statistical tests.
    """
    experiment_id: str
    scenario: str
    N: int
    T: int
    M_replications: int
    dgp_params: Dict[str, Any]
    hypothesis_params: Dict[str, Any]
    test_config: Dict[str, Any]


def _configure_scenario(
    base_dgp_params: Dict[str, Any],
    scenario: str,
    N: int
) -> Dict[str, Any]:
    """
    Applies scenario-specific modifications to the base DGP parameters.

    Args:
        base_dgp_params (Dict[str, Any]): The base DGP parameters.
        scenario (str): The name of the scenario to configure.
        N (int): The number of assets for this experiment.

    Returns:
        Dict[str, Any]: The modified DGP parameters for the specified scenario.
    """
    # Start with a deep copy to avoid modifying the base configuration.
    dgp_params = copy.deepcopy(base_dgp_params)

    # The 'main' scenario includes persistence in the omitted factor.
    dgp_params['omitted_factor_persistence'] = 0.4

    # Apply modifications based on the scenario name.
    if scenario == 'no_persistence':
        # As per Appendix A.1, set the omitted factor persistence (phi_g) to 0.
        dgp_params['omitted_factor_persistence'] = 0.0

    elif scenario == 'weak_error_factor':
        # As per Appendix A.2, a fraction of assets have zero loading on the omitted factor.
        # The factor is "weak" as it affects only floor(N^0.4) assets.
        num_affected = int(np.floor(N**0.4))
        dgp_params['omitted_factor_num_affected'] = num_affected

    elif scenario == 'semi_strong_error_factor':
        # As per Appendix A.2, the factor is "semi-strong" (affects floor(N^0.8) assets).
        num_affected = int(np.floor(N**0.8))
        dgp_params['omitted_factor_num_affected'] = num_affected

    elif scenario == 'semi_strong_pricing_factor':
        # As per Appendix A.3, some pricing factors are semi-strong.
        # Here, we specify that factors 2 and 3 are zero for a fraction of assets.
        num_affected = int(np.floor(N**0.8))
        dgp_params['pricing_factor_num_affected'] = {
            'indices': [1, 2], # Corresponds to Beta_2 and Beta_3
            'count': num_affected
        }

    # The 'main' scenario implies all assets are affected by the omitted factor.
    if 'omitted_factor_num_affected' not in dgp_params:
        dgp_params['omitted_factor_num_affected'] = N

    return dgp_params


def setup_monte_carlo_experiments(
    replication_config: Dict[str, Any]
) -> List[SimulationExperiment]:
    """
    Generates the full list of Monte Carlo experiments to be executed.

    This function parses the 'monte_carlo' section of the configuration,
    creates the Cartesian product of all parameter grids (N, T, scenarios),
    and constructs a list of fully-specified, immutable SimulationExperiment
    objects, one for each unique experimental cell.

    Args:
        replication_config (Dict[str, Any]): The complete study configuration.

    Returns:
        List[SimulationExperiment]: A list of all simulation experiments to run.

    Raises:
        KeyError: If the configuration is missing required sections.
    """
    logger.info("--- Setting up Monte Carlo Simulation Experiments ---")

    try:
        # Extract the main Monte Carlo configuration sections.
        mc_config = replication_config['monte_carlo']
        controls = mc_config['run_controls']
        base_dgp = mc_config['dgp_base_params']
        hypotheses = mc_config['hypothesis_params']
        tests = mc_config['test_config']

        # Extract the parameter grids.
        N_grid = controls['N_grid']
        T_grid = controls['T_grid']
        scenarios = controls['scenarios']
        M_replications = controls['M_replications']

    except KeyError as e:
        logger.error(f"Configuration missing required Monte Carlo key: {e}")
        raise

    # Create the list to hold all experiment configurations.
    experiments = []

    # Generate the Cartesian product of all experimental parameters.
    experiment_grid = list(itertools.product(scenarios, N_grid, T_grid))
    logger.info(f"Generated {len(experiment_grid)} unique experimental cells.")

    # Process each experimental cell to create a full configuration.
    for scenario, N, T in experiment_grid:

        # --- Theoretical Constraint Check (Assumption 3.1) ---
        nu = tests['our_test']['nu']
        # The exponent in the theoretical bound N = O(T^exponent)
        theoretical_exponent = (nu / 2 - 1) / 2
        # We check this for a small epsilon, e.g., 0.01
        if (np.log(N) / np.log(T)) > (theoretical_exponent - 0.01):
            logger.warning(
                f"Experiment (N={N}, T={T}, nu={nu}) may violate Assumption 3.1. "
                f"log(N)/log(T)={np.log(N)/np.log(T):.2f} vs. bound≈{theoretical_exponent:.2f}. "
                "Asymptotic results may not hold."
            )

        # --- Configure Scenario-Specific DGP ---
        # This helper function applies the required modifications for the scenario.
        dgp_params = _configure_scenario(base_dgp, scenario, N)

        # Create a unique ID for this experiment for logging and result tracking.
        exp_id = f"scenario={scenario}_N={N}_T={T}"

        # Create the immutable experiment configuration object.
        experiment = SimulationExperiment(
            experiment_id=exp_id,
            scenario=scenario,
            N=N,
            T=T,
            M_replications=M_replications,
            dgp_params=dgp_params,
            hypothesis_params=hypotheses,
            test_config=tests
        )
        experiments.append(experiment)

    logger.info(f"Successfully created {len(experiments)} simulation experiment configurations.")

    return experiments


In [None]:
# Task 14: Data Generation Process (DGP)

class DGPData(NamedTuple):
    """
    Container for a single realization of the Data Generation Process.

    Attributes:
        asset_returns (np.ndarray): (T x N) array of simulated asset returns.
        factor_returns (np.ndarray): (T x K) array of simulated factor returns.
    """
    asset_returns: np.ndarray
    factor_returns: np.ndarray

def _draw_from_dist(
    dist_config: Dict[str, Any],
    size: Tuple[int, ...],
    rng: np.random.Generator
) -> np.ndarray:
    """
    Draws random samples from a distribution specified in a configuration dictionary.

    This helper function provides a flexible way to generate random numbers from
    various distributions (e.g., uniform, normal) based on string identifiers
    and parameters provided in the main configuration object.

    Args:
        dist_config (Dict[str, Any]): A dictionary containing 'type' (e.g., 'uniform')
                                     and 'args' (e.g., (low, high)).
        size (Tuple[int, ...]): The desired output shape of the random array.
        rng (np.random.Generator): The random number generator instance to use.

    Returns:
        np.ndarray: An array of random samples with the specified shape.

    Raises:
        NotImplementedError: If the specified distribution 'type' is not supported.
        KeyError: If the dist_config dictionary is missing 'type' or 'args'.
    """
    try:
        # Extract the distribution type (e.g., 'uniform', 'normal').
        dist_type = dist_config['type']

        # Extract the arguments for the distribution (e.g., (low, high) or (mean, std)).
        args = dist_config['args']
    except KeyError as e:
        # Raise an error if the configuration is malformed.
        logger.error(f"Distribution configuration is missing key: {e}")
        raise KeyError(f"Invalid distribution configuration: {dist_config}")

    # Generate random numbers based on the specified distribution type.
    if dist_type == 'uniform':
        # Draw from a uniform distribution.
        return rng.uniform(low=args[0], high=args[1], size=size)
    elif dist_type == 'normal':
        # Draw from a normal (Gaussian) distribution.
        return rng.normal(loc=args[0], scale=args[1], size=size)

    # Raise an error for any unsupported distribution type.
    raise NotImplementedError(f"Distribution type '{dist_type}' not implemented.")


def generate_dgp_data(
    experiment: SimulationExperiment,
    is_null_hypothesis: bool,
    rng: np.random.Generator
) -> DGPData:
    """
    Generates a single, complete dataset according to the Data Generation Process (DGP).

    This function is the core of the Monte Carlo simulation. It synthesizes a
    dataset of asset and factor returns based on the detailed specifications
    provided in the SimulationExperiment object, meticulously following the
    stochastic processes outlined in Section 5 and Appendix A of the source paper.

    Args:
        experiment (SimulationExperiment): The configuration object for this DGP run.
        is_null_hypothesis (bool): If True, generate data under the null (all alphas=0).
                                   If False, generate under the alternative.
        rng (np.random.Generator): The random number generator to use for all stochastic
                                   components, ensuring reproducibility.

    Returns:
        DGPData: A NamedTuple containing the (T x N) asset returns and (T x K)
                 factor returns as NumPy arrays.

    Raises:
        ValueError: If DGP parameters are invalid (e.g., non-stationary VAR).
        NotImplementedError: If an unsupported error DGP type is specified.
    """
    # --- Step 0: Extract Parameters and Validate ---

    # Unpack parameters from the experiment object for clarity and frequent use.
    N, T = experiment.N, experiment.T
    dgp = experiment.dgp_params
    K = dgp['K_factors']

    # Define a burn-in period to allow stochastic processes to reach stationarity.
    burn_in = 100

    # --- Step 1: Generate Factor Process (f_t) ---
    logger.debug(f"Generating {K} factors for T={T} periods...")

    # Equation (5.2): f_t = f̄ + Φ(f_{t-1} - f̄) + ζ_t

    # Extract the unconditional mean of the factors.
    factor_mean = np.array(dgp['factor_mean'], dtype=np.float64)

    # Extract the VAR(1) coefficient matrix (assumed diagonal as per the paper).
    phi_matrix = np.diag(dgp['factor_var_matrix']['values'])

    # Check for stationarity of the VAR(1) process.
    if np.any(np.abs(np.linalg.eigvals(phi_matrix)) >= 1):
        raise ValueError("Factor VAR(1) process is not stationary (eigenvalue >= 1).")

    # Pre-allocate array for the factor time series, including the burn-in period.
    factors = np.zeros((T + burn_in, K), dtype=np.float64)

    # Initialize the process at its unconditional mean.
    factors[0, :] = factor_mean

    # Generate the i.i.d. standard normal innovations for the factor process.
    innovations_f = rng.standard_normal(size=(T + burn_in, K))

    # Iterate through time to generate the VAR(1) series.
    for t in range(1, T + burn_in):
        # Apply the VAR(1) update equation.
        factors[t, :] = factor_mean + phi_matrix @ (factors[t-1, :] - factor_mean) + innovations_f[t, :]

    # Discard the burn-in period to get the final stationary time series.
    factor_returns = factors[burn_in:, :]

    # --- Step 2: Generate Factor Loadings (β_i for pricing, γ_i for error) ---
    logger.debug(f"Generating factor loadings for N={N} assets...")

    # Pre-allocate the matrix for pricing factor loadings (betas).
    betas = np.zeros((N, K), dtype=np.float64)

    # Generate loadings for each of the K pricing factors from its specified distribution.
    for k in range(K):
        betas[:, k] = _draw_from_dist(dgp['factor_loading_dist'][k], (N,), rng)

    # Apply scenario modification for semi-strong pricing factors if specified.
    if 'pricing_factor_num_affected' in dgp:
        # Extract the specification for which factors are semi-strong.
        spec = dgp['pricing_factor_num_affected']
        # Determine the number of assets that will have zero loadings.
        num_zero_loadings = N - spec['count']
        # Randomly select the assets that will have zero loadings.
        indices_to_zero = rng.choice(N, size=num_zero_loadings, replace=False)
        # Set the loadings for the specified factors and assets to zero.
        for k in spec['indices']:
            betas[indices_to_zero, k] = 0.0

    # Generate loadings (gammas) for the common component in the error term.
    gammas = _draw_from_dist(dgp['omitted_factor_loading_dist'], (N, 1), rng)

    # Apply scenario modification for weak or semi-strong error factors.
    num_affected_error = dgp['omitted_factor_num_affected']
    if num_affected_error < N:
        # Determine the number of assets with zero loadings on the error factor.
        num_zero_loadings_error = N - num_affected_error
        # Randomly select the assets to have zero loadings.
        indices_to_zero_error = rng.choice(N, size=num_zero_loadings_error, replace=False)
        # Set their gamma loadings to zero.
        gammas[indices_to_zero_error, 0] = 0.0

    # --- Step 3: Generate Error Terms (u_i,t) ---
    logger.debug("Generating error terms...")

    # Equation (5.3): u_t = γ * g_t + ξ_t

    # Generate the common error factor (g_t) as an AR(1) process.
    phi_g = dgp['omitted_factor_persistence']
    g_t = np.zeros(T + burn_in, dtype=np.float64)
    innovations_g = rng.standard_normal(size=T + burn_in)
    for t in range(1, T + burn_in):
        g_t[t] = phi_g * g_t[t-1] + innovations_g[t]
    # Reshape after discarding burn-in to be a (T x 1) column vector.
    g_t_final = g_t[burn_in:].reshape(-1, 1)

    # Generate the T x N matrix of idiosyncratic shocks (ξ_i,t).
    # This example uses the first defined error type; a full implementation could loop.
    error_spec_key = list(dgp['error_dgp_types'].keys())[0]
    error_spec = dgp['error_dgp_types'][error_spec_key]

    if error_spec['type'] == 'gaussian':
        # Generate standard normal idiosyncratic shocks.
        xi = rng.standard_normal(size=(T, N))
    elif error_spec['type'] == 'student_t':
        # Generate shocks from a Student's t-distribution.
        df = error_spec['args']['df']
        if df <= 2:
            raise ValueError("Degrees of freedom for Student's t must be > 2 for finite variance.")
        xi = rng.standard_t(df=df, size=(T, N))
        # Standardize the shocks to have unit variance.
        xi /= np.sqrt(df / (df - 2))
    elif error_spec['type'] == 'garch':
        # Generate shocks from a GARCH(1,1) process for each asset.
        garch_params = error_spec['args']
        omegas = _draw_from_dist(garch_params['omega_dist'], (N,), rng)
        alphas_g = _draw_from_dist(garch_params['alpha_dist'], (N,), rng)
        betas_g = _draw_from_dist(garch_params['beta_dist'], (N,), rng)

        # Check GARCH stationarity condition.
        if np.any(alphas_g + betas_g >= 1):
            logger.warning("Some GARCH processes may be non-stationary (alpha+beta >= 1).")

        xi = np.zeros((T, N), dtype=np.float64)
        h_sq = np.ones((T, N), dtype=np.float64) # Initialize conditional variance
        innovations_xi = rng.standard_normal(size=(T, N))

        # Iterate over time to generate the GARCH series for all assets.
        for t in range(1, T):
            # Equation: h_i,t^2 = ω_i + α_i * ξ_{i,t-1}^2 + β_i * h_{i,t-1}^2
            h_sq[t, :] = omegas + alphas_g * np.square(xi[t-1, :]) + betas_g * h_sq[t-1, :]
        # Calculate the shocks: ξ_i,t = h_i,t * z_i,t
        xi = np.sqrt(h_sq) * innovations_xi
    else:
        raise NotImplementedError(f"Error DGP type '{error_spec['type']}' not implemented.")

    # Combine the common factor and idiosyncratic shocks to form the final errors.
    errors = g_t_final @ gammas.T + xi

    # --- Step 4: Generate Alphas and Synthesize Asset Returns ---
    logger.debug("Generating alphas and synthesizing final asset returns...")

    # Initialize alphas as a vector of zeros (for the null hypothesis).
    alphas = np.zeros((N, 1), dtype=np.float64)

    # If generating under the alternative, create sparse non-zero alphas.
    if not is_null_hypothesis:
        alt_params = experiment.hypothesis_params['alternative']
        sparsity = alt_params['default_sparsity']
        num_nonzero = int(np.floor(N * sparsity))

        if num_nonzero > 0:
            # Randomly select which assets will have non-zero alpha.
            nonzero_indices = rng.choice(N, size=num_nonzero, replace=False)
            # Draw the alpha values from the specified distribution.
            dist_args = alt_params['dist_args']
            alphas[nonzero_indices, 0] = rng.normal(loc=dist_args[0], scale=dist_args[1], size=num_nonzero)

    # Synthesize the final T x N asset returns matrix.
    # Equation (5.1): y_i,t = α_i + β'_i * f_t + u_i,t
    # In matrix form: Y = F @ B.T + U + α.T
    asset_returns = factor_returns @ betas.T + errors + alphas.T

    # Return the final generated data in the structured container.
    return DGPData(
        asset_returns=asset_returns,
        factor_returns=factor_returns
    )


In [None]:
# Task 15: Monte Carlo Simulation

class SimulationResult(NamedTuple):
    """
    Structured container for the results of a full Monte Carlo simulation.

    Attributes:
        experiment_id (str): The unique identifier for the experiment.
        rejection_rate (float): The empirical rejection frequency (size or power).
        conf_interval (Tuple[float, float]): The Wilson score confidence
                                             interval for the rejection rate.
        successful_replications (int): The number of replications that
                                       completed without errors.
        total_replications (int): The total number of replications attempted.
    """
    experiment_id: str
    rejection_rate: float
    conf_interval: Tuple[float, float]
    successful_replications: int
    total_replications: int

def _run_single_replication(
    experiment: SimulationExperiment,
    is_null_hypothesis: bool,
    seed: int
) -> Optional[bool]:
    """
    Executes a single replication of the Monte Carlo simulation.

    This involves generating one dataset and running the full testing pipeline on it.

    Args:
        experiment (SimulationExperiment): The configuration for the experiment.
        is_null_hypothesis (bool): Flag to generate data under H0 or HA.
        seed (int): A unique seed for this specific replication.

    Returns:
        Optional[bool]: True if H0 was rejected, False otherwise. None on failure.
    """
    try:
        # Create a dedicated RNG for this replication to ensure independence.
        rng = np.random.default_rng(seed)

        # Task 14: Generate one dataset.
        dgp_data = generate_dgp_data(experiment, is_null_hypothesis, rng)

        # Convert numpy arrays to the pandas DataFrames required by the test pipeline.
        # A dummy DatetimeIndex is sufficient for the simulation.
        date_index = pd.to_datetime(pd.date_range(start='2000-01-01', periods=experiment.T, freq='M'))
        asset_returns_df = pd.DataFrame(dgp_data.asset_returns, index=date_index)
        factor_returns_df = pd.DataFrame(dgp_data.factor_returns, index=date_index)

        # --- Execute Full Test Pipeline (Tasks 4-8) ---
        test_config = experiment.test_config['our_test']

        estimation_res = estimate_factor_model_params(asset_returns_df, factor_returns_df)
        psi_vals = compute_psi_statistic(estimation_res, nu=test_config['nu'])
        de_rand_res = perform_de_randomization(
            psi_vals,
            tau=test_config['de_randomization']['tau'],
            b_function=test_config['de_randomization']['B_function'],
            rng=rng # Pass the same RNG to maintain state
        )
        decision = make_test_decision(
            de_rand_res,
            f_b_function=test_config['de_randomization']['f_B_function']
        )

        # Return the rejection decision (True if rejected, False if not).
        return not decision.fail_to_reject_h0

    except Exception as e:
        # If any step fails for this replication, log the error and return None.
        logger.error(f"Replication failed for experiment {experiment.experiment_id}: {e}", exc_info=False)
        return None


def run_monte_carlo_simulation(
    experiment: SimulationExperiment,
    is_null_hypothesis: bool,
    n_jobs: int = -1,
    seed: Optional[int] = None
) -> SimulationResult:
    """
    Runs a full Monte Carlo simulation for a single experimental cell.

    This function manages the execution of M replications, distributing the
    workload across multiple CPU cores if available. It then aggregates the
    results to compute the empirical rejection rate (size or power) and its
    confidence interval.

    Args:
        experiment (SimulationExperiment): The configuration for the experiment.
        is_null_hypothesis (bool): If True, calculates empirical size. If False,
                                   calculates empirical power.
        n_jobs (int): The number of CPU cores to use. -1 means use all available.
        seed (Optional[int]): A seed to initialize the random number generator
                              for the entire set of replications.

    Returns:
        SimulationResult: A NamedTuple containing the final simulation results.
    """
    # Extract the number of replications from the experiment config.
    M = experiment.M_replications

    # Create a master RNG for this experiment.
    # It will be used to generate unique, deterministic seeds for each replication.
    master_rng = np.random.default_rng(seed)

    # Generate M unique seeds for the M replications. This is crucial for
    # ensuring that parallel jobs are independently and reproducibly seeded.
    replication_seeds = master_rng.integers(low=0, high=2**32 - 1, size=M)

    logger.info(
        f"Starting Monte Carlo simulation for '{experiment.experiment_id}' "
        f"under H{int(not is_null_hypothesis)} with {M} replications on {n_jobs} core(s)."
    )

    # Use joblib.Parallel for efficient, parallel execution of the replications.
    # The `delayed` function wraps the function and its arguments for lazy evaluation.
    results = Parallel(n_jobs=n_jobs)(
        delayed(_run_single_replication)(experiment, is_null_hypothesis, seed)
        for seed in tqdm(replication_seeds, desc=f"Simulating {experiment.experiment_id}")
    )

    # Filter out any replications that failed (returned None).
    successful_results = [res for res in results if res is not None]

    # Get the number of successful replications.
    num_success = len(successful_results)

    # Check if a significant number of replications failed.
    if num_success < 0.95 * M:
        logger.warning(
            f"High failure rate for experiment '{experiment.experiment_id}': "
            f"{M - num_success}/{M} replications failed. Results may be unreliable."
        )

    # If no replications succeeded, return a failure result.
    if num_success == 0:
        logger.error(f"All {M} replications failed for '{experiment.experiment_id}'.")
        return SimulationResult(experiment.experiment_id, np.nan, (np.nan, np.nan), 0, M)

    # Calculate the number of rejections.
    num_rejections = sum(successful_results)

    # Calculate the empirical rejection rate (size or power).
    rejection_rate = num_rejections / num_success

    # Calculate the Wilson score confidence interval for the rejection rate.
    conf_interval = proportion_confint(
        count=num_rejections, nobs=num_success, method='wilson'
    )

    logger.info(
        f"Simulation for '{experiment.experiment_id}' complete. "
        f"Rejection Rate: {rejection_rate:.4f} "
        f"(95% CI: [{conf_interval[0]:.4f}, {conf_interval[1]:.4f}])"
    )

    # Return the final, structured result.
    return SimulationResult(
        experiment_id=experiment.experiment_id,
        rejection_rate=rejection_rate,
        conf_interval=conf_interval,
        successful_replications=num_success,
        total_replications=M
    )


In [None]:
# Task 16: Monte Carlo Orchestrator Function

class MonteCarloOrchestrator:
    """
    Orchestrates the execution of the entire Monte Carlo simulation study.

    This class manages the full lifecycle of the simulation:
    1. Initialization: Sets up the complete grid of experiments based on the
       provided configuration.
    2. Execution: Runs the simulation for each experimental cell, for both the
       null and alternative hypotheses, to calculate empirical size and power.
    3. Compilation: Gathers all results into a single, tidy DataFrame.
    """

    def __init__(
        self,
        replication_config: Dict[str, Any],
        seed: Optional[int] = 42
    ):
        """
        Initializes the orchestrator and sets up the experimental plan.

        Args:
            replication_config (Dict[str, Any]): The complete study configuration.
            seed (Optional[int]): A master seed for the random number generator
                                  to ensure the entire study is reproducible.
        """
        # Log the start of the initialization process.
        logger.info("Initializing MonteCarloOrchestrator...")

        # Store a deep copy of the configuration to prevent external modifications.
        self.config = copy.deepcopy(replication_config)

        # Store the master seed.
        self.seed = seed

        # Initialize the results attribute to None. It will be populated by the run method.
        self.results_df: Optional[pd.DataFrame] = None

        try:
            # Use the setup function from Task 13 to generate the experimental plan.
            self.experiments: List[SimulationExperiment] = setup_monte_carlo_experiments(
                self.config
            )
        except (KeyError, ValueError) as e:
            # Catch errors during setup and provide a clear message.
            logger.error(f"Failed to initialize orchestrator during experiment setup: {e}")
            raise

        # Log the successful completion of the setup.
        logger.info(
            f"Orchestrator initialized with {len(self.experiments)} unique experiments to run."
        )

    def run(self, n_jobs: int = -1) -> pd.DataFrame:
        """
        Executes the full grid of Monte Carlo simulations.

        This method iterates through every experiment defined during initialization,
        running simulations for both the null (size) and alternative (power)
        hypotheses. It leverages parallel processing to speed up execution.

        Args:
            n_jobs (int): The number of CPU cores to use for parallel execution.
                          -1 means use all available cores. 1 means run serially.

        Returns:
            pd.DataFrame: A tidy DataFrame containing the results (size and power)
                          for every experimental cell.
        """
        # Log the start of the main execution phase.
        logger.info(f"--- Starting Monte Carlo Simulation Run ({len(self.experiments)} experiments) ---")

        # Initialize an empty list to store the result dictionaries.
        results_list = []

        # Create a master RNG to generate unique seeds for each experimental cell.
        master_rng = np.random.default_rng(self.seed)

        # Iterate through the pre-defined list of experiments with a progress bar.
        for experiment in tqdm(self.experiments, desc="Overall Simulation Progress"):
            try:
                # Generate a unique, deterministic seed for this specific experiment cell.
                experiment_seed = master_rng.integers(0, 2**32 - 1)

                # --- Run simulation under the Null Hypothesis (for Size) ---
                size_result = run_monte_carlo_simulation(
                    experiment=experiment,
                    is_null_hypothesis=True,
                    n_jobs=n_jobs,
                    seed=experiment_seed
                )

                # Convert the result to a dictionary and add experiment details.
                size_dict = {
                    'scenario': experiment.scenario, 'N': experiment.N, 'T': experiment.T,
                    'hypothesis': 'size', **size_result._asdict()
                }
                results_list.append(size_dict)

                # --- Run simulation under the Alternative Hypothesis (for Power) ---
                power_result = run_monte_carlo_simulation(
                    experiment=experiment,
                    is_null_hypothesis=False,
                    n_jobs=n_jobs,
                    seed=experiment_seed + 1 # Use a different seed for the power run
                )

                # Convert the result to a dictionary and add experiment details.
                power_dict = {
                    'scenario': experiment.scenario, 'N': experiment.N, 'T': experiment.T,
                    'hypothesis': 'power', **power_result._asdict()
                }
                results_list.append(power_dict)

            except Exception as e:
                # Log any critical failure for an entire experimental cell and continue.
                logger.error(
                    f"A critical error occurred during simulation for experiment "
                    f"'{experiment.experiment_id}': {e}",
                    exc_info=True
                )
                continue

        # Check if the simulation produced any results.
        if not results_list:
            logger.error("Monte Carlo simulation run completed with no results.")
            self.results_df = pd.DataFrame()
            return self.results_df

        # Convert the list of dictionaries into a final, tidy DataFrame.
        self.results_df = pd.DataFrame(results_list)

        # Log the successful completion of the entire simulation study.
        logger.info("--- Monte Carlo Simulation Run Complete ---")

        return self.results_df


In [None]:
# Task 17: Monte Carlo Robustness Analysis

def _run_fly_test(
    asset_returns_df: pd.DataFrame,
    factor_returns_df: pd.DataFrame,
    tau: float
) -> bool:
    """
    Implements a high-dimensional GRS-style test inspired by Fan, Liao, Yao (2015).

    This function provides a functional implementation of a test for the null
    hypothesis that all alphas are jointly zero, designed for high-dimensional
    settings (N > T). It uses a regularized (pseudo-inverse) estimate of the
    residual covariance matrix to ensure stability.

    Args:
        asset_returns_df (pd.DataFrame): A (T x N) DataFrame of asset returns.
        factor_returns_df (pd.DataFrame): A (T x K) DataFrame of factor returns.
        tau (float): The significance level for the hypothesis test.

    Returns:
        bool: True if the null hypothesis is rejected, False otherwise.
    """
    # Step 1: Perform OLS to obtain estimated alphas and residuals.
    estimation_result = estimate_factor_model_params(asset_returns_df, factor_returns_df)

    # Extract estimated alphas as a NumPy array.
    alphas = estimation_result.alphas.to_numpy()

    # Extract residuals as a NumPy array.
    residuals = estimation_result.residuals.to_numpy()

    # Get the dimensions of the problem.
    T, N = residuals.shape

    # Step 2: Estimate the residual covariance matrix.
    sigma_u = np.cov(residuals, rowvar=False)

    # Use the Moore-Penrose pseudo-inverse for numerical stability, which is crucial when N > T.
    sigma_u_inv = np.linalg.pinv(sigma_u)

    # Step 3: Calculate the GRS-style test statistic (Wald test for alphas).
    # Equation: J = T * α' * Σ_u^(-1) * α
    test_statistic = T * (alphas.T @ sigma_u_inv @ alphas)

    # Step 4: Determine the critical value from the asymptotic Chi-squared distribution.
    # Under H0, the statistic is asymptotically χ² with N degrees of freedom.
    critical_value = chi2.ppf(1 - tau, df=N)

    # The null is rejected if the test statistic exceeds the critical value.
    return test_statistic > critical_value

def _run_as_test(
    asset_returns_df: pd.DataFrame,
    factor_returns_df: pd.DataFrame,
    tau: float
) -> bool:
    """
    Implements the Ardia and Sessinou (2024) Cauchy combination test.

    This method tests the joint null of zero alphas by running individual t-tests
    for each asset's alpha and then combining the p-values using the robust
    Cauchy combination method.

    Args:
        asset_returns_df (pd.DataFrame): A (T x N) DataFrame of asset returns.
        factor_returns_df (pd.DataFrame): A (T x K) DataFrame of factor returns.
        tau (float): The significance level for the hypothesis test.

    Returns:
        bool: True if the null hypothesis is rejected, False otherwise.
    """
    # Get the dimensions of the problem.
    T, N = asset_returns_df.shape
    K = factor_returns_df.shape[1]

    # Initialize a list to store the p-value from each individual alpha test.
    p_values = []

    # --- Step 1 & 2: Asset-by-Asset t-tests and p-value collection ---

    # Prepare the regressor matrix X by adding an intercept.
    X = np.hstack([np.ones((T, 1)), factor_returns_df.to_numpy()])

    # Pre-compute (X'X)^-1 for efficiency.
    try:
        xtx_inv = np.linalg.inv(X.T @ X)
    except np.linalg.LinAlgError:
        logger.warning("Factor matrix is singular in AS test; cannot proceed.")
        return False

    # Loop through each asset to perform an individual t-test.
    for i in range(N):
        # Extract the return vector for the current asset.
        y = asset_returns_df.iloc[:, i].to_numpy()

        # Calculate OLS coefficients: β̂ = (X'X)^-1 * X'y
        coeffs = xtx_inv @ X.T @ y

        # Calculate residuals: û = y - Xβ̂
        residuals = y - X @ coeffs

        # Estimate residual variance: σ̂^2 = û'û / (T - K - 1)
        sigma_sq = np.sum(np.square(residuals)) / (T - K - 1)

        # Calculate the standard error of alpha (the intercept, which is the first coefficient).
        se_alpha = np.sqrt(sigma_sq * xtx_inv[0, 0])

        # If standard error is near zero, the test is degenerate; skip this asset.
        if se_alpha < 1e-9:
            continue

        # Calculate the t-statistic for the alpha.
        t_stat = coeffs[0] / se_alpha

        # Calculate the two-tailed p-value from the t-distribution's survival function.
        p_val = 2 * t.sf(np.abs(t_stat), df=T - K - 1)
        p_values.append(p_val)

    # If no valid p-values were generated, the test cannot be completed.
    if not p_values:
        logger.warning("AS test could not be performed; no valid p-values were generated.")
        return False

    # --- Step 3: Cauchy Combination ---

    # Transform p-values using the inverse CDF of the Cauchy distribution.
    # Equation: T_C = (1/N_valid) * Σ tan((0.5 - p_i) * π)
    cauchy_terms = np.tan((0.5 - np.array(p_values)) * np.pi)

    # The test statistic is the mean of these transformed values.
    test_statistic = np.mean(cauchy_terms)

    # --- Step 4: Final p-value and Decision ---

    # The combined p-value is derived from the CDF of the standard Cauchy distribution.
    # Equation: p_combined = 0.5 - arctan(T_C) / π
    final_p_value = 0.5 - np.arctan(test_statistic) / np.pi

    # Reject the null hypothesis if the combined p-value is less than the significance level.
    return final_p_value < tau

def _run_single_replication_for_comparison(
    experiment: SimulationExperiment,
    is_null_hypothesis: bool,
    seed: int
) -> Optional[Dict[str, bool]]:
    """
    Executes a single replication for multiple tests and returns their decisions.

    This function serves as the core computational unit for the comparison
    simulation. For a single set of DGP parameters, it generates one dataset
    and applies the main test ("OurTest") as well as a suite of competing
    tests (FLY, AS). It is designed to be robust, handling potential failures
    in any individual test without halting the entire replication.

    Args:
        experiment (SimulationExperiment): The configuration object containing all
            parameters for the DGP and the tests.
        is_null_hypothesis (bool): A flag indicating whether to generate data
            under the null (alphas=0) or the alternative hypothesis.
        seed (int): A unique seed for the random number generator to ensure
            this specific replication is reproducible.

    Returns:
        Optional[Dict[str, bool]]: A dictionary where keys are test names
            (e.g., 'OurTest', 'FLY', 'AS') and values are booleans indicating
            the rejection decision (True for reject, False for fail to reject).
            Returns None if the data generation or a critical part of the
            setup fails, allowing the parent process to discard the replication.
    """
    try:
        # Initialize a dedicated RNG for this replication to ensure independence.
        rng = np.random.default_rng(seed)

        # Generate one dataset using the DGP from Task 14.
        dgp_data = generate_dgp_data(experiment, is_null_hypothesis, rng)

        # Format the generated data into pandas DataFrames.
        date_index = pd.to_datetime(pd.date_range(start='2000-01-01', periods=experiment.T, freq='M'))
        asset_returns_df = pd.DataFrame(dgp_data.asset_returns, index=date_index, columns=[f'Asset_{i+1}' for i in range(experiment.N)])
        factor_returns_df = pd.DataFrame(dgp_data.factor_returns, index=date_index, columns=[f'Factor_{i+1}' for i in range(experiment.dgp_params['K_factors'])])

        # --- Run Our Test (Massacci, Sarno, Trapani, Vallarino) ---
        test_config = experiment.test_config['our_test']
        est_res = estimate_factor_model_params(asset_returns_df, factor_returns_df)
        psi_vals = compute_psi_statistic(est_res, nu=test_config['nu'])
        de_rand_res = perform_de_randomization(
            psi_vals, tau=test_config['de_randomization']['tau'],
            b_function=test_config['de_randomization']['B_function'], rng=rng
        )
        decision = make_test_decision(
            de_rand_res, f_b_function=test_config['de_randomization']['f_B_function']
        )
        our_test_rejection = not decision.fail_to_reject_h0

        # --- Run Competing Tests ---
        fly_rejection = _run_fly_test(asset_returns_df, factor_returns_df, tau=test_config['de_randomization']['tau'])
        as_rejection = _run_as_test(asset_returns_df, factor_returns_df, tau=test_config['de_randomization']['tau'])

        # Return a dictionary of boolean rejection decisions.
        return {'OurTest': our_test_rejection, 'FLY': fly_rejection, 'AS': as_rejection}

    except Exception as e:
        # Gracefully handle any failure during a single replication.
        logger.error(f"Comparison replication failed for exp {experiment.experiment_id}: {e}", exc_info=False)
        return None

def run_comparison_simulation(
    experiment: SimulationExperiment,
    n_jobs: int = -1,
    seed: Optional[int] = None
) -> pd.DataFrame:
    """
    Runs a full Monte Carlo simulation comparing multiple alpha tests.

    This function orchestrates a complete simulation experiment to compare the
    performance (both empirical size and power) of the paper's proposed test
    against specified competitors. It manages the parallel execution of
    M replications, aggregates the results, and provides a final summary
    DataFrame.

    Args:
        experiment (SimulationExperiment): The configuration object for the
            experimental cell to be simulated.
        n_jobs (int): The number of CPU cores to use for parallel execution of
            the M replications. A value of -1 uses all available cores. A value
            of 1 runs the simulation serially.
        seed (Optional[int]): A master seed to initialize the random number
            generator for the entire set of replications. This ensures that
            the full simulation run is reproducible.

    Returns:
        pd.DataFrame: A DataFrame with two rows ('size', 'power') and one
            column for each test being compared (e.g., 'OurTest', 'FLY', 'AS').
            The values are the empirical rejection rates.
    """
    # Extract the number of replications for this experiment.
    M = experiment.M_replications

    # Create a master RNG to generate unique seeds for each replication.
    master_rng = np.random.default_rng(seed)
    replication_seeds = master_rng.integers(low=0, high=2**32 - 1, size=M)

    logger.info(f"Starting comparison simulation for '{experiment.experiment_id}'...")

    # List to store the results DataFrames for size and power.
    results_data = []

    # Run the simulation for both the null (is_null=True) and alternative (is_null=False).
    for is_null in [True, False]:
        # Set a descriptive name for the current run.
        hypothesis_name = 'size' if is_null else 'power'

        # Execute the M replications in parallel.
        results = Parallel(n_jobs=n_jobs)(
            delayed(_run_single_replication_for_comparison)(experiment, is_null, seed)
            for seed in tqdm(replication_seeds, desc=f"Simulating {hypothesis_name} for {experiment.experiment_id}")
        )

        # Filter out any replications that failed.
        successful_results = [res for res in results if res is not None]

        # If no replications were successful, log a warning and continue.
        if not successful_results:
            logger.warning(f"No successful replications for {hypothesis_name} in {experiment.experiment_id}.")
            continue

        # Convert the list of result dictionaries into a DataFrame.
        results_df = pd.DataFrame(successful_results)

        # Calculate the rejection rate for each test.
        rejection_rates = results_df.mean().rename(hypothesis_name)
        results_data.append(rejection_rates)

    # If both runs failed, return an empty DataFrame.
    if not results_data:
        return pd.DataFrame()

    # Concatenate the size and power Series into a single summary DataFrame.
    comparison_df = pd.concat(results_data, axis=1).T
    logger.info(f"Comparison simulation for '{experiment.experiment_id}' complete.")

    return comparison_df

def analyze_power_by_sparsity(
    base_experiment: SimulationExperiment,
    n_jobs: int = -1
) -> pd.DataFrame:
    """
    Analyzes and computes the empirical power curve of the test.

    This function systematically evaluates the power of the proposed test by
    running a series of full Monte Carlo simulations across a grid of
    "sparsity levels". Sparsity here refers to the fraction of assets with
    non-zero alphas under the alternative hypothesis. This analysis is crucial
    for understanding how the test's ability to detect mispricing changes as
    the mispricing becomes less pervasive.

    Args:
        base_experiment (SimulationExperiment): A fully configured experiment
            object that serves as the template. The sparsity level within its
            hypothesis_params will be programmatically overridden for each run.
        n_jobs (int): The number of CPU cores to use for parallel execution
            of the simulations at each sparsity level.

    Returns:
        pd.DataFrame: A DataFrame indexed by the sparsity level. Columns include
            'power' (the empirical rejection rate), and 'ci_lower' and 'ci_upper'
            for the confidence interval of the power estimate.
    """
    # Extract the grid of sparsity levels to test from the configuration.
    sparsity_levels = base_experiment.hypothesis_params['alternative']['sparsity_levels']
    logger.info(f"Starting power analysis across {len(sparsity_levels)} sparsity levels for {base_experiment.experiment_id}.")

    # List to store the results for each sparsity level.
    power_results = []

    # Iterate through each sparsity level with a progress bar.
    for sparsity in tqdm(sparsity_levels, desc=f"Analyzing Power vs. Sparsity for {base_experiment.experiment_id}"):
        # Create a mutable copy of the hypothesis params to modify.
        new_hypothesis_params = copy.deepcopy(base_experiment.hypothesis_params)

        # Surgically update the sparsity level for this specific run.
        new_hypothesis_params['alternative']['default_sparsity'] = sparsity

        # Create a new, immutable experiment tuple with the modified hypothesis.
        run_experiment = base_experiment._replace(hypothesis_params=new_hypothesis_params)

        # Run the simulation for the alternative hypothesis only.
        power_result = run_monte_carlo_simulation(
            experiment=run_experiment,
            is_null_hypothesis=False,
            n_jobs=n_jobs,
            seed=int(sparsity * 1e6) # Use a deterministic seed based on sparsity.
        )

        # Append the structured results to the list.
        power_results.append({
            'sparsity_level': sparsity,
            'power': power_result.rejection_rate,
            'ci_lower': power_result.conf_interval[0],
            'ci_upper': power_result.conf_interval[1],
        })

    # Convert the list of results into a final, indexed DataFrame.
    power_curve_df = pd.DataFrame(power_results).set_index('sparsity_level')

    logger.info(f"Power analysis by sparsity for {base_experiment.experiment_id} complete.")
    return power_curve_df


In [None]:
# Task 18: Results Compilation and Visualization

def create_mc_results_table(
    mc_results_df: pd.DataFrame,
    scenario_filter: str
) -> pd.io.formats.style.Styler:
    """
    Creates a publication-quality summary table of Monte Carlo results.

    This function takes the tidy DataFrame of all simulation results, filters it
    for a specific scenario, and pivots it into a wide-format table showing
    empirical size and power across the grid of N and T values, similar to
    the tables in Section 5 of the source paper.

    Args:
        mc_results_df (pd.DataFrame): The complete, tidy DataFrame of results
            from the MonteCarloOrchestrator.
        scenario_filter (str): The name of the scenario to create the table for.

    Returns:
        pd.io.formats.style.Styler: A pandas Styler object, which can be
            rendered in a Jupyter notebook or exported to HTML/LaTeX.
    """
    # --- Input Validation ---
    if not isinstance(mc_results_df, pd.DataFrame) or mc_results_df.empty:
        raise ValueError("mc_results_df must be a non-empty pandas DataFrame.")
    required_cols = {'scenario', 'N', 'T', 'hypothesis', 'rejection_rate'}
    if not required_cols.issubset(mc_results_df.columns):
        raise ValueError(f"mc_results_df is missing required columns. Need: {required_cols}")

    # --- Step 1: Filter and Prepare Data ---

    # Filter the results for the specified scenario.
    scenario_df = mc_results_df[mc_results_df['scenario'] == scenario_filter].copy()

    if scenario_df.empty:
        logger.warning(f"No results found for scenario '{scenario_filter}'. Returning empty table.")
        return pd.DataFrame().style

    # --- Step 2: Pivot Data for Size and Power ---

    # Create the pivot table for empirical size.
    size_pivot = scenario_df[scenario_df['hypothesis'] == 'size'].pivot_table(
        index='N', columns='T', values='rejection_rate'
    )

    # Create the pivot table for empirical power.
    power_pivot = scenario_df[scenario_df['hypothesis'] == 'power'].pivot_table(
        index='N', columns='T', values='rejection_rate'
    )

    # --- Step 3: Combine and Format the Table ---

    # Create a multi-level column index for clarity.
    size_pivot.columns = pd.MultiIndex.from_product([['Empirical Size (H₀)'], size_pivot.columns])
    power_pivot.columns = pd.MultiIndex.from_product([['Empirical Power (Hₐ)'], power_pivot.columns])

    # Concatenate the size and power tables side-by-side.
    final_table = pd.concat([size_pivot, power_pivot], axis=1)

    # --- Step 4: Style the Table for Publication ---

    # Apply styling using the pandas Styler API.
    styled_table = (
        final_table.style
        .set_caption(f"Monte Carlo Results: {scenario_filter.replace('_', ' ').title()}")
        .format("{:.3f}", na_rep="-")
        .background_gradient(cmap='viridis', subset=pd.IndexSlice[:, 'Empirical Power (Hₐ)'])
        .background_gradient(
            cmap='coolwarm',
            subset=pd.IndexSlice[:, 'Empirical Size (H₀)'],
            vmin=0.0,
            vmax=0.10 # Highlight deviations from the nominal 5% size
        )
        .set_properties(**{'width': '70px', 'text-align': 'center'})
    )

    return styled_table

def plot_power_curves(
    power_curve_data: Dict[str, pd.DataFrame],
    title: str = "Power Curve Analysis by Sparsity of Alternative",
    figsize: Tuple[int, int] = (12, 8)
) -> Tuple[plt.Figure, plt.Axes]:
    """
    Creates a publication-quality plot of one or more power curves.

    This function visualizes the results from the sparsity analysis, plotting
    empirical power as a function of the fraction of non-zero alphas. It can
    plot a single curve with its confidence interval or compare multiple curves
    for different tests on the same axes.

    Args:
        power_curve_data (Dict[str, pd.DataFrame]): A dictionary where keys are
            the names of the tests (e.g., 'OurTest', 'FLY') and values are the
            power curve DataFrames produced by `analyze_power_by_sparsity`.
        title (str): The title for the plot.
        figsize (Tuple[int, int]): The size of the figure in inches.

    Returns:
        A tuple containing the matplotlib Figure and Axes objects.
    """
    # --- Input Validation ---
    if not power_curve_data:
        raise ValueError("Input 'power_curve_data' dictionary cannot be empty.")

    # --- Step 1: Initialize Plot ---
    plt.style.use('seaborn-v0_8-whitegrid')
    fig, ax = plt.subplots(figsize=figsize)

    # --- Step 2 & 3: Plot Power Curves and Confidence Bands ---

    # Define a color cycle for comparing multiple tests.
    colors = plt.cm.viridis(np.linspace(0, 1, len(power_curve_data)))

    # Iterate through each test's power curve data provided in the dictionary.
    for i, (test_name, df) in enumerate(power_curve_data.items()):
        # Check if the required columns exist.
        required_cols = {'power', 'ci_lower', 'ci_upper'}
        if not required_cols.issubset(df.columns):
            raise ValueError(f"DataFrame for '{test_name}' is missing required columns.")

        # Plot the main power curve (power vs. sparsity).
        ax.plot(
            df.index,
            df['power'],
            label=test_name,
            color=colors[i],
            marker='o',
            linestyle='-',
            lw=2
        )

        # Add a shaded confidence band around the power curve.
        ax.fill_between(
            df.index,
            df['ci_lower'],
            df['ci_upper'],
            color=colors[i],
            alpha=0.2,
            label=f'_{test_name}_ci' # Underscore hides it from the main legend
        )

    # --- Step 4: Final Formatting ---

    # Set the title and axis labels.
    ax.set_title(title, fontsize=16, fontweight='bold')
    ax.set_xlabel("Sparsity of Alternative (Fraction of non-zero α)", fontsize=12)
    ax.set_ylabel("Empirical Power (Rejection Rate)", fontsize=12)

    # Set logical axis limits.
    ax.set_ylim(0, 1.05)
    ax.set_xlim(left=0)

    # Add a legend to identify the different tests.
    ax.legend(title="Test", fontsize=10)

    # Ensure the layout is tight.
    fig.tight_layout()

    logger.info("Power curve plot generated successfully.")

    return fig, ax


In [None]:
# Task 19: Interpretation and Discussion

def _interpret_empirical_results(
    empirical_summary_df: pd.DataFrame
) -> Dict[str, Any]:
    """
    Analyzes and interprets the empirical results summary table.

    Args:
        empirical_summary_df (pd.DataFrame): The summary table from `compile_empirical_results`.

    Returns:
        A dictionary containing key quantitative findings.
    """
    # Initialize the dictionary to store findings.
    findings = {}

    # --- Full Sample Analysis ---
    # Extract the full sample rejection rates.
    full_sample_rates = empirical_summary_df.loc['Full Sample']
    # Find the model with the lowest rejection rate (best performance).
    best_model = full_sample_rates.idxmin()
    # Find the model with the highest rejection rate (worst performance).
    worst_model = full_sample_rates.idxmax()
    findings['full_sample'] = {
        'best_performing_model': f"{best_model} (Rejection Rate: {full_sample_rates.min():.3f})",
        'worst_performing_model': f"{worst_model} (Rejection Rate: {full_sample_rates.max():.3f})",
        'model_ranking_by_rejection_rate': full_sample_rates.sort_values().to_dict()
    }

    # --- Crisis Period Analysis ---
    crisis_findings = {}
    # Isolate the crisis periods by dropping the 'Full Sample' row.
    crisis_df = empirical_summary_df.drop('Full Sample')
    # Find the best performing model for each crisis.
    crisis_bests = crisis_df.idxmin(axis=1)
    for period, model in crisis_bests.items():
        rate = crisis_df.loc[period, model]
        crisis_findings[period] = f"Best performer: {model} (Rejection Rate: {rate:.3f})"

    # Analyze the degradation in performance during crises compared to the full sample.
    performance_degradation = crisis_df.subtract(full_sample_rates, axis=1)
    crisis_findings['max_performance_degradation'] = performance_degradation.max().idxmax()

    findings['crisis_analysis'] = crisis_findings

    return findings

def _interpret_mc_results(
    mc_results_df: pd.DataFrame,
    nominal_size: float
) -> Dict[str, Any]:
    """
    Analyzes and interprets the Monte Carlo simulation results.

    Args:
        mc_results_df (pd.DataFrame): The tidy DataFrame from `MonteCarloOrchestrator`.
        nominal_size (float): The nominal size (τ) of the test.

    Returns:
        A dictionary containing key quantitative findings about the test's properties.
    """
    findings = {}

    # --- Size Control Analysis ---
    # Filter for the empirical size results.
    size_df = mc_results_df[mc_results_df['hypothesis'] == 'size'].copy()
    # Check where the nominal size is outside the result's confidence interval.
    size_df['is_distorted'] = (size_df['ci_lower'] > nominal_size) | (size_df['ci_upper'] < nominal_size)

    num_distorted = size_df['is_distorted'].sum()
    total_experiments = len(size_df)

    findings['size_control'] = {
        'summary': f"Test exhibits size distortion in {num_distorted} out of {total_experiments} cells.",
        'distorted_cells': size_df[size_df['is_distorted']][['scenario', 'N', 'T', 'rejection_rate']].to_dict('records')
    }

    # --- Power Analysis ---
    # Filter for the empirical power results.
    power_df = mc_results_df[mc_results_df['hypothesis'] == 'power']
    # Find the average power across all scenarios.
    avg_power = power_df['rejection_rate'].mean()
    # Find conditions with low power (e.g., < 50%).
    low_power_cells = power_df[power_df['rejection_rate'] < 0.5]

    findings['power_analysis'] = {
        'average_power': f"{avg_power:.3f}",
        'summary_low_power': f"{len(low_power_cells)} out of {len(power_df)} cells show power < 50%.",
        'low_power_cells': low_power_cells[['scenario', 'N', 'T', 'rejection_rate']].to_dict('records')
    }

    return findings

def generate_results_summary(
    empirical_results: Optional[pd.DataFrame] = None,
    mc_results: Optional[pd.DataFrame] = None,
    replication_config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Generates a structured, quantitative summary of all study results.

    This function serves as the final analysis step, taking the compiled
    results from the empirical and Monte Carlo studies and producing a
    dictionary of key, interpretable findings. It does not generate prose,
    but rather provides the quantitative building blocks for a final research report.

    Args:
        empirical_results (Optional[pd.DataFrame]): The summary table of empirical
            rejection rates from `compile_empirical_results`.
        mc_results (Optional[pd.DataFrame]): The tidy DataFrame of all Monte Carlo
            simulation results from `MonteCarloOrchestrator`.
        replication_config (Optional[Dict[str, Any]]): The study's configuration,
            needed for context like nominal test size.

    Returns:
        Dict[str, Any]: A nested dictionary containing the structured summary.
    """
    logger.info("--- Generating Final Results Summary ---")

    # Initialize the master dictionary for all findings.
    master_summary = {}

    # --- Interpret Empirical Results if provided ---
    if empirical_results is not None:
        if not isinstance(empirical_results, pd.DataFrame):
            raise TypeError("empirical_results must be a pandas DataFrame.")
        logger.info("Interpreting empirical results...")
        master_summary['empirical_findings'] = _interpret_empirical_results(empirical_results)

    # --- Interpret Monte Carlo Results if provided ---
    if mc_results is not None:
        if not isinstance(mc_results, pd.DataFrame):
            raise TypeError("mc_results must be a pandas DataFrame.")
        if replication_config is None:
            raise ValueError("replication_config is required to interpret Monte Carlo results.")

        logger.info("Interpreting Monte Carlo simulation results...")
        # Extract the nominal size (tau) from the configuration for comparison.
        nominal_size = replication_config['monte_carlo']['test_config']['our_test']['de_randomization']['tau']
        master_summary['monte_carlo_findings'] = _interpret_mc_results(mc_results, nominal_size)

    if not master_summary:
        logger.warning("No results were provided to generate a summary.")
        return {"status": "No results to summarize."}

    logger.info("--- Final Summary Generation Complete ---")

    return master_summary


In [None]:
# Master Pipeline Function

def run_full_study(
    asset_returns: pd.DataFrame,
    factor_returns: pd.DataFrame,
    replication_config: Dict[str, Any],
    run_empirical: bool = True,
    run_monte_carlo: bool = True,
    n_jobs: int = -1,
    seed: int = 42
) -> Dict[str, Any]:
    """
    Executes the complete end-to-end research pipeline for the randomized alpha test.

    This master orchestrator function serves as the main entry point for the
    entire study. It manages the execution of both the empirical analysis on the
    provided data and the comprehensive Monte Carlo simulations to evaluate the
    test's statistical properties.

    Args:
        asset_returns (pd.DataFrame): Raw DataFrame of asset returns.
        factor_returns (pd.DataFrame): Raw DataFrame of factor returns.
        replication_config (Dict[str, Any]): The complete study configuration.
        run_empirical (bool): If True, the empirical analysis will be run.
        run_monte_carlo (bool): If True, the Monte Carlo simulations will be run.
        n_jobs (int): The number of CPU cores to use for parallelizable tasks,
                      primarily the Monte Carlo simulations. -1 uses all available.
        seed (int): A master seed for all random operations to ensure the
                    entire study is fully reproducible.

    Returns:
        Dict[str, Any]: A dictionary containing the key outputs from each major
                        stage of the research pipeline, including DataFrames of
                        results, generated plots, and summary interpretations.
    """
    # Initialize the dictionary to hold all outputs of the study.
    study_outputs = {}

    # --- I. EMPIRICAL ANALYSIS ---
    if run_empirical:
        logger.info("="*80)
        logger.info("STARTING: EMPIRICAL ANALYSIS")
        logger.info("="*80)
        try:
            # Task 9: Instantiate and run the empirical orchestrator.
            # This single object handles validation, cleaning, setup, and execution.
            empirical_orchestrator = EmpiricalAnalysisOrchestrator(
                asset_returns=asset_returns,
                factor_returns=factor_returns,
                replication_config=replication_config,
                seed=seed
            )
            empirical_results_df = empirical_orchestrator.run_analysis()
            study_outputs['empirical_timeseries_results'] = empirical_results_df

            # Task 11: Compile the empirical results into a summary table.
            empirical_summary_table = compile_empirical_results(
                empirical_results_df, replication_config
            )
            study_outputs['empirical_summary_table'] = empirical_summary_table

            # Task 12: Visualize the empirical results.
            fig, ax = plot_q_statistic_time_series(
                empirical_results_df, replication_config
            )
            study_outputs['empirical_q_statistic_plot'] = (fig, ax)

            logger.info("EMPIRICAL ANALYSIS COMPLETED SUCCESSFULLY.")

        except Exception as e:
            # Handle any failure during the empirical analysis stage.
            logger.error(f"Empirical analysis failed: {e}", exc_info=True)
            study_outputs['empirical_analysis_error'] = str(e)
    else:
        logger.info("Skipping Empirical Analysis as per configuration.")

    # --- II. MONTE CARLO SIMULATION ---
    if run_monte_carlo:
        logger.info("="*80)
        logger.info("STARTING: MONTE CARLO SIMULATION")
        logger.info("="*80)
        try:
            # Task 16: Instantiate and run the Monte Carlo orchestrator.
            mc_orchestrator = MonteCarloOrchestrator(
                replication_config=replication_config,
                seed=seed
            )
            mc_results_df = mc_orchestrator.run(n_jobs=n_jobs)
            study_outputs['monte_carlo_raw_results'] = mc_results_df

            # Task 18 (Part 1): Compile MC results into summary tables.
            mc_summary_tables = {}
            for scenario in replication_config['monte_carlo']['run_controls']['scenarios']:
                mc_summary_tables[scenario] = create_mc_results_table(
                    mc_results_df, scenario_filter=scenario
                )
            study_outputs['monte_carlo_summary_tables'] = mc_summary_tables

            logger.info("MONTE CARLO SIMULATION COMPLETED SUCCESSFULLY.")

        except Exception as e:
            # Handle any failure during the Monte Carlo simulation stage.
            logger.error(f"Monte Carlo simulation failed: {e}", exc_info=True)
            study_outputs['monte_carlo_simulation_error'] = str(e)
    else:
        logger.info("Skipping Monte Carlo Simulation as per configuration.")

    # --- III. FINAL INTERPRETATION ---
    logger.info("="*80)
    logger.info("STARTING: FINAL RESULTS INTERPRETATION")
    logger.info("="*80)
    try:
        # Task 19: Generate a quantitative summary of all available results.
        final_summary = generate_results_summary(
            empirical_results=study_outputs.get('empirical_summary_table', pd.DataFrame()).style.data,
            mc_results=study_outputs.get('monte_carlo_raw_results'),
            replication_config=replication_config
        )
        study_outputs['final_quantitative_summary'] = final_summary
        logger.info("FINAL INTERPRETATION COMPLETED SUCCESSFULLY.")
    except Exception as e:
        # Handle any failure during the final summary generation.
        logger.error(f"Final summary generation failed: {e}", exc_info=True)
        study_outputs['summary_generation_error'] = str(e)

    # Return the comprehensive dictionary of all study artifacts.
    return study_outputs
