# `README.md`

# Cryptocurrencies and Interest Rates: Inferring Yield Curves in a Bondless Market

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/downloads/)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Imports: isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336)](https://pycqa.github.io/isort/)
[![Type Checking: mypy](https://img.shields.io/badge/type_checking-mypy-blue)](http://mypy-lang.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2509.03964-b31b1b.svg)](https://arxiv.org/abs/2509.03964)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/crypto_currencies_interest_rates)
[![Discipline](https://img.shields.io/badge/Discipline-Mathematical%20Finance-blue)](https://github.com/chirindaopensource/crypto_currencies_interest_rates)
[![Methodology](https://img.shields.io/badge/Methodology-Robust%20Statistics%20%26%20Derivatives%20Pricing-orange)](https://github.com/chirindaopensource/crypto_currencies_interest_rates)
[![Data Source](https://img.shields.io/badge/Data-Deribit%20%7C%20Aave-lightgrey)](https://www.deribit.com/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Statsmodels](https://img.shields.io/badge/Statsmodels-15588D.svg?style=flat)](https://www.statsmodels.org/stable/index.html)
[![PyYAML](https://img.shields.io/badge/PyYAML-4B5F6E.svg?style=flat)](https://pyyaml.org/)

--

**Repository:** `https://github.com/chirindaopensource/crypto_currencies_interest_rates`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Cryptocurrencies and Interest Rates: Inferring Yield Curves in a Bondless Market"** by:

*   Philippe Bergault
*   Sébastien Bieber
*   Olivier Guéant
*   Wenkai Zhang

The project provides a complete, end-to-end computational framework for constructing cryptocurrency yield curves from derivatives market data. It delivers a modular, auditable, and extensible pipeline that replicates the paper's entire workflow: from rigorous data validation and cleansing, through robust outlier detection via RANSAC, to the closed-form analytical estimation of zero-coupon bond prices and their conversion to a continuous, daily time series of interest rates.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: run_full_analysis](#key-callable-run_full_analysis)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the methodologies presented in the 2025 paper "Cryptocurrencies and Interest Rates: Inferring Yield Curves in a Bondless Market." The core of this repository is the iPython Notebook `crypto_currencies_interest_rates_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings, from initial data validation to the final generation and analysis of yield curve time series.

The paper addresses a fundamental challenge in decentralized finance: how to determine a term structure of interest rates for a currency that lacks a traditional, liquid bond market. This codebase operationalizes the paper's derivatives-based approach, allowing users to:
-   Rigorously validate and cleanse high-frequency options market data.
-   Apply the Random Sample Consensus (RANSAC) algorithm to robustly filter outliers from option price data based on a modified put-call parity relationship.
-   Solve a weighted least-squares problem with a closed-form analytical solution to jointly estimate the prices of synthetic zero-coupon bonds in both the cryptocurrency and a reference currency (USD).
-   Convert these bond prices into annualized, continuously compounded interest rates.
-   Aggregate and interpolate these granular estimates to construct a continuous, daily time series of the yield curve for standardized tenors.
-   Perform a full suite of robustness checks, including parameter sensitivity analysis and bootstrap confidence intervals, to validate the stability and precision of the estimates.
-   Conduct economic validation by comparing the derived rates to external benchmarks and performing formal econometric tests.

## Theoretical Background

The implemented methods are grounded in Arbitrage Pricing Theory, Financial Engineering, and Robust Statistics.

**1. Modified Put-Call Parity for Inverse Options:**
The cornerstone of the methodology is a static replication argument. A specific portfolio of an inverse call, an inverse put, and an inverse future on a cryptocurrency `C` creates a deterministic payoff in a reference currency (USD). Under the no-arbitrage principle, the cost of this portfolio today must equal the discounted value of its future payoff. This leads to the central pricing relationship, a modified form of put-call parity:
$$ C_t(K, T) - P_t(K, T) = \frac{F_t(T) - K}{S_t} \cdot ZC_t^{ref}(T) $$
where $ZC_t^{ref}(T)$ is the unknown price of a zero-coupon bond in the reference currency.

**2. Robust Regression with RANSAC:**
Market data is noisy and contains outliers. To robustly identify the underlying relationship in the data before estimation, the paper uses the Random Sample Consensus (RANSAC) algorithm. This iterative method fits a model to minimal subsets of the data to find the largest "consensus set" of inliers, effectively isolating and ignoring outliers. This is applied to a linear relationship derived from put-call parity for bid-ask prices.

**3. Joint Weighted Least-Squares Estimation:**
The prices of the crypto zero-coupon bond ($ZC_t^C(T) = \alpha_t$) and the reference currency zero-coupon bond ($ZC_t^{ref}(T) = \beta_t$) are estimated by jointly minimizing the errors in two parity relationships across all inlier strike prices for a given expiry. This is formulated as the minimization of the objective function:
$$ \arg\min_{\alpha_t, \beta_t} \sum_{i=1}^{n_T} \left(y_t^i - \left(\alpha_t - \frac{K_i}{S_t} \beta_t\right)\right)^2 + \lambda_n^T \left(\alpha_t - \frac{F_t}{S_t} \beta_t\right)^2 $$
The paper provides a closed-form analytical solution to this problem, which is implemented directly.

## Features

The provided iPython Notebook (`crypto_currencies_interest_rates_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Task-Based Architecture:** The entire pipeline is broken down into 22 distinct, modular tasks, from data validation to economic analysis.
-   **Professional-Grade Data Validation:** A comprehensive validation suite ensures all inputs (data and configurations) conform to the required schema before execution.
-   **Auditable Data Filtering:** A multi-stage filtering pipeline for maturity and liquidity, returning detailed diagnostic reports at each step.
-   **From-Scratch RANSAC Implementation:** A numerically stable, from-scratch implementation of the RANSAC algorithm for maximum control and fidelity to the paper's methodology.
-   **High-Fidelity Analytical Solver:** A direct and numerically robust implementation of the closed-form solution for the zero-coupon bond prices.
-   **Complete Term Structure Construction:** A full suite of functions for temporal aggregation, maturity interpolation, and time-series gap-filling to construct a continuous daily yield curve panel.
-   **Advanced Robustness Toolkit:**
    -   A parallelized framework for conducting **parameter sensitivity analysis** across a grid of hyperparameter values.
    -   A parallelized framework for constructing **bootstrap confidence intervals** to quantify the statistical uncertainty of the estimates.
-   **Econometric Validation Suite:** Functions to perform economic plausibility checks, compare results to external benchmarks, and conduct formal statistical tests (ADF, Cointegration).

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Data Validation and Filtering (Tasks 1-6):** Ingests and rigorously validates the raw data and `config.yaml` file, then filters the data based on maturity, liquidity, and the existence of complete call-put pairs.
2.  **RANSAC Outlier Rejection (Tasks 7-9):** Prepares the data for RANSAC, executes the algorithm for each timestamp-expiry group, and validates the results to find a clean set of inlier options.
3.  **Closed-Form Estimation (Tasks 10-12):** Constructs feature vectors from the inlier data and applies the analytical solution to estimate the zero-coupon bond prices (`α*`, `β*`).
4.  **Rate Conversion and Validation (Tasks 13-14):** Converts the bond prices to continuously compounded interest rates and performs final economic plausibility checks.
5.  **Time Series Construction (Tasks 15-18):** Aggregates the granular estimates into a daily panel, interpolates across maturities to standardized tenors, and fills temporal gaps to create a continuous time series.
6.  **Robustness and Economic Analysis (Tasks 20-22):** Executes the optional, advanced analyses for parameter sensitivity, bootstrap CIs, and economic validation.

## Core Components (Notebook Structure)

The `crypto_currencies_interest_rates_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: run_full_analysis

The central function in this project is `run_full_analysis`. It orchestrates the entire analytical workflow, providing a single entry point for running the baseline study replication and the advanced robustness checks.

```python
def run_full_analysis(
    df_master: pd.DataFrame,
    fused_input_state: Dict[str, Any],
    parameter_grid: Optional[Dict[str, List[Any]]] = None,
    bootstrap_group: Optional[Tuple] = None,
    n_bootstrap: int = 1000,
    benchmark_df: Optional[pd.DataFrame] = None,
    verbose: bool = False
) -> Dict[str, Any]:
    """
    Executes the entire end-to-end yield curve analysis.
    """
    # ... (implementation is in the notebook)
```

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `statsmodels`, `pyyaml`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/crypto_currencies_interest_rates.git
    cd crypto_currencies_interest_rates
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy statsmodels pyyaml
    ```

## Input Data Structure

The pipeline requires two primary inputs:
1.  **`df_master`:** A `pandas.DataFrame` containing high-frequency options market data. It **must** have a `MultiIndex` with the levels `['timestamp', 'asset', 'expiry_date', 'strike_price', 'option_type']` and the required data columns.
2.  **`fused_input_state`:** A Python dictionary loaded from the `config.yaml` file, which controls all hyperparameters of the pipeline.

A mock data generation code block is provided in the main notebook to create a valid example `df_master` for testing the pipeline.

## Usage

The `crypto_currencies_interest_rates_draft.ipynb` notebook provides a complete, step-by-step guide. The core workflow is:

1.  **Prepare Inputs:** Load your `df_master` DataFrame. Ensure the `config.yaml` file is present and configured.
2.  **Execute Pipeline:** Call the grand orchestrator function.

    ```python
    # This single call runs the entire project.
    full_report = run_full_analysis(
        df_master=my_market_data_df,
        fused_input_state=my_config_dict,
        verbose=True
    )
    ```
3.  **Inspect Outputs:** Programmatically access any result from the returned dictionary. For example, to view the final yield curve time series:
    ```python
    final_yield_curves = full_report['core_pipeline_results']['yield_curves_df']
    print(final_yield_curves.head())
    ```

## Output Structure

The `run_full_analysis` function returns a single, comprehensive dictionary containing all generated artifacts, including:
-   `core_pipeline_results`: A dictionary containing the final `yield_curves_df` and a nested dictionary of diagnostics from every step of the main pipeline run.
-   `sensitivity_analysis`: (Optional) A dictionary containing summary DataFrames for rate and coverage stability.
-   `bootstrap_analysis`: (Optional) A dictionary with the point estimate and confidence intervals for the targeted group.
-   `economic_validation`: (Optional) A dictionary with the results of the theoretical, benchmark, and statistical tests.

## Project Structure

```
crypto_currencies_interest_rates/
│
├── crypto_currencies_interest_rates_draft.ipynb   # Main implementation notebook
├── config.yaml                                    # Master configuration file
├── requirements.txt                               # Python package dependencies
├── LICENSE                                        # MIT license file
└── README.md                                      # This documentation file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can easily modify all hyperparameters, such as filter thresholds, RANSAC parameters, and target tenors, without altering the core Python code.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:

-   **Alternative Interpolation Methods:** Implementing more advanced term structure interpolation methods, such as cubic splines or the Nelson-Siegel model, as alternatives to linear interpolation.
-   **Generalization to Other Derivatives:** Extending the framework to incorporate data from other relevant derivatives, such as perpetual futures or different option structures.
-   **Automated Reporting:** Creating a function that takes the final `full_analysis_report` dictionary and generates a full PDF or HTML report summarizing the findings with plots and tables.
-   **Live Data Integration:** Building a data ingestion module to connect the pipeline to a live market data feed from an exchange API for real-time yield curve monitoring.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{bergault2025cryptocurrencies,
  title={{Cryptocurrencies and Interest Rates: Inferring Yield Curves in a Bondless Market}},
  author={Bergault, Philippe and Bieber, S{\'e}bastien and Gu{\'e}ant, Olivier and Zhang, Wenkai},
  journal={arXiv preprint arXiv:2509.03964},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A Python Implementation for Inferring Cryptocurrency Yield Curves.
GitHub repository: https://github.com/chirindaopensource/crypto_currencies_interest_rates
```

## Acknowledgments

-   Credit to **Philippe Bergault, Sébastien Bieber, Olivier Guéant, and Wenkai Zhang** for their foundational research, which forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, and Statsmodels**, whose work makes complex computational finance accessible and robust.

--

*This README was generated based on the structure and content of `crypto_currencies_interest_rates_draft.ipynb` and follows best practices for research software documentation.*

# Paper

Title: "*Cryptocurrencies and Interest Rates: Inferring Yield Curves in a Bondless Market*"

Authors: Philippe Bergault, Sébastien Bieber, Olivier Guéant, Wenkai Zhang

E-Journal Submission Date: 4 September 2025

Link: https://arxiv.org/abs/2509.03964

Abstract:

In traditional financial markets, yield curves are widely available for countries (and, by extension, currencies), financial institutions, and large corporates. These curves are used to calibrate stochastic interest rate models, discount future cash flows, and price financial products. Yield curves, however, can be readily computed only because of the current size and structure of bond markets. In cryptocurrency markets, where fixed-rate lending and bonds are almost nonexistent as of early 2025, the yield curve associated with each currency must be estimated by other means. In this paper, we show how mathematical tools can be used to construct yield curves for cryptocurrencies by leveraging data from the highly developed markets for cryptocurrency derivatives.

# Summary

### **Summary of "Cryptocurrencies and Interest Rates: Inferring Yield Curves in a Bondless Market"**

This paper presents a novel framework for constructing yield curves for cryptocurrencies and their associated reference currencies (in this case, a form of digital USD) in a market environment that completely lacks traditional fixed-income instruments like government or corporate bonds.

--

#### **The Fundamental Problem and Motivation**

In traditional finance ("TradFi"), the term structure of interest rates, or the yield curve, is the bedrock of valuation. It tells us the "price of money" over different time horizons. We typically derive it by "bootstrapping" from the prices of default-free government bonds of various maturities. This curve is essential for:
*   Discounting future cash flows to find their present value.
*   Pricing all manner of derivatives.
*   Calibrating stochastic interest rate models for risk management.

The cryptocurrency ecosystem, however, has a structural void: there is no sovereign issuer of crypto-denominated bonds. As of early 2025, a liquid market for fixed-rate, fixed-term debt in assets like Bitcoin (BTC) or Ethereum (ETH) is nonexistent. This raises a critical question: **How can one rationally discount a future cash flow denominated in BTC or ETH?**

The authors' central thesis is that while a bond market is absent, a highly liquid and sophisticated *derivatives* market has emerged. Their work proposes to use the prices from this market to reverse-engineer the very interest rate information that is normally a prerequisite for pricing those derivatives.

The paper also provides an excellent, detailed historical overview of interest rates, from ancient Sumeria to the modern era. The key takeaway from this section is that the existence of a clean, easily observable yield curve is a very recent historical development, contingent on the rise of standardized government debt. Cryptocurrencies, in a sense, return us to a state where the term structure is not directly observable and must be inferred.

--

#### **The Proposed Methodology - A Return to First Principles**

The core of the paper's methodology is a clever adaptation of one of the most fundamental no-arbitrage relationships in finance: **call-put parity**.

1.  **Revisiting Call-Put Parity for Inverse Options:** The authors focus on the Deribit exchange, the dominant platform for crypto options. Crucially, Deribit lists *inverse options*.
    *   A standard (linear) option has a strike price `K` and payoff denominated in the reference currency (e.g., USD).
    *   An inverse option has a strike `K` in USD, but its payoff is delivered in the underlying cryptocurrency (e.g., BTC). The payoff for an inverse call is `max(0, 1/K - 1/S_T)` in USD terms, or `max(0, S_T/K - 1)` in BTC terms.

    By constructing a portfolio of a long inverse call, a short inverse put, and a short inverse futures contract (all with the same strike `K` and maturity `T`), the authors show that the terminal payoff is a deterministic amount of the *reference currency* (USD). This leads to a modified call-put parity relationship:

    `C_t(K,T) – P_t(K,T) = (F_t(T) - K)/S_t * ZC_t^{ref}(T)`

    Where:
    *   `C_t` and `P_t` are the call and put prices in the crypto asset (e.g., BTC).
    *   `F_t(T)` is the futures price.
    *   `S_t` is the spot price.
    *   `ZC_t^{ref}(T)` is the price of a zero-coupon bond in the **reference currency (USD)** maturing at `T`. This is the first quantity they want to find.

2.  **Linking Reference and Crypto Yield Curves:** Next, they use another no-arbitrage argument involving a futures contract to link the USD zero-coupon bond to the cryptocurrency zero-coupon bond. This yields the second key relationship:

    `ZC_t^{crypto}(T) = (F_t(T)/S_t) * ZC_t^{ref}(T)`

    Where `ZC_t^{crypto}(T)` is the price of a zero-coupon bond in the **cryptocurrency** itself. This is the second, and ultimate, quantity of interest.

--

#### **Econometric Estimation and Data Filtering**

The theoretical parity relationships are elegant, but real-world market data is noisy, subject to bid-ask spreads, and contains pricing errors. A naive application would fail. The authors implement a robust two-stage estimation procedure:

1.  **Robust Data Filtering with RANSAC:** Before estimation, they must clean the data. They employ a powerful algorithm from computer vision and statistics called **Random Sample Consensus (RANSAC)**. This iterative method fits a model to random subsets of the data to identify a consensus set of "inliers" that conform to the expected theoretical relationship, while systematically rejecting outliers. This is a critical step for dealing with the often-erratic nature of crypto market data.

2.  **Joint Minimization:** For a given maturity `T`, there are dozens of options with different strikes `K`. The authors formulate a weighted least-squares problem. They simultaneously estimate the two unknown zero-coupon bond prices (`ZC^{ref}` and `ZC^{crypto}`) by minimizing the sum of squared errors across all available option strikes and the futures pricing relationship. This joint estimation provides more stable and robust estimates than trying to solve for the rates from a single instrument.

--

#### **Empirical Results and Key Findings**

The authors apply this methodology to hourly top-of-book data from Deribit for BTC and ETH options from January 2022 to December 2024. Their findings are compelling:

1.  **Deribit USD Yield Curve:** They successfully extract a dynamic, time-varying yield curve for USD as priced on the Deribit platform. This "Deribit USD" rate is not the same as the US Treasury rate; it implicitly contains the credit risk of the Deribit platform. The rates are shown to be economically sensible, rising during periods of market stress and exhibiting a general correlation with rates on decentralized lending platforms like Aave.

2.  **Bitcoin (BTC) Implied Yield Curve:** The implied interest rates for BTC are consistently found to be very close to zero across all maturities. This aligns perfectly with economic intuition. BTC is a non-productive asset; it has no native yield mechanism (like staking). Holding it is analogous to holding digital gold, which also has a near-zero (or even negative) interest rate.

3.  **Ethereum (ETH) Implied Yield Curve (The Puzzle):** This is the most surprising result. The implied interest rates for ETH are *also* found to be near-zero. This is puzzling because, unlike Bitcoin, Ethereum has a significant native staking yield (typically 2-4%) for validators who secure the network. One would expect the derivatives market to price this in, leading to a consistently positive interest rate. The authors acknowledge this discrepancy, suggesting it as a key area for future research. It may point to market segmentation, where derivatives traders do not fully incorporate on-chain yield opportunities into their pricing.


#### **Conclusion and Implications**

This paper makes a significant contribution by providing the first systematic, theoretically-grounded framework for estimating yield curves in a bondless market.

*   **Main Contribution:** It demonstrates that the fundamental principles of no-arbitrage pricing can be adapted to the unique structure of cryptocurrency markets to extract vital economic information that is not directly observable.
*   **Key Insight:** The derived interest rates are not universal "risk-free" rates but are inherently **platform-specific**. The "Deribit USD" rate reflects USD interest plus Deribit's credit risk. This opens the door to analyzing the spread between rates inferred from different platforms (e.g., Deribit vs. Binance) as a market-implied measure of relative platform risk.
*   **Future Work:** The "ETH staking puzzle" remains the most significant open question. Resolving it will likely yield deeper insights into the efficiency and integration of on-chain and off-chain crypto financial markets.

In essence, the paper provides a crucial toolkit for bringing rigorous quantitative analysis to the pricing and risk management of crypto assets, moving the field beyond speculative metrics and toward a more mature, valuation-driven framework.

# Import Essential Modules


In [None]:
#!/usr/bin/env python3
# ==============================================================================#
#
#  Cryptocurrencies and Interest Rates: Inferring Yield Curves in a Bondless Market
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "Cryptocurrencies and Interest Rates:
#  Inferring Yield Curves in a Bondless Market" by Bergault, Bieber, Guéant,
#  and Zhang (2025). It delivers a robust system for constructing cryptocurrency
#  yield curves from derivatives data in the absence of a traditional bond market,
#  enabling consistent pricing, risk management, and the development of more
#  sophisticated financial instruments in the digital asset ecosystem.
#
#  Core Methodological Components:
#  • Synthetic Zero-Coupon Bond Replication via Inverse Options and Futures.
#  • Modified Call-Put Parity relationship for inverse derivative contracts.
#  • Robust Outlier Rejection using the Random Sample Consensus (RANSAC) algorithm.
#  • Joint Weighted Least-Squares Estimation with a closed-form analytical solution.
#  • Conversion of zero-coupon bond prices to continuously compounded interest rates.
#  • Temporal aggregation and interpolation to construct a continuous term structure.
#
#  Technical Implementation Features:
#  • Comprehensive, multi-stage data validation (structural, economic, completeness).
#  • Modular, auditable pipeline with detailed diagnostics at each stage.
#  • Numerically stable implementation of the RANSAC algorithm and analytical solver.
#  • Parallelized frameworks for computationally intensive robustness analyses,
#    including parameter sensitivity and bootstrap confidence intervals.
#  • Suite of economic validation and econometric tests for final model assessment.
#
#  Paper Reference:
#  Bergault, P., Bieber, S., Guéant, O., & Zhang, W. (2025). Cryptocurrencies
#  and Interest Rates: Inferring Yield Curves in a Bondless Market.
#  arXiv preprint arXiv:2509.03964.
#  https://arxiv.org/abs/2509.03964
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================#

# ==============================================================================#
# Consolidated Imports for the End-to-End Yield Curve Pipeline
# ==============================================================================#

# --- Standard Library Imports ---
# For robust type hinting and annotation across all functions.
from typing import List, Dict, Any, Set, Tuple, Optional, Union
# For creating detailed diagnostic reports on rejected items.
from collections import Counter
# For creating deep copies of configuration objects in parallel processing.
import copy
# For generating parameter grids for sensitivity analysis.
import itertools
# For robust floating-point comparisons.
import math
# For parallel execution of computationally intensive tasks.
from multiprocessing import Pool, cpu_count
# For pretty-printing diagnostic dictionaries.
from pprint import pprint


# --- Third-Party Library Imports ---
# The core library for numerical operations, array manipulation, and linear algebra.
import numpy as np
# The core library for data manipulation, time-series analysis, and data structures.
import pandas as pd
# Provides robust, specific functions for data type validation.
import pandas.api.types as ptypes
# The primary library for advanced statistical and econometric testing.
from statsmodels.tsa.stattools import adfuller, coint


# Implementation

## Draft 1

### **Functional Decomposition of Key Pipeline Orchestrators using the Inputs-Processes-Outputs (IPO) Model**

#### **Task 1: `validate_dataframe_structure`**

*   **Inputs:** A `pandas.DataFrame` (`df_master`) intended for analysis.
*   **Processes:**
    1.  **MultiIndex Validation:** Systematically inspects the DataFrame's index to confirm it is a `MultiIndex` with exactly 5 levels named `['timestamp', 'asset', 'expiry_date', 'strike_price', 'option_type']` in that order. It verifies the `dtype` of each level (e.g., `timestamp` is `datetime64`) and the values within categorical levels (e.g., `asset` is in `{'BTC', 'ETH'}`).
    2.  **Column Validation:** Checks for the presence and correct `dtype` of all required data columns (`instrument_name`, `best_bid_price`, etc.). It further validates that all numeric columns contain only finite, positive values.
    3.  **Economic Constraint Validation:** Performs vectorized checks across the DataFrame to enforce fundamental market logic: `best_ask_price >= best_bid_price` and `expiry_date > timestamp`. It also validates timezone consistency between the temporal index levels.
*   **Transformation:** This function is a pure validation gate; it does not transform the data. It either passes the input DataFrame through silently or raises a `ValueError` if any check fails, preventing corrupted or malformed data from entering the pipeline.
*   **Outputs:** `None` on success, or raises a `ValueError` on failure.
*   **Role in Research Pipeline:** This callable implements the foundational data integrity checks that are a prerequisite for any quantitative analysis. While not tied to a specific equation, it ensures the raw data conforms to the structural assumptions upon which all subsequent mathematical operations are built. It is the first line of defense against "garbage in, garbage out."

#### **Task 2: `validate_hyperparameter_dictionary`**

*   **Inputs:** A Python dictionary (`fused_input_state`) containing nested `metadata` and `hyperparameters` dictionaries.
*   **Processes:**
    1.  **Structural Validation:** Confirms the presence of the top-level `metadata` and `hyperparameters` keys.
    2.  **Schema-Driven Validation:** Iterates through a predefined internal schema that specifies the required type, and value constraints (e.g., range, exact value, non-empty list) for every single parameter. It checks each parameter in the input dictionary against these rules.
    3.  **Unrecognized Parameter Check:** Identifies any keys present in the input dictionary that are not defined in the schema, flagging them as potential typos.
*   **Transformation:** This is a pure validation gate. It does not transform the input dictionary.
*   **Outputs:** `None` on success, or raises a `ValueError` on failure.
*   **Role in Research Pipeline:** This callable ensures the reproducibility and correctness of the entire study by validating the parameters that govern its execution. It operationalizes the specific hyperparameter values mentioned throughout the paper, such as the RANSAC thresholds from Section 4.2. For example, it validates:
    *   `ransac_residual_sq_threshold = 0.004`
    *   `ransac_slope_tolerance_pct = 0.10`

#### **Task 3: `validate_data_completeness`**

*   **Inputs:** A structurally valid `pandas.DataFrame` (`df_master`).
*   **Processes:**
    1.  **Temporal Coverage Analysis:** Calculates the number of unique timestamps and the maximum time gap between any two consecutive timestamps, comparing them against minimum density thresholds (e.g., >= 100 timestamps, max gap <= 7 days).
    2.  **Cross-Sectional Coverage Analysis:** For each estimation group (`timestamp`, `asset`, `expiry_date`), it validates that there is a sufficient number of call-put pairs (>= 5), that the available strikes cover a required moneyness range ([0.7, 1.3]), and that a valid futures price is present.
    3.  **Market Data Quality Assessment:** Scans the entire dataset for quality red flags like zero prices, inverted spreads, and extreme relative spreads (>100%).
*   **Transformation:** This is a pure validation gate. It does not transform the data.
*   **Outputs:** `None` on success, or raises a `ValueError` on failure.
*   **Role in Research Pipeline:** This callable enforces the implicit assumption of the paper that the data is sufficiently dense and of high enough quality to support the estimation. A sparse or low-quality dataset would lead to statistically unreliable estimates, and this function prevents the pipeline from running on such data.

#### **Task 4: `filter_by_maturity`**

*   **Inputs:** A `pandas.DataFrame` and an integer `maturity_filter_days`.
*   **Processes:**
    1.  Calculates the time-to-expiry in days for each option row by subtracting the `timestamp` from the `expiry_date`.
    2.  Creates a boolean mask by comparing each option's time-to-expiry against the `maturity_filter_days` threshold.
    3.  Applies the mask to the DataFrame, discarding all rows that do not meet the minimum maturity requirement.
*   **Transformation:** This function transforms the input DataFrame by filtering out rows, resulting in an output DataFrame with fewer rows.
*   **Outputs:** A tuple containing the filtered `pandas.DataFrame` and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable directly implements the first data trimming step described in **Section 4.2**: *"First, options with maturities of less than one month are excluded."*

#### **Task 5: `filter_by_liquidity`**

*   **Inputs:** A `pandas.DataFrame` and a float `liquidity_filter_spread_pct`.
*   **Processes:**
    1.  Calculates the mid-price for each option: $Mid_i = \frac{Ask_i + Bid_i}{2}$.
    2.  Calculates the relative bid-ask spread for each option: $Spread_i = \frac{Ask_i - Bid_i}{Mid_i}$.
    3.  Creates a boolean mask by comparing each option's relative spread against the `liquidity_filter_spread_pct` threshold.
    4.  Applies the mask to the DataFrame, discarding all rows that exceed the maximum spread threshold.
*   **Transformation:** This function transforms the input DataFrame by filtering out rows, resulting in an output DataFrame with fewer rows.
*   **Outputs:** A tuple containing the filtered `pandas.DataFrame` and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable directly implements the second data trimming step described in **Section 4.2**: *"Second, we exclude data points with excessive bid-ask spreads..."*

#### **Task 6: `prepare_features_and_pairs`**

*   **Inputs:** A filtered `pandas.DataFrame`.
*   **Processes:**
    1.  **Feature Engineering:** Computes and adds two new columns to the DataFrame: `mid_price` and `moneyness`.
    2.  **Pairing Validation:** Rigorously filters the DataFrame to ensure that for every unique combination of `(timestamp, asset, expiry_date, strike_price)`, there exists exactly one `call` and one `put`. All unpaired options are discarded.
*   **Transformation:** This function transforms the DataFrame by adding columns and then filtering out rows, resulting in a DataFrame with a different shape and schema, ready for algorithmic processing.
*   **Outputs:** A tuple containing the analysis-ready `pandas.DataFrame` and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable prepares the clean data for the core algorithms. It computes the necessary features for the estimation, most notably the moneyness, $m_{i,t} = \frac{K_i}{S_t}$, which is a key variable in the final estimation equations.

#### **Task 7: `prepare_ransac_inputs`**

*   **Inputs:** The paired `pandas.DataFrame` from the previous task.
*   **Processes:**
    1.  **Pivoting:** Reshapes the data from a long format to a wide format, aligning the `best_bid_price` and `best_ask_price` for the call and put of each strike side-by-side.
    2.  **Vector Construction:** Performs vectorized calculations on the pivoted data to construct the `X` and `Y` vectors for the RANSAC regression.
    3.  **Grouping & Filtering:** Groups the data by `(timestamp, asset, expiry_date)`, filters out any groups with fewer than a minimum number of strikes, and packages the `X`, `Y`, and `strike` arrays for each valid group into a dictionary.
*   **Transformation:** This function transforms a single large DataFrame into a dictionary where keys are group identifiers and values are dictionaries of NumPy arrays. This is a major structural transformation.
*   **Outputs:** A tuple containing the dictionary of RANSAC inputs and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable directly prepares the inputs for the RANSAC algorithm as specified in **Equation (4)**. It computes:
    *   $X_i = C_{i,t}^{\text{ask}} - P_{i,t}^{\text{bid}}$
    *   $Y_i = P_{i,t}^{\text{ask}} - C_{i,t}^{\text{bid}}$
    These vectors form the basis of the linear relationship $Y = \zeta + \xi X + \eta$ that RANSAC will fit.

#### **Task 8: `execute_ransac_on_all_groups`**

*   **Inputs:** The dictionary of RANSAC inputs from the previous task and RANSAC hyperparameters.
*   **Processes:**
    1.  Iterates through each estimation group in the input dictionary.
    2.  For each group, it executes the full RANSAC algorithm:
        *   Repeatedly draws a minimal random sample of points.
        *   Fits a linear model to the sample.
        *   Calculates the size of the consensus set (inliers) for that model.
        *   Keeps track of the model with the largest consensus set.
    3.  After all iterations, it performs a final linear regression fit on the best inlier set to get the final robust model parameters.
*   **Transformation:** This function transforms the dictionary of input arrays into a dictionary of result objects, where each result contains the fitted model parameters (`slope`, `intercept`) and the boolean mask identifying the inliers.
*   **Outputs:** A tuple containing the dictionary of `RansacResult` objects and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable is the implementation of the **Random Sample Consensus (RANSAC)** algorithm, a core component of the paper's robust statistics methodology described in **Section 4**. It is designed to find the parameters $(\zeta_t, \xi_t)$ for the model $Y = \zeta_t + \xi_t X + \eta_t$ in a way that is insensitive to outliers.

#### **Task 9: `validate_and_filter_ransac_results`**

*   **Inputs:** The dictionary of `RansacResult` objects and validation hyperparameters.
*   **Processes:**
    1.  Iterates through each RANSAC result.
    2.  Applies a series of validation checks to each result:
        *   **Slope Check:** Verifies that the estimated slope `ξ̂` is close to its theoretical value of -1, i.e., $|\hat{\xi} + 1| \leq \text{tolerance}$.
        *   **Intercept Check:** Verifies that the intercept `ζ̂` is positive.
        *   **Inlier Count Check:** Verifies that the final model is supported by a minimum number of inlier points.
    3.  Filters out any results that fail one or more checks.
*   **Transformation:** This function transforms the input dictionary of results by filtering it, producing a smaller output dictionary containing only the validated results.
*   **Outputs:** A tuple containing the dictionary of validated `RansacResult` objects and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable implements the crucial diagnostic check on the RANSAC output, as described in **Section 4.2**: *"we discard all observations for that pair (t, T) if the estimated slope ξt deviates significantly from −1."*

#### **Task 10: `construct_feature_vectors`**

*   **Inputs:** The original paired DataFrame and the dictionary of validated RANSAC results.
*   **Processes:**
    1.  Iterates through the validated RANSAC results.
    2.  For each valid group, it uses the `inlier_mask` to select only the inlier options from the original DataFrame.
    3.  On this inlier-only subset, it computes the final feature vectors for the least-squares estimation: the call-put mid-price difference vector `y` and the moneyness vector `m`.
    4.  It also extracts the constant `futures_price` and `spot_price` for the group.
*   **Transformation:** This function transforms the validated RANSAC results (which are just model parameters and masks) back into a set of clean, numerical feature vectors, ready for the final estimation.
*   **Outputs:** A tuple containing a dictionary of `FeatureVectorGroup` objects and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable prepares the final, clean inputs for the weighted least-squares estimation. It constructs the vectors that directly populate the estimation equations, namely:
    *   $y_t^i = C_t(K^i,T) - P_t(K^i,T)$ (using mid-prices)
    *   $m_t^i = \frac{K^i}{S_t}$

#### **Task 11: `compute_summations_for_all_groups`**

*   **Inputs:** The dictionary of `FeatureVectorGroup` objects and the `estimation_lambda_weight` hyperparameter.
*   **Processes:**
    1.  Iterates through each feature vector group.
    2.  For each group, it computes the five scalar summations and one ratio required for the closed-form solution.
*   **Transformation:** This function transforms the dictionary of vector inputs into a dictionary of scalar inputs for the final solver.
*   **Outputs:** A tuple containing a dictionary of `SummationGroup` objects and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable computes the scalar components of the closed-form solution presented in the unnumbered equations following **Equation (3)** in **Section 3.3**. It calculates:
    *   $S_1 = \sum_{i=1}^{n_t^T} y_t^i$
    *   $S_2 = \sum_{i=1}^{n_t^T} m_t^i$
    *   $S_3 = \sum_{i=1}^{n_t^T} (m_t^i)^2$
    *   $S_4 = \sum_{i=1}^{n_t^T} m_t^i y_t^i$
    *   $\lambda_{n_t^T}$ (referred to as `lambda_value`)
    *   $F_t / S_t$

#### **Task 12: `compute_closed_form_solutions`**

*   **Inputs:** The dictionary of `SummationGroup` objects.
*   **Processes:**
    1.  Iterates through each summation group.
    2.  For each group, it first calculates the determinant `d` of the 2x2 linear system.
    3.  It checks if the system is solvable (i.e., `abs(d)` is greater than a small tolerance).
    4.  If solvable, it computes the zero-coupon bond prices `α*` and `β*` using the closed-form formulas.
    5.  It performs a final check to ensure the computed prices are positive.
*   **Transformation:** This function transforms the dictionary of scalar inputs into a dictionary of solution results (the estimated bond prices).
*   **Outputs:** A tuple containing a dictionary of `SolutionResult` objects and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable is the direct implementation of the final closed-form solution for the zero-coupon bond prices, `α*` (crypto ZCB, `ZC_t^C(T)`) and `β*` (reference ZCB, `ZC_t^{ref}(T)`), presented in the unnumbered equations of **Section 3.3**:
    *   $d = (\lambda_{n_t^T} + S_3)(n_t^T + \lambda_{n_t^T} \frac{F_t^2}{S_t^2}) - (\lambda_{n_t^T} \frac{F_t}{S_t} + S_2)^2$
    *   $\alpha_t^* = \frac{1}{d}[(\lambda_{n_t^T} + S_3)S_1 - (\lambda_{n_t^T} \frac{F_t}{S_t} + S_2)S_4]$
    *   $\beta_t^* = \frac{1}{d}[(\lambda_{n_t^T} \frac{F_t}{S_t} + S_2)S_1 - (n_t^T + \lambda_{n_t^T} \frac{F_t^2}{S_t^2})S_4]$

#### **Task 13: `convert_all_solutions_to_rates`**

*   **Inputs:** The dictionary of `SolutionResult` objects and the `annualization_factor` hyperparameter.
*   **Processes:**
    1.  Iterates through each solution group.
    2.  For each group, it calculates the precise time-to-expiry in years (`T-t`).
    3.  It applies the logarithmic conversion formula to `α*` and `β*` to find the continuously compounded interest rates.
*   **Transformation:** This function transforms the estimated bond prices (discount factors) into annualized interest rates.
*   **Outputs:** A tuple containing a dictionary of `InterestRateResult` objects and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable implements the standard financial formula for converting a zero-coupon bond price (discount factor) to a continuously compounded rate, as referenced at the end of **Section 3.3**:
    *   $\hat{r}_t^{T-t,\text{crypto}} = -\frac{1}{T-t} \ln(\alpha_t^*)$
    *   $\hat{r}_t^{T-t,\text{ref}} = -\frac{1}{T-t} \ln(\beta_t^*)$

#### **Task 14: `validate_and_filter_rates`**

*   **Inputs:** The dictionary of `InterestRateResult` objects.
*   **Processes:**
    1.  Iterates through each interest rate result.
    2.  Applies a series of economic reasonableness checks based on predefined ranges for the crypto rate, the reference rate, and the spread between them.
    3.  Filters out any results that fall outside these plausible bounds.
*   **Transformation:** This function transforms the input dictionary of rates by filtering it, producing a smaller output dictionary of validated rates.
*   **Outputs:** A tuple containing the dictionary of validated `InterestRateResult` objects and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable serves as a final economic sanity check on the numerical outputs of the model, ensuring the results are plausible before they are used for any further analysis.

#### **Task 15: `aggregate_results_to_dataframe` (Remediated Version)**

*   **Inputs:** The dictionary of validated `InterestRateResult` objects (which now includes `n_contracts`).
*   **Processes:**
    1.  Iterates through the input dictionary.
    2.  For each item, it creates a "flattened" dictionary record containing the group identifiers (`timestamp`, `asset`, `expiry_date`) and all the results (`crypto_rate`, `ref_rate`, `n_contracts`, etc.).
    3.  Constructs a single `pandas.DataFrame` from the list of these records.
    4.  Enforces a standard column order and correct data types.
*   **Transformation:** This function performs a major structural transformation, converting the nested dictionary of results into a single, flat, tabular `pandas.DataFrame`.
*   **Outputs:** A `pandas.DataFrame`.
*   **Role in Research Pipeline:** This callable is a crucial data structuring step that prepares the granular, point-in-time estimates for the final phase of time-series analysis.

#### **Task 16: `aggregate_rates_daily`**

*   **Inputs:** The `pandas.DataFrame` of aggregated results from the previous task.
*   **Processes:**
    1.  Creates a `date` column and a `maturity_bucket` column.
    2.  Groups the DataFrame by `(date, asset, maturity_bucket)`.
    3.  For each group, it calculates the mean and standard deviation of the rates and counts the number of observations.
    4.  Filters out any aggregated points that are based on fewer than a minimum number of observations.
*   **Transformation:** This function transforms the high-frequency (e.g., hourly) data into a lower-frequency (daily) panel dataset, smoothing out intraday noise.
*   **Outputs:** A tuple containing the daily aggregated `pandas.DataFrame` and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable implements the temporal aggregation step, a standard technique in time-series analysis for reducing noise and creating a dataset with a regular frequency, as discussed in **Section 4.1** regarding the choice to aggregate hourly observations into daily estimates.

#### **Task 17: `interpolate_yield_curves`**

*   **Inputs:** The daily aggregated DataFrame.
*   **Processes:**
    1.  For each `(date, asset)` group, it treats the available maturity bucket midpoints and their corresponding mean rates as points on a yield curve.
    2.  It uses linear interpolation to estimate the interest rate at specific, standardized target tenors (e.g., 90, 180, 360 days).
    3.  It handles cases where a target tenor is outside the available range by using flat extrapolation.
    4.  It creates a `status` flag to identify whether each point was interpolated or extrapolated.
*   **Transformation:** This function transforms the data from rates at discrete maturity *buckets* to rates at specific maturity *points*, creating a standardized term structure for each day.
*   **Outputs:** A tuple containing the final interpolated `pandas.DataFrame` (in wide format) and a diagnostic dictionary.
*   **Role in Research Pipeline:** This callable implements the maturity interpolation step, as referenced in **Section 4.3.2**: *"Each point on these curves is computed by applying our methodology..., followed by linear interpolation from the nearest available maturities corresponding to the target tenor."*

#### **Task 18: `construct_final_time_series`**

*   **Inputs:** The daily, maturity-interpolated DataFrame.
*   **Processes:**
    1.  For each asset, it creates a complete daily date range from the start to the end of the sample period.
    2.  It reindexes the data to this complete range, which makes any missing days explicit by filling them with `NaN`.
    3.  It uses limited linear interpolation to fill small gaps (<= 7 days) in the time series.
    4.  It creates a `source` column to provide an audit trail for each data point (observed, time-interpolated, etc.).
*   **Transformation:** This function transforms a potentially sparse daily time series into a dense, continuous daily time series, ready for econometric analysis.
*   **Outputs:** A tuple containing the final, dense `pandas.DataFrame` and a diagnostic dictionary.
*   **Role in Research Pipeline:** This is the final data construction step, creating the clean, continuous time series that are plotted in Figures 9 and 10 and used to generate the summary statistics in Tables 3, 4, and 5.

#### **Task 19: `orchestrate_yield_curve_estimation`**

*   **Inputs:** The raw `df_master` and the `fused_input_state` configuration.
*   **Processes:** Sequentially executes the orchestrator functions from Tasks 1 through 18, passing data and parameters between them and collecting diagnostics.
*   **Transformation:** Manages the entire end-to-end transformation of raw data into a final, clean yield curve time series.
*   **Outputs:** A tuple containing the final `pandas.DataFrame` and a master diagnostic dictionary.
*   **Role in Research Pipeline:** This is the master controller for the entire estimation methodology described in the paper.

#### **Task 20: `run_parameter_sensitivity_analysis`**

*   **Inputs:** The raw `df_master`, a base configuration, and a grid of parameter values to test.
*   **Processes:**
    1.  Generates all combinations of the specified hyperparameters.
    2.  For each combination, it executes the entire core pipeline (`orchestrate_yield_curve_estimation`) in parallel.
    3.  Aggregates the results from all runs.
    4.  Calculates robustness metrics, such as the standard deviation of the estimated rates across the different parameter runs.
*   **Transformation:** Transforms a single set of inputs into a distribution of outputs, which is then summarized to measure model stability.
*   **Outputs:** A tuple of summary DataFrames detailing rate stability and coverage stability.
*   **Role in Research Pipeline:** This is a meta-analysis function. It provides a framework for rigorously testing the robustness of the paper's results to the specific choices of hyperparameters (e.g., the RANSAC thresholds), which is a critical step in validating any quantitative model.

#### **Task 21: `construct_bootstrap_confidence_intervals`**

*   **Inputs:** The paired DataFrame, a configuration, and a specific estimation group to analyze.
*   **Processes:**
    1.  For a single estimation group, it repeatedly resamples the underlying option pairs with replacement.
    2.  For each resampled dataset, it re-runs the core estimation logic (RANSAC through rate conversion).
    3.  It collects the distribution of the resulting rate estimates.
    4.  It calculates percentile-based confidence intervals from this distribution.
*   **Transformation:** Transforms a single point estimate into a distribution of estimates, which is then summarized into confidence intervals.
*   **Outputs:** A dictionary containing the point estimate and its confidence intervals.
*   **Role in Research Pipeline:** This function provides a method for quantifying the statistical uncertainty of any given point estimate produced by the model. This is a standard and rigorous technique for moving beyond a simple point estimate to understanding its precision.

#### **Task 22: `run_economic_validation`**

*   **Inputs:** The final yield curve DataFrame and an optional external benchmark DataFrame.
*   **Processes:**
    1.  Performs theoretical consistency checks (term structure shape, cross-asset consistency).
    2.  If a benchmark is provided, it calculates the correlation between the estimated rates and the benchmark.
    3.  Performs formal econometric tests for stationarity (unit root) and cointegration.
*   **Transformation:** This function transforms the final time-series data into a set of summary statistics and test results.
*   **Outputs:** A comprehensive dictionary containing the results of all validation tests.
*   **Role in Research Pipeline:** This function directly addresses the economic interpretation and validation of the results, similar to the analysis presented in **Section 4.3**, which compares the model's output to DeFi lending rates (Figure 11) and discusses the properties of the resulting curves.

<br> <br>

### **Usage Example**

This guide demonstrates how to execute the complete yield curve estimation and analysis pipeline. The process involves three main stages:
1.  **Data Loading and Preparation:** Loading the raw market data and the external benchmark data into the required `pandas.DataFrame` structures.
2.  **Configuration Loading:** Loading the study's parameters from the `config.yaml` file.
3.  **Pipeline Execution:** Calling the main orchestrator function, `run_full_analysis`, with the prepared data and configuration to generate the final results.

#### **1. Data Loading and Preparation**

Before running the pipeline, the user must prepare two primary data artifacts.

**a) The Master Market Data DataFrame (`df_master`)**

This is the most critical input. It must contain the high-frequency, top-of-book quote data for options, along with the corresponding futures and spot prices. The data must be structured precisely as a `pandas.DataFrame` with the specific `MultiIndex` and columns that the pipeline's validation functions expect.

*   **Discussion:** A user would typically construct this DataFrame through a dedicated ETL (Extract, Transform, Load) process. This would involve fetching raw data from an API (like CoinMetrics), parsing it, merging the options, futures, and spot feeds, and then setting the specific MultiIndex. For this example, we will simulate a small, correctly formatted `df_master` to demonstrate the pipeline's usage.

```python
# Python Code Snippet: Simulating df_master

import pandas as pd
import numpy as np

# Create the MultiIndex levels
timestamps = pd.to_datetime(['2024-11-05 08:00:00', '2024-11-05 09:00:00'])
assets = ['BTC']
expiries = pd.to_datetime(['2024-12-27', '2025-03-28'])
strikes = [68000.0, 70000.0, 72000.0]
option_types = ['call', 'put']

# Create the full MultiIndex from the product of the levels
index = pd.MultiIndex.from_product(
    [timestamps, assets, expiries, strikes, option_types],
    names=['timestamp', 'asset', 'expiry_date', 'strike_price', 'option_type']
)

# Create a placeholder DataFrame
df_master = pd.DataFrame(index=index)

# Populate with realistic, but simulated, data
# In a real scenario, this data would be loaded from a file (e.g., Parquet)
np.random.seed(42)
df_master['best_bid_price'] = np.random.uniform(0.01, 0.1, size=len(df_master))
df_master['best_ask_price'] = df_master['best_bid_price'] * np.random.uniform(1.01, 1.05, size=len(df_master))
df_master['instrument_name'] = [f"BTC-{exp.strftime('%d%b%y')}-{int(k)}-{opt[0].upper()}" for t, a, exp, k, opt in df_master.index]

# Add spot and futures prices (constant per timestamp/expiry)
df_master['spot_price'] = df_master.index.get_level_values('timestamp').map(
    {timestamps[0]: 70000.0, timestamps[1]: 70100.0}
)
# Create a mapping for futures prices
futures_map = {
    (timestamps[0], expiries[0]): 70500.0, (timestamps[0], expiries[1]): 71000.0,
    (timestamps[1], expiries[0]): 70600.0, (timestamps[1], expiries[1]): 71100.0,
}
# Map the futures prices based on timestamp and expiry
df_master['futures_price'] = [futures_map.get((t, exp)) for t, a, exp, k, opt in df_master.index]

print("--- Sample df_master created ---")
print(df_master.head())
```

**b) The External Benchmark DataFrame (`benchmark_df`)**

This DataFrame is optional. If provided, it is used in the final economic validation step (Task 22). It should contain one or more columns of benchmark data (e.g., DeFi lending rates) and have a `DatetimeIndex` with daily frequency.

*   **Discussion:** This data would typically be sourced from a provider like DeFiLlama. For this example, we will simulate a small benchmark DataFrame.

```python
# Python Code Snippet: Simulating benchmark_df

# Create a date range matching the master data
date_range = pd.to_datetime(['2024-11-05', '2024-11-06'])

# Create the benchmark DataFrame
benchmark_df = pd.DataFrame(
    {'aave_usdc_rate': [0.055, 0.056]},
    index=pd.DatetimeIndex(date_range, name='date')
)

print("\n--- Sample benchmark_df created ---")
print(benchmark_df)
```

#### **2. Configuration Loading**

The pipeline's behavior is controlled by the `config.yaml` file. We need to load this file into a Python dictionary. The `PyYAML` library is the standard tool for this.

*   **Discussion:** This step cleanly separates the code from the parameters. A researcher can easily create multiple YAML files to test different scenarios without ever modifying the Python source code.

```python
# Python Code Snippet: Loading the YAML configuration

import yaml

# Define the path to the configuration file
config_path = 'config.yaml'

# Open and load the YAML file into a Python dictionary
with open(config_path, 'r') as f:
    fused_input_state = yaml.safe_load(f)

print("\n--- Configuration loaded from config.yaml ---")
pprint(fused_input_state)
```

#### **3. Pipeline Execution**

With the data and configuration prepared, we can now call the main orchestrator function, `run_full_analysis`. This single function call will execute the entire end-to-end process.

*   **Discussion:** We will demonstrate a comprehensive call that includes the optional robustness analyses. The `parameter_grid` will test the pipeline's sensitivity to the RANSAC residual threshold. The `bootstrap_group` will be set to one of the specific `(timestamp, asset, expiry)` combinations present in our simulated data to calculate confidence intervals for that point estimate.

```python
# Python Code Snippet: Executing the full analysis

# --- Define Inputs for Optional Analyses ---

# Define a parameter grid for the sensitivity analysis (Task 20)
# We will test two different values for the RANSAC threshold.
param_grid_for_sensitivity = {
    'ransac_residual_sq_threshold': [0.004, 0.008]
}

# Define a specific group for the bootstrap analysis (Task 21)
# This must be a valid (timestamp, asset, expiry_date) from our df_master
bootstrap_target_group = (
    pd.to_datetime('2024-11-05 08:00:00'),
    'BTC',
    pd.to_datetime('2024-12-27')
)

# --- Execute the Grand Orchestrator ---

# Call the main function with all inputs.
# The 'verbose' flag is set to True to print diagnostics from the core pipeline.
full_report = run_full_analysis(
    df_master=df_master,
    fused_input_state=fused_input_state,
    parameter_grid=param_grid_for_sensitivity,
    bootstrap_group=bootstrap_target_group,
    n_bootstrap=100,  # Using a smaller number for a quick example run
    benchmark_df=benchmark_df,
    verbose=True
)

# --- Inspect the Results ---

print("\n\n" + "="*50)
print("      FULL ANALYSIS COMPLETE - FINAL REPORT      ")
print("="*50 + "\n")

# Print the final yield curve DataFrame produced by the core pipeline
print("--- Final Yield Curve Time Series (from core pipeline) ---")
print(full_report['core_pipeline_results']['yield_curves_df'])

# Print a summary of the sensitivity analysis
print("\n--- Sensitivity Analysis Summary ---")
if 'sensitivity_analysis' in full_report:
    print("Rate Stability (Mean and Std Dev of rates across parameter runs):")
    print(full_report['sensitivity_analysis']['rate_stability_summary'])

# Print the results of the bootstrap analysis
print("\n--- Bootstrap Analysis Summary ---")
if 'bootstrap_analysis' in full_report:
    pprint(full_report['bootstrap_analysis'])

# Print the results of the economic validation
print("\n--- Economic Validation Summary ---")
if 'economic_validation' in full_report:
    pprint(full_report['economic_validation'])
```




In [None]:
# Task 1: DataFrame Structure Validation

# ======================================================================================
# Task 1, Step 1: MultiIndex Level Validation
# ======================================================================================

def _validate_multiindex(
    df: pd.DataFrame
) -> List[str]:
    """
    Validates the structure, names, dtypes, and values of the DataFrame's MultiIndex.

    This function performs a series of rigorous checks to ensure the MultiIndex
    conforms to the exact schema required by the yield curve estimation pipeline.
    It is a critical first step in the input validation process.

    Args:
        df (pd.DataFrame):
            The input DataFrame which is expected to have a specific MultiIndex
            structure.

    Returns:
        List[str]:
            A list of string error messages. An empty list indicates that all
            MultiIndex validation checks have passed successfully.
    """
    # Initialize a list to aggregate potential error messages.
    errors: List[str] = []

    # Define the expected schema for the MultiIndex.
    expected_levels: int = 5
    expected_names: List[str] = [
        'timestamp', 'asset', 'expiry_date', 'strike_price', 'option_type'
    ]
    expected_dtypes: Dict[str, str] = {
        'timestamp': 'datetime64',
        'asset': 'object',
        'expiry_date': 'datetime64',
        'strike_price': 'float64',
        'option_type': 'object'
    }
    expected_values: Dict[str, Set[str]] = {
        'asset': {'BTC', 'ETH'},
        'option_type': {'call', 'put'}
    }

    # Check 1: Verify that the index is indeed a MultiIndex.
    if not isinstance(df.index, pd.MultiIndex):
        # If not a MultiIndex, further checks are irrelevant. Return immediately.
        return ["Input is not a pandas MultiIndex."]

    # Check 2: Validate the number of levels in the MultiIndex.
    if df.index.nlevels != expected_levels:
        # Add an error message if the number of levels is incorrect.
        errors.append(
            f"MultiIndex validation failed: Expected {expected_levels} levels, "
            f"but found {df.index.nlevels}."
        )
        # If the level count is wrong, further checks by name will fail. Return.
        return errors

    # Check 3: Validate the names and order of the MultiIndex levels.
    if list(df.index.names) != expected_names:
        # Add an error message if the names or their order are incorrect.
        errors.append(
            f"MultiIndex validation failed: Expected level names "
            f"{expected_names}, but found {list(df.index.names)}."
        )

    # Check 4: Iterate through each level to validate its dtype and values.
    for i, name in enumerate(expected_names):
        # Extract the current level's values for inspection.
        level_values = df.index.get_level_values(i)

        # Sub-check 4a: Validate the data type of the level.
        if expected_dtypes[name] == 'datetime64':
            # For datetime levels, use the specific pandas API type checker.
            if not ptypes.is_datetime64_any_dtype(level_values):
                errors.append(
                    f"MultiIndex validation failed: Level '{name}' is not a "
                    f"datetime64 dtype, found {level_values.dtype}."
                )
        elif expected_dtypes[name] == 'float64':
            # For float levels, use the specific pandas API type checker.
            if not ptypes.is_float_dtype(level_values):
                errors.append(
                    f"MultiIndex validation failed: Level '{name}' is not a "
                    f"float64 dtype, found {level_values.dtype}."
                )
        elif expected_dtypes[name] == 'object':
            # For object (string) levels, use the specific pandas API type checker.
            if not ptypes.is_object_dtype(level_values):
                errors.append(
                    f"MultiIndex validation failed: Level '{name}' is not an "
                    f"object dtype, found {level_values.dtype}."
                )

        # Sub-check 4b: Validate the set of unique values for categorical levels.
        if name in expected_values:
            # Get the unique values present in the data for this level.
            unique_vals = set(level_values.unique())
            # Check if there are any values in the data that are not in the expected set.
            if not unique_vals.issubset(expected_values[name]):
                # Identify and report the unexpected values.
                unexpected = unique_vals - expected_values[name]
                errors.append(
                    f"MultiIndex validation failed: Level '{name}' contains "
                    f"unexpected values: {unexpected}. Expected a subset of "
                    f"{expected_values[name]}."
                )

        # Sub-check 4c: For the 'strike_price' level, ensure all values are positive.
        if name == 'strike_price' and ptypes.is_float_dtype(level_values):
            # Check if any strike price is less than or equal to zero.
            if not (level_values > 0).all():
                errors.append(
                    "MultiIndex validation failed: Level 'strike_price' "
                    "contains non-positive values."
                )

    # Return the aggregated list of all found errors.
    return errors

# ======================================================================================
# Task 1, Step 2: Column Schema and Dtype Validation
# ======================================================================================

def _validate_columns(
    df: pd.DataFrame
) -> List[str]:
    """
    Validates the DataFrame's columns for presence, dtype, and numeric properties.

    This function ensures that all required columns for the analysis exist,
    have the correct data types, and contain valid numerical values (i.e.,
    positive and finite, where applicable).

    Args:
        df (pd.DataFrame):
            The input DataFrame with columns to be validated.

    Returns:
        List[str]:
            A list of string error messages. An empty list indicates successful
            validation of all column properties.
    """
    # Initialize a list to aggregate potential error messages.
    errors: List[str] = []

    # Define the expected schema for the columns.
    expected_columns: Dict[str, np.dtype] = {
        'instrument_name': np.dtype('O'),
        'best_bid_price': np.dtype('float64'),
        'best_ask_price': np.dtype('float64'),
        'futures_price': np.dtype('float64'),
        'spot_price': np.dtype('float64')
    }

    # Check 1: Verify that all expected columns are present in the DataFrame.
    missing_cols = set(expected_columns.keys()) - set(df.columns)
    if missing_cols:
        # If any columns are missing, report them and return, as further checks are invalid.
        errors.append(f"Column validation failed: Missing required columns: {missing_cols}.")
        return errors

    # Check 2: Iterate through each expected column to validate its properties.
    for col_name, expected_dtype in expected_columns.items():
        # Extract the column Series for inspection.
        column_series = df[col_name]

        # Sub-check 2a: Validate the data type of the column.
        if column_series.dtype != expected_dtype:
            errors.append(
                f"Column validation failed: Column '{col_name}' has incorrect "
                f"dtype. Expected {expected_dtype}, found {column_series.dtype}."
            )
            # If dtype is wrong, numeric checks below might fail; continue to next column.
            continue

        # Sub-check 2b: Check for any null (NaN, None) values.
        if column_series.isnull().any():
            errors.append(
                f"Column validation failed: Column '{col_name}' contains null values."
            )

        # Sub-check 2c: For numeric columns, perform additional checks.
        if ptypes.is_numeric_dtype(column_series):
            # Ensure all values are finite (not inf, -inf, or NaN).
            if not np.isfinite(column_series).all():
                errors.append(
                    f"Column validation failed: Column '{col_name}' contains "
                    "non-finite values (inf, -inf, or NaN)."
                )
            # For price columns, ensure all values are strictly positive.
            if col_name != 'instrument_name': # All other columns are prices
                if not (column_series > 0).all():
                    errors.append(
                        f"Column validation failed: Column '{col_name}' contains "
                        "non-positive values."
                    )

    # Return the aggregated list of all found errors.
    return errors

# ======================================================================================
# Task 1, Step 3: Economic Constraint Validation
# ======================================================================================

def _validate_economic_constraints(
    df: pd.DataFrame
) -> List[str]:
    """
    Validates fundamental economic and logical constraints within the data.

    This function checks for internal consistency, such as valid bid-ask spreads
    and logical temporal ordering between observation timestamps and contract expiries.

    Args:
        df (pd.DataFrame):
            The input DataFrame, assumed to have passed schema validation.

    Returns:
        List[str]:
            A list of string error messages. An empty list indicates all
            constraints are satisfied.
    """
    # Initialize a list to aggregate potential error messages.
    errors: List[str] = []

    # Constraint 1: The ask price must be greater than or equal to the bid price.
    # A violation (ask < bid) indicates a data quality issue or a crossed market.
    inverted_spread_mask = df['best_ask_price'] < df['best_bid_price']
    if inverted_spread_mask.any():
        # If any violations exist, report the count and percentage.
        num_violations = inverted_spread_mask.sum()
        pct_violations = 100 * num_violations / len(df)
        errors.append(
            f"Economic constraint failed: Found {num_violations} rows "
            f"({pct_violations:.2f}%) where best_ask_price < best_bid_price."
        )

    # Extract timestamp and expiry_date levels for temporal checks.
    timestamps = df.index.get_level_values('timestamp')
    expiries = df.index.get_level_values('expiry_date')

    # Constraint 2: The expiry date of an option must be in the future relative
    # to its observation timestamp.
    # A violation (expiry <= timestamp) is a logical impossibility.
    expired_contract_mask = expiries <= timestamps
    if expired_contract_mask.any():
        # If any violations exist, report the count and percentage.
        num_violations = expired_contract_mask.sum()
        pct_violations = 100 * num_violations / len(df)
        errors.append(
            f"Economic constraint failed: Found {num_violations} rows "
            f"({pct_violations:.2f}%) where expiry_date <= timestamp."
        )

    # Constraint 3: Check for timezone consistency. Timestamps and expiries must
    # either both be timezone-aware (and have the same timezone) or both be naive.
    # Mixing aware and naive datetimes leads to incorrect duration calculations.
    ts_tz = timestamps.tz
    ex_tz = expiries.tz
    if ts_tz != ex_tz:
        errors.append(
            "Economic constraint failed: Timezone mismatch between 'timestamp' "
            f"level (tz='{ts_tz}') and 'expiry_date' level (tz='{ex_tz}')."
        )

    # Return the aggregated list of all found errors.
    return errors

# ======================================================================================
# Task 1: Fused Orchestrator Function
# ======================================================================================

def validate_dataframe_structure(
    df_master: pd.DataFrame
) -> None:
    """
    Orchestrates a comprehensive validation of the input DataFrame's structure.

    This master function executes a sequence of validation checks covering the
    MultiIndex, column schema, and fundamental economic constraints. It aggregates
    all identified issues and raises a single, detailed ValueError if any
    validation fails, providing a complete diagnostic report. A successful
    execution (i.e., no exception raised) indicates the DataFrame is fit for
    the subsequent stages of the yield curve estimation pipeline.

    Args:
        df_master (pd.DataFrame):
            The primary input DataFrame for the yield curve estimation pipeline.
            It is expected to conform to a strict, predefined schema.

    Raises:
        TypeError:
            If the input `df_master` is not a pandas DataFrame.
        ValueError:
            If any of the validation checks fail. The error message contains a
            detailed, itemized list of all identified structural, schema, or
            constraint violations.

    Returns:
        None:
            This function does not return a value. It either completes silently,
            indicating success, or raises an exception, indicating failure.
    """
    # --- Input Type Validation ---
    # First, ensure the provided object is a pandas DataFrame.
    if not isinstance(df_master, pd.DataFrame):
        # Raise a TypeError if the input is of the wrong type.
        raise TypeError("Input `df_master` must be a pandas DataFrame.")

    # --- Aggregation of Validation Errors ---
    # Initialize a list to hold all error messages from the various checks.
    all_errors: List[str] = []

    # Execute Step 1: MultiIndex validation.
    # This checks the structure, names, dtypes, and values of the index.
    multiindex_errors = _validate_multiindex(df=df_master)
    all_errors.extend(multiindex_errors)

    # Execute Step 2: Column schema validation.
    # This checks for column presence, dtypes, and numeric properties.
    # This check is only performed if the index validation passed, as some
    # constraints depend on a valid index.
    if not multiindex_errors:
        column_errors = _validate_columns(df=df_master)
        all_errors.extend(column_errors)

        # Execute Step 3: Economic constraint validation.
        # This checks for logical consistency in the data.
        # This check is only performed if both index and column schema are valid.
        if not column_errors:
            constraint_errors = _validate_economic_constraints(df=df_master)
            all_errors.extend(constraint_errors)

    # --- Final Error Reporting ---
    # Check if any errors were collected during the validation process.
    if all_errors:
        # If errors exist, format them into a single, comprehensive error message.
        error_header = "Input DataFrame validation failed with the following errors:"
        # Join all individual error messages with newlines for readability.
        formatted_errors = "\n".join(f"- {error}" for error in all_errors)
        # Raise a single ValueError containing the complete diagnostic report.
        raise ValueError(f"{error_header}\n{formatted_errors}")

    # If the function reaches this point, all validations passed.
    # No action is needed, and the silent return indicates success.
    return None


In [None]:
# Task 2: Hyperparameter Dictionary Validation

# ======================================================================================
# Task 2, Step 1: Metadata Section Validation
# ======================================================================================

def _validate_metadata(
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the 'metadata' section of the configuration dictionary.

    This function ensures that the 'metadata' key exists and its value is a
    dictionary containing the required keys ('asset', 'data_source') with
    appropriate types and values.

    Args:
        config (Dict[str, Any]):
            The top-level configuration dictionary.

    Returns:
        List[str]:
            A list of error messages. An empty list signifies successful validation.
    """
    # Initialize a list to aggregate potential error messages.
    errors: List[str] = []

    # Check 1: Ensure the 'metadata' key exists and is a dictionary.
    metadata = config.get('metadata')
    if not isinstance(metadata, dict):
        # If 'metadata' is missing or not a dict, further checks are impossible.
        errors.append("Configuration validation failed: Missing or invalid 'metadata' section.")
        return errors

    # Check 2: Validate the 'asset' key.
    asset = metadata.get('asset')
    expected_assets: Set[str] = {'BTC', 'ETH'}
    if not isinstance(asset, str) or asset not in expected_assets:
        # The asset must be a string and one of the allowed values.
        errors.append(
            f"Metadata validation failed: 'asset' must be one of "
            f"{expected_assets}, but found '{asset}'."
        )

    # Check 3: Validate the 'data_source' key.
    data_source = metadata.get('data_source')
    if not isinstance(data_source, str) or not data_source:
        # The data_source must be a non-empty string.
        errors.append(
            "Metadata validation failed: 'data_source' must be a non-empty string."
        )

    # Return the aggregated list of all found errors.
    return errors

# ======================================================================================
# Task 2, Step 2 & 3: Hyperparameter Range, Type, and Special Validation
# ======================================================================================

def _validate_hyperparameters(
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the 'hyperparameters' section using a declarative schema.

    This function provides a rigorous, schema-driven validation of the
    'hyperparameters' dictionary. It checks for the presence, type, and value
    constraints (e.g., range, exact value, non-empty list) of each expected
    hyperparameter. It is a critical gatekeeper that ensures the integrity of
    all model parameters before the pipeline execution begins. This revised
    version includes an explicit check for non-empty lists.

    Args:
        config (Dict[str, Any]):
            The top-level configuration dictionary which is expected to contain
            a 'hyperparameters' key.

    Returns:
        List[str]:
            A list of specific, actionable error messages. An empty list
            signifies that all hyperparameter validation checks have passed
            successfully.
    """
    # Initialize a list to aggregate potential error messages.
    errors: List[str] = []

    # Define the validation schema for all expected hyperparameters.
    # This schema is declarative, robust, and easily extensible.
    # New Rule: 'non_empty' has been added for list validation.
    schema: Dict[str, Dict[str, Any]] = {
        'maturity_filter_days': {'type': int, 'range': (1, 365)},
        'liquidity_filter_spread_pct': {'type': float, 'range': (0.001, 1.0)},
        'ransac_residual_sq_threshold': {'type': float, 'range': (0.0001, 0.1)},
        'ransac_slope_tolerance_pct': {'type': float, 'range': (0.01, 0.5)},
        'ransac_min_sample_size': {'type': int, 'exact_value': 2},
        'ransac_num_iterations': {'type': int, 'range': (10, 10000)},
        'estimation_lambda_weight': {'type': float, 'range': (0.1, 10.0)},
        'annualization_factor': {'type': float, 'exact_value': 365.25},
        'target_tenors_days': {
            'type': list,
            'non_empty': True, # New rule to enforce non-empty list.
            'element_type': int,
            'element_positive': True
        }
    }

    # Check 1: Ensure the 'hyperparameters' key exists and is a dictionary.
    hyperparams = config.get('hyperparameters')
    if not isinstance(hyperparams, dict):
        # If the main section is missing or invalid, no further checks are possible.
        errors.append("Configuration validation failed: Missing or invalid 'hyperparameters' section.")
        return errors

    # Check 2: Iterate through the schema to validate each expected parameter.
    for param, rules in schema.items():
        # Sub-check 2a: Check for the presence of the parameter.
        if param not in hyperparams:
            errors.append(f"Hyperparameter validation failed: Missing required parameter '{param}'.")
            # Skip further checks for this missing parameter.
            continue

        # Retrieve the value for the current parameter.
        value = hyperparams[param]
        # Retrieve the expected primary type from the schema.
        expected_type = rules['type']

        # Sub-check 2b: Validate the primary type of the parameter.
        if not isinstance(value, expected_type):
            errors.append(
                f"Hyperparameter validation failed: '{param}' must be of type "
                f"{expected_type.__name__}, but found {type(value).__name__}."
            )
            # If the primary type is wrong, subsequent checks are invalid.
            continue

        # Sub-check 2c: Perform detailed validation based on schema rules.
        if expected_type is list:
            # --- Enhanced List Validation ---
            # First, check if the list is required to be non-empty.
            if rules.get('non_empty', False) and not value:
                errors.append(f"Hyperparameter validation failed: '{param}' must be a non-empty list.")
            # Only proceed to check elements if the list is not empty.
            elif value:
                # Check that all elements in the list have the correct type.
                element_type = rules.get('element_type')
                if not all(isinstance(el, element_type) for el in value):
                    errors.append(f"Hyperparameter validation failed: All elements in '{param}' must be of type {element_type.__name__}.")
                # Check if all elements in the list must be positive.
                if rules.get('element_positive') and not all(el > 0 for el in value):
                    errors.append(f"Hyperparameter validation failed: All elements in '{param}' must be positive.")

        elif 'range' in rules:
            # For numeric types with a range constraint.
            min_val, max_val = rules['range']
            if not (min_val <= value <= max_val):
                errors.append(
                    f"Hyperparameter validation failed: '{param}' must be within the range "
                    f"[{min_val}, {max_val}], but found {value}."
                )

        elif 'exact_value' in rules:
            # For parameters that must have an exact value.
            exact_val = rules['exact_value']
            # Use math.isclose for robust floating-point comparison.
            if isinstance(exact_val, float):
                if not math.isclose(value, exact_val, rel_tol=1e-9, abs_tol=1e-9):
                    errors.append(f"Hyperparameter validation failed: '{param}' must be exactly {exact_val}, but found {value}.")
            # Use direct comparison for other types (e.g., int).
            elif value != exact_val:
                errors.append(f"Hyperparameter validation failed: '{param}' must be exactly {exact_val}, but found {value}.")

    # Check 3: Identify any unrecognized parameters to catch potential typos.
    unrecognized_params = set(hyperparams.keys()) - set(schema.keys())
    if unrecognized_params:
        errors.append(f"Hyperparameter validation failed: Unrecognized parameters found: {unrecognized_params}.")

    # Return the aggregated list of all found errors.
    return errors

# ======================================================================================
# Task 2: Fused Orchestrator Function
# ======================================================================================

def validate_hyperparameter_dictionary(
    fused_input_state: Dict[str, Any]
) -> None:
    """
    Orchestrates a comprehensive validation of the hyperparameter dictionary.

    This master function validates the entire configuration structure, including
    the 'metadata' and 'hyperparameters' sections. It uses a declarative,
    schema-driven approach to ensure all parameters are present, correctly typed,
    and within their valid operational ranges. A successful execution indicates
    the configuration is safe to use for the pipeline.

    Args:
        fused_input_state (Dict[str, Any]):
            The primary configuration dictionary for the study, containing
            metadata and algorithm hyperparameters.

    Raises:
        TypeError:
            If the input `fused_input_state` is not a dictionary.
        ValueError:
            If any validation checks fail. The error message provides a detailed,
            itemized list of all identified configuration issues.

    Returns:
        None:
            This function returns nothing upon successful validation, otherwise
            it raises an exception.
    """
    # --- Input Type Validation ---
    # Ensure the top-level input is a dictionary.
    if not isinstance(fused_input_state, dict):
        raise TypeError("Input `fused_input_state` must be a dictionary.")

    # --- Aggregation of Validation Errors ---
    # Initialize a list to hold all error messages.
    all_errors: List[str] = []

    # Execute Step 1: Validate the 'metadata' section.
    metadata_errors = _validate_metadata(config=fused_input_state)
    all_errors.extend(metadata_errors)

    # Execute Step 2 & 3: Validate the 'hyperparameters' section.
    hyperparam_errors = _validate_hyperparameters(config=fused_input_state)
    all_errors.extend(hyperparam_errors)

    # --- Final Error Reporting ---
    # Check if the aggregated list of errors is non-empty.
    if all_errors:
        # If errors were found, construct a comprehensive error message.
        error_header = "Input hyperparameter dictionary validation failed with the following errors:"
        # Format each error for clarity.
        formatted_errors = "\n".join(f"- {error}" for error in all_errors)
        # Raise a single, informative ValueError.
        raise ValueError(f"{error_header}\n{formatted_errors}")

    # If no errors were found, the function completes silently.
    return None


In [None]:
# Task 3: Data Completeness and Consistency Validation

# ======================================================================================
# Task 3, Step 1: Temporal Coverage Analysis
# ======================================================================================

def _validate_temporal_coverage(
    df: pd.DataFrame
) -> List[str]:
    """
    Validates the temporal density and continuity of the dataset.

    This function checks if the dataset meets minimum requirements for the number
    of observations over time and ensures there are no excessively large gaps
    between consecutive data points.

    Args:
        df (pd.DataFrame):
            The input DataFrame, assumed to have a valid MultiIndex structure.

    Returns:
        List[str]:
            A list of error messages. An empty list indicates sufficient temporal coverage.
    """
    # Initialize a list to aggregate potential error messages.
    errors: List[str] = []

    # Define temporal coverage thresholds.
    MIN_UNIQUE_TIMESTAMPS = 100
    MAX_GAP_DAYS = pd.Timedelta(days=7)

    # Extract the unique timestamps from the index for analysis.
    unique_timestamps = df.index.get_level_values('timestamp').unique()

    # Check 1: Ensure a minimum number of unique timestamps exist.
    if len(unique_timestamps) < MIN_UNIQUE_TIMESTAMPS:
        errors.append(
            f"Temporal coverage failed: Insufficient unique timestamps. "
            f"Found {len(unique_timestamps)}, but require at least {MIN_UNIQUE_TIMESTAMPS}."
        )

    # Check 2: Calculate and validate the maximum gap between consecutive timestamps.
    if len(unique_timestamps) > 1:
        # Sort timestamps to ensure correct diff calculation.
        sorted_timestamps = unique_timestamps.sort_values()
        # Calculate the difference between each consecutive timestamp.
        gaps = pd.Series(sorted_timestamps).diff()
        # Find the largest gap.
        max_gap = gaps.max()
        # Check if the largest gap exceeds the allowed threshold.
        if max_gap > MAX_GAP_DAYS:
            errors.append(
                f"Temporal coverage failed: Maximum gap between consecutive timestamps "
                f"is {max_gap}, which exceeds the threshold of {MAX_GAP_DAYS}."
            )

    # Return the aggregated list of all found errors.
    return errors

# ======================================================================================
# Task 3, Step 2: Cross-Sectional Coverage Validation
# ======================================================================================

def _validate_cross_sectional_coverage(
    df: pd.DataFrame
) -> List[str]:
    """
    Validates the cross-sectional density of option contracts.

    For each (timestamp, expiry_date) group, this function checks for a
    sufficient number of option pairs and an adequate range of moneyness to
    ensure the stability of the estimation procedure.

    Args:
        df (pd.DataFrame):
            The input DataFrame, assumed to have a valid MultiIndex structure.

    Returns:
        List[str]:
            A list of error messages. An empty list indicates sufficient cross-sectional coverage.
    """
    # Initialize a list to aggregate potential error messages.
    errors: List[str] = []

    # Define cross-sectional coverage thresholds.
    MIN_CALL_PUT_PAIRS = 5
    MIN_MONEYNESS = 0.7
    MAX_MONEYNESS = 1.3

    # Group the DataFrame by the estimation unit: (timestamp, asset, expiry_date).
    grouped = df.groupby(['timestamp', 'asset', 'expiry_date'])
    num_groups = len(grouped)

    # --- Check 1: Minimum number of call-put pairs ---
    # To accurately check for pairs, we pivot the data.
    try:
        # Create a pivot table to align calls and puts for each strike.
        paired_data = df.reset_index().pivot_table(
            index=['timestamp', 'asset', 'expiry_date', 'strike_price'],
            columns='option_type',
            values='best_bid_price' # Value column is arbitrary, just for structure
        )
        # A valid pair exists where neither 'call' nor 'put' is NaN.
        valid_pairs = paired_data.dropna()
        # Count the number of valid pairs per estimation group.
        pair_counts = valid_pairs.groupby(['timestamp', 'asset', 'expiry_date']).size()

        # Identify groups that fail to meet the minimum pair requirement.
        insufficient_pair_groups = pair_counts[pair_counts < MIN_CALL_PUT_PAIRS]
        if not insufficient_pair_groups.empty:
            pct_failing = 100 * len(insufficient_pair_groups) / num_groups
            errors.append(
                f"Cross-sectional coverage failed: {len(insufficient_pair_groups)} groups "
                f"({pct_failing:.2f}%) have fewer than {MIN_CALL_PUT_PAIRS} call-put pairs."
            )
    except Exception as e:
        errors.append(f"Cross-sectional coverage failed during pairing analysis: {e}")

    # --- Check 2: Moneyness range ---
    # Compute moneyness on the fly for the entire DataFrame.
    df_with_moneyness = df.copy()
    df_with_moneyness['moneyness'] = (
        df_with_moneyness.index.get_level_values('strike_price') / df_with_moneyness['spot_price']
    )

    # Calculate the min and max moneyness for each group.
    moneyness_ranges = df_with_moneyness.groupby(['timestamp', 'asset', 'expiry_date'])['moneyness'].agg(['min', 'max'])

    # Identify groups with an insufficient moneyness range.
    inadequate_range_mask = (moneyness_ranges['min'] > MIN_MONEYNESS) | (moneyness_ranges['max'] < MAX_MONEYNESS)
    num_inadequate_range = inadequate_range_mask.sum()
    if num_inadequate_range > 0:
        pct_failing = 100 * num_inadequate_range / num_groups
        errors.append(
            f"Cross-sectional coverage failed: {num_inadequate_range} groups "
            f"({pct_failing:.2f}%) do not have strikes spanning the required moneyness "
            f"range of [{MIN_MONEYNESS}, {MAX_MONEYNESS}]."
        )

    # --- Check 3: Futures price availability ---
    # Check if a single, unique, non-null futures price exists for each group.
    futures_check = grouped['futures_price'].agg(lambda x: x.nunique() == 1 and not x.isnull().any())
    num_missing_futures = (~futures_check).sum()
    if num_missing_futures > 0:
        pct_failing = 100 * num_missing_futures / num_groups
        errors.append(
            f"Cross-sectional coverage failed: {num_missing_futures} groups "
            f"({pct_failing:.2f}%) are missing a unique, valid futures price."
        )

    # Return the aggregated list of all found errors.
    return errors

# ======================================================================================
# Task 3, Step 3: Market Data Quality Assessment
# ======================================================================================

def _validate_market_data_quality(
    df: pd.DataFrame
) -> List[str]:
    """
    Performs a high-level assessment of the market data quality.

    This function screens for common data quality issues like zero prices,
    inverted spreads, and extremely wide spreads, which can indicate illiquid
    markets or data feed errors.

    Args:
        df (pd.DataFrame):
            The input DataFrame, assumed to have a valid schema.

    Returns:
        List[str]:
            A list of error messages detailing data quality issues.
    """
    # Initialize a list to aggregate potential error messages.
    errors: List[str] = []
    total_rows = len(df)

    # Check 1: Identify rows with zero bid or ask prices.
    zero_price_mask = (df['best_bid_price'] == 0) | (df['best_ask_price'] == 0)
    num_zero_prices = zero_price_mask.sum()
    if num_zero_prices > 0:
        pct_zero = 100 * num_zero_prices / total_rows
        errors.append(
            f"Market data quality warning: Found {num_zero_prices} rows "
            f"({pct_zero:.2f}%) with a zero bid or ask price."
        )

    # Check 2: Identify rows with inverted bid-ask spreads (re-checked here for logging).
    inverted_spread_mask = df['best_ask_price'] < df['best_bid_price']
    num_inverted = inverted_spread_mask.sum()
    if num_inverted > 0:
        pct_inverted = 100 * num_inverted / total_rows
        errors.append(
            f"Market data quality warning: Found {num_inverted} rows "
            f"({pct_inverted:.2f}%) with inverted bid-ask spreads (ask < bid)."
        )

    # Check 3: Identify rows with extreme relative spreads (> 100%).
    # Calculate mid-price, handling potential division by zero.
    mid_price = (df['best_ask_price'] + df['best_bid_price']) / 2
    # Calculate relative spread safely.
    with np.errstate(divide='ignore', invalid='ignore'):
        relative_spread = (df['best_ask_price'] - df['best_bid_price']) / mid_price

    # An extreme spread is defined as > 100%.
    extreme_spread_mask = relative_spread > 1.0
    num_extreme = extreme_spread_mask.sum()
    if num_extreme > 0:
        pct_extreme = 100 * num_extreme / total_rows
        errors.append(
            f"Market data quality warning: Found {num_extreme} rows "
            f"({pct_extreme:.2f}%) with an extreme relative spread (>100%)."
        )

    # Return the aggregated list of all found errors.
    return errors

# ======================================================================================
# Task 3: Fused Orchestrator Function
# ======================================================================================

def validate_data_completeness(
    df_master: pd.DataFrame
) -> None:
    """
    Orchestrates a validation of the dataset's completeness and consistency.

    This master function executes a sequence of checks to ensure the data is
    sufficiently dense and of high enough quality for a robust analysis. It
    validates temporal coverage, cross-sectional (instrument) density, and
    market data quality. A failure indicates the dataset is not suitable for
    the yield curve estimation pipeline in its current state.

    Args:
        df_master (pd.DataFrame):
            The primary input DataFrame, which is assumed to have already passed
            the structural validation from `validate_dataframe_structure`.

    Raises:
        TypeError:
            If the input `df_master` is not a pandas DataFrame.
        ValueError:
            If any of the completeness or consistency checks fail. The error
            message contains a detailed, itemized list of all identified issues.

    Returns:
        None:
            This function does not return a value. It either completes silently,
            indicating success, or raises an exception, indicating failure.
    """
    # --- Input Type Validation ---
    if not isinstance(df_master, pd.DataFrame):
        raise TypeError("Input `df_master` must be a pandas DataFrame.")

    # --- Aggregation of Validation Errors ---
    all_errors: List[str] = []

    # Execute Step 1: Temporal coverage validation.
    temporal_errors = _validate_temporal_coverage(df=df_master)
    all_errors.extend(temporal_errors)

    # Execute Step 2: Cross-sectional coverage validation.
    cross_sectional_errors = _validate_cross_sectional_coverage(df=df_master)
    all_errors.extend(cross_sectional_errors)

    # Execute Step 3: Market data quality assessment.
    # Note: These are often treated as warnings but are raised as errors here
    # for maximum rigor, forcing the user to acknowledge them.
    quality_errors = _validate_market_data_quality(df=df_master)
    all_errors.extend(quality_errors)

    # --- Final Error Reporting ---
    if all_errors:
        error_header = "Input DataFrame failed data completeness and consistency validation:"
        formatted_errors = "\n".join(f"- {error}" for error in all_errors)
        raise ValueError(f"{error_header}\n{formatted_errors}")

    # If no errors were found, the function completes silently.
    return None


In [None]:
# Task 4: Maturity-Based Filtering.

# ======================================================================================
# Task 4: Fused Orchestrator Function
# ======================================================================================

def filter_by_maturity(
    df_master: pd.DataFrame,
    maturity_filter_days: int
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Filters option contracts based on a minimum time-to-expiry threshold.

    This function implements the first data cleansing step as per Section 4.2 of
    the reference paper. It calculates the time-to-expiry for each option contract
    and excludes those with a maturity less than the specified threshold. The
    function is designed to be a pure transformation, returning a new, filtered
    DataFrame and a detailed diagnostic report of the operation.

    Args:
        df_master (pd.DataFrame):
            The input DataFrame containing option market data. It is assumed to
            have passed prior validation steps, ensuring a valid MultiIndex with
            'timestamp' and 'expiry_date' levels.
        maturity_filter_days (int):
            The minimum number of days to expiry for an option to be included.
            Contracts with maturities strictly less than this value will be
            excluded.

    Returns:
        Tuple[pd.DataFrame, Dict[str, Any]]:
            A tuple containing:
            - pd.DataFrame: A new DataFrame containing only the option contracts
              that meet the minimum maturity requirement.
            - Dict[str, Any]: A diagnostic dictionary detailing the filtering
              process, including initial and final row counts, and the number
              and percentage of rows that were excluded.

    Raises:
        TypeError:
            If `df_master` is not a pandas DataFrame or if `maturity_filter_days`
            is not an integer.
        ValueError:
            If `maturity_filter_days` is not a positive integer.
    """
    # --- Input Validation ---
    # Verify the type of the main DataFrame input.
    if not isinstance(df_master, pd.DataFrame):
        raise TypeError("Input `df_master` must be a pandas DataFrame.")
    # Verify the type of the threshold parameter.
    if not isinstance(maturity_filter_days, int):
        raise TypeError("Input `maturity_filter_days` must be an integer.")
    # Verify the value of the threshold parameter.
    if maturity_filter_days <= 0:
        raise ValueError("Input `maturity_filter_days` must be a positive integer.")

    # --- Initialization ---
    # Record the initial number of rows for the diagnostic report.
    initial_row_count = len(df_master)

    # Create a copy to ensure the function is pure and avoids side effects.
    df_filtered = df_master.copy()

    # --- Step 1: Time-to-Expiry Calculation ---
    # Extract the timestamp and expiry date levels from the MultiIndex.
    timestamps = df_filtered.index.get_level_values('timestamp')
    expiries = df_filtered.index.get_level_values('expiry_date')

    # Calculate the time difference as a TimedeltaIndex. This is a robust,
    # timezone-aware calculation provided the inputs are consistent.
    time_to_expiry_delta = expiries - timestamps

    # Convert the TimedeltaIndex to a floating-point number of days.
    # This is the most precise way to represent the duration for comparison.
    # Equation Ref: Corresponds to calculating (T_i - t) for the filtering rule.
    time_to_expiry_days = time_to_expiry_delta / pd.Timedelta(days=1)

    # --- Step 2: Maturity Threshold Application ---
    # Create a boolean mask to identify rows that meet the inclusion criteria.
    # The condition is inclusive of the boundary (>=).
    # Filtering Rule: Inclusion Condition: τ_{i,t} >= maturity_filter_days
    inclusion_mask = time_to_expiry_days >= maturity_filter_days

    # Apply the mask to the DataFrame to select the desired rows.
    # Using .loc with a boolean mask is the canonical and most efficient method.
    df_filtered = df_filtered.loc[inclusion_mask]

    # --- Step 3: Filtering Validation and Documentation ---
    # Record the final number of rows after filtering.
    final_row_count = len(df_filtered)

    # Calculate the number of rows that were excluded by the filter.
    rows_excluded = initial_row_count - final_row_count

    # Calculate the percentage of rows excluded, handling the case of zero initial rows.
    percentage_excluded = (
        (rows_excluded / initial_row_count * 100)
        if initial_row_count > 0
        else 0.0
    )

    # Assemble the diagnostic report dictionary.
    diagnostics = {
        "filter_type": "Maturity-Based Filtering",
        "maturity_threshold_days": maturity_filter_days,
        "initial_row_count": initial_row_count,
        "final_row_count": final_row_count,
        "rows_excluded": rows_excluded,
        "percentage_excluded": round(percentage_excluded, 4)
    }

    # Return the filtered DataFrame and the detailed diagnostic report.
    return df_filtered, diagnostics



In [None]:
# Task 5: Liquidity-Based Filtering

# ======================================================================================
# Task 5: Fused Orchestrator Function
# ======================================================================================

def filter_by_liquidity(
    df_master: pd.DataFrame,
    liquidity_filter_spread_pct: float
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Filters option contracts based on a maximum relative bid-ask spread.

    This function implements the second data cleansing step as per Section 4.2 of
    the reference paper. It calculates the relative bid-ask spread for each
    option and excludes those with a spread exceeding the specified percentage
    threshold. This is a crucial step to remove illiquid or stale quotes.

    The relative spread is calculated as:
    Spread_i = (Ask_i - Bid_i) / Mid_i
    where Mid_i = (Ask_i + Bid_i) / 2

    Args:
        df_master (pd.DataFrame):
            The input DataFrame, potentially already filtered by maturity. It is
            assumed to have passed prior validation, ensuring the presence and
            validity of 'best_bid_price' and 'best_ask_price' columns.
        liquidity_filter_spread_pct (float):
            The maximum allowable relative bid-ask spread, expressed as a decimal
            (e.g., 0.15 for 15%). Contracts with a spread greater than this
            value will be excluded.

    Returns:
        Tuple[pd.DataFrame, Dict[str, Any]]:
            A tuple containing:
            - pd.DataFrame: A new DataFrame containing only the option contracts
              that meet the liquidity requirement.
            - Dict[str, Any]: A diagnostic dictionary detailing the filtering
              process, including counts and statistics on excluded rows.

    Raises:
        TypeError:
            If `df_master` is not a pandas DataFrame or if
            `liquidity_filter_spread_pct` is not a float.
        ValueError:
            If `liquidity_filter_spread_pct` is not within a sensible range
            (e.g., between 0.0 and 2.0).
    """
    # --- Input Validation ---
    # Verify the type of the main DataFrame input.
    if not isinstance(df_master, pd.DataFrame):
        raise TypeError("Input `df_master` must be a pandas DataFrame.")
    # Verify the type of the threshold parameter.
    if not isinstance(liquidity_filter_spread_pct, float):
        raise TypeError("Input `liquidity_filter_spread_pct` must be a float.")
    # Verify the value of the threshold parameter is within a reasonable range.
    if not (0.0 < liquidity_filter_spread_pct <= 2.0):
        raise ValueError(
            "Input `liquidity_filter_spread_pct` must be a positive float, "
            "typically between 0.0 and 2.0 (200%)."
        )

    # --- Initialization ---
    # Record the initial number of rows for the diagnostic report.
    initial_row_count = len(df_master)

    # Create a copy to ensure the function is pure and avoids side effects.
    df_with_spread = df_master.copy()

    # --- Step 1: Bid-Ask Spread Calculation ---
    # Extract bid and ask price Series for calculation.
    bid_prices = df_with_spread['best_bid_price']
    ask_prices = df_with_spread['best_ask_price']

    # Calculate the absolute spread. Assumes ask >= bid from prior validation.
    # Equation Ref: Numerator of Spread_i = Ask_i - Bid_i
    absolute_spread = ask_prices - bid_prices

    # Calculate the mid-price.
    # Equation Ref: Denominator's component Mid_i = (Ask_i + Bid_i) / 2
    mid_price = (ask_prices + bid_prices) / 2

    # Calculate the relative spread with robust handling of division by zero.
    # If mid_price is 0, the spread is effectively infinite and should be filtered.
    # We assign np.inf in such cases, ensuring they are caught by the filter.
    # Equation Ref: Spread_i = (Ask_i - Bid_i) / Mid_i
    relative_spread = np.divide(
        absolute_spread,
        mid_price,
        out=np.full_like(mid_price, np.inf, dtype=np.float64),
        where=mid_price != 0
    )

    # Add the calculated spread to the DataFrame for filtering and diagnostics.
    df_with_spread['relative_spread'] = relative_spread

    # --- Step 2: Liquidity Threshold Application ---
    # Create a boolean mask to identify rows that meet the liquidity criteria.
    # The condition is inclusive of the boundary (<=).
    # Filtering Rule: Inclusion Condition: Spread_i <= liquidity_filter_spread_pct
    inclusion_mask = df_with_spread['relative_spread'] <= liquidity_filter_spread_pct

    # Apply the mask to the DataFrame.
    df_filtered = df_with_spread.loc[inclusion_mask].drop(columns=['relative_spread'])

    # Identify the rows that were excluded for diagnostic purposes.
    excluded_rows = df_with_spread.loc[~inclusion_mask]

    # --- Step 3: Filtering Validation and Documentation ---
    # Record the final number of rows after filtering.
    final_row_count = len(df_filtered)

    # Calculate the number of rows that were excluded.
    rows_excluded = initial_row_count - final_row_count

    # Calculate the percentage of rows excluded.
    percentage_excluded = (
        (rows_excluded / initial_row_count * 100)
        if initial_row_count > 0
        else 0.0
    )

    # Assemble the diagnostic report.
    diagnostics = {
        "filter_type": "Liquidity-Based Filtering",
        "liquidity_threshold_pct": liquidity_filter_spread_pct,
        "input_row_count": initial_row_count,
        "final_row_count": final_row_count,
        "rows_excluded": rows_excluded,
        "percentage_excluded": round(percentage_excluded, 4),
        "excluded_spread_stats": {
            "min": excluded_rows['relative_spread'].min(),
            "mean": excluded_rows['relative_spread'].mean(),
            "median": excluded_rows['relative_spread'].median(),
            "max": excluded_rows['relative_spread'].max(),
            "std": excluded_rows['relative_spread'].std()
        } if rows_excluded > 0 else {}
    }

    # Return the filtered DataFrame and the detailed diagnostic report.
    return df_filtered, diagnostics


In [None]:
# Task 6: Mid-Price and Moneyness Computation

# ======================================================================================
# Task 6: Fused Orchestrator Function
# ======================================================================================

def prepare_features_and_pairs(
    df_master: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Enriches the DataFrame with derived features and filters for complete call-put pairs.

    This function performs the final data preparation steps before the core
    RANSAC algorithm. It executes three critical operations:
    1.  Computes the mid-price for each option contract.
    2.  Computes the moneyness (K/S) for each option contract.
    3.  Filters the dataset to retain only options that form a complete and
        unique call-put pair for a given strike, expiry, and timestamp.

    This ensures the output DataFrame is perfectly structured and contains all
    necessary features for the subsequent analysis phases.

    Args:
        df_master (pd.DataFrame):
            The input DataFrame, assumed to have been cleansed by prior filtering
            steps (maturity, liquidity). It must have a valid schema.

    Returns:
        Tuple[pd.DataFrame, Dict[str, Any]]:
            A tuple containing:
            - pd.DataFrame: A new, analysis-ready DataFrame with 'mid_price' and
              'moneyness' columns, containing only complete call-put pairs.
            - Dict[str, Any]: A diagnostic dictionary detailing the pairing
              process, including the number of rows and unique strikes removed.

    Raises:
        TypeError:
            If `df_master` is not a pandas DataFrame.
        AssertionError:
            If critical columns like 'spot_price' contain non-positive values,
            indicating an issue that should have been caught by prior validation.
    """
    # --- Input Validation ---
    # Verify the type of the main DataFrame input.
    if not isinstance(df_master, pd.DataFrame):
        raise TypeError("Input `df_master` must be a pandas DataFrame.")

    # --- Initialization ---
    # Record the initial number of rows for the diagnostic report.
    initial_row_count = len(df_master)

    # Create a copy to ensure the function is pure and avoids side effects.
    df_processed = df_master.copy()

    # --- Step 1: Mid-Price Computation and Validation ---
    # This step computes the mid-price, which will be used in the final estimation.
    # Equation Ref: Mid_{i,t} = (Bid_{i,t} + Ask_{i,t}) / 2
    df_processed['mid_price'] = (
        df_processed['best_bid_price'] + df_processed['best_ask_price']
    ) / 2

    # --- Step 2: Moneyness Calculation Implementation ---
    # This step computes moneyness, a key regressor in the estimation model.
    # Defensively assert that spot prices are positive, as this is a critical assumption.
    assert (df_processed['spot_price'] > 0).all(), "Spot prices must be positive for moneyness calculation."

    # Equation Ref: m_{i,t} = K_i / S_t
    df_processed['moneyness'] = (
        df_processed.index.get_level_values('strike_price') / df_processed['spot_price']
    )

    # --- Step 3: Call-Put Pairing Validation and Filtering ---
    # This is the most critical step to ensure data is correctly structured for RANSAC.
    # Define the index levels that uniquely identify an option series for pairing.
    pairing_levels = ['timestamp', 'asset', 'expiry_date', 'strike_price']

    # First, perform a highly efficient filter to keep only groups that have
    # exactly two entries (a potential call-put pair).
    # The use of .transform('size') is significantly faster than .filter().
    group_sizes = df_processed.groupby(pairing_levels).transform('size')
    df_potential_pairs = df_processed[group_sizes == 2].copy()

    # Now, on this much smaller DataFrame, perform the rigorous check to ensure
    # that the two entries are indeed one 'call' and one 'put'.
    def check_valid_pair(group: pd.DataFrame) -> bool:
        # A valid pair must contain exactly one 'call' and one 'put'.
        return set(group.index.get_level_values('option_type')) == {'call', 'put'}

    # Use .groupby().filter() to apply the rigorous check.
    df_final_pairs = df_potential_pairs.groupby(pairing_levels).filter(check_valid_pair)

    # --- Diagnostic Reporting ---
    # Record the final number of rows after pairing.
    final_row_count = len(df_final_pairs)

    # Calculate the number of rows that were excluded.
    rows_excluded = initial_row_count - final_row_count

    # Calculate the percentage of rows excluded.
    percentage_excluded = (
        (rows_excluded / initial_row_count * 100)
        if initial_row_count > 0
        else 0.0
    )

    # Calculate the number of unique strikes before and after.
    initial_unique_strikes = df_master.index.droplevel('option_type').nunique()
    final_unique_strikes = df_final_pairs.index.droplevel('option_type').nunique()
    strikes_excluded = initial_unique_strikes - final_unique_strikes

    # Assemble the diagnostic report.
    diagnostics = {
        "process_name": "Feature Engineering and Call-Put Pairing",
        "initial_row_count": initial_row_count,
        "final_row_count": final_row_count,
        "rows_excluded_unpaired": rows_excluded,
        "percentage_rows_excluded": round(percentage_excluded, 4),
        "initial_unique_strikes": initial_unique_strikes,
        "final_unique_strikes": final_unique_strikes,
        "unique_strikes_excluded": strikes_excluded
    }

    # Return the final, analysis-ready DataFrame and the diagnostic report.
    return df_final_pairs, diagnostics


In [None]:
# Task 7: RANSAC Data Preparation

# ======================================================================================
# Task 7: Fused Orchestrator Function
# ======================================================================================

def prepare_ransac_inputs(
    df_paired: pd.DataFrame,
    min_samples_per_group: int = 5
) -> Tuple[Dict[Tuple, Dict[str, np.ndarray]], Dict[str, Any]]:
    """
    Prepares and validates the input data for the RANSAC algorithm.

    This function transforms the analysis-ready DataFrame of paired options into
    the specific numerical inputs required for the RANSAC procedure. It performs
    three main steps:
    1.  Pivots the data to align call and put prices for each strike.
    2.  Constructs the X and Y vectors for the RANSAC regression based on the
        parity relationship defined in the reference paper (Equation 4).
    3.  Filters out estimation groups (timestamp-expiry pairs) that do not have
        a sufficient number of samples (strikes) and packages the valid groups
        into a dictionary of NumPy arrays.

    Args:
        df_paired (pd.DataFrame):
            The input DataFrame, assumed to contain clean, paired call-put data
            with 'best_bid_price' and 'best_ask_price' columns.
        min_samples_per_group (int, optional):
            The minimum number of valid call-put pairs (strikes) required for
            an estimation group to be considered for RANSAC. Defaults to 5.

    Returns:
        Tuple[Dict[Tuple, Dict[str, np.ndarray]], Dict[str, Any]]:
            A tuple containing:
            - Dict[Tuple, Dict[str, np.ndarray]]: The primary output. A dictionary
              where each key is a tuple `(timestamp, asset, expiry_date)`
              representing an estimation group, and each value is another
              dictionary containing the NumPy arrays 'X', 'Y', and 'strikes'
              for that group.
            - Dict[str, Any]: A diagnostic dictionary detailing the preparation
              and filtering process.

    Raises:
        TypeError:
            If `df_paired` is not a pandas DataFrame or `min_samples_per_group`
            is not an integer.
        ValueError:
            If `min_samples_per_group` is not a positive integer.
    """
    # --- Input Validation ---
    # Verify the type of the main DataFrame input.
    if not isinstance(df_paired, pd.DataFrame):
        raise TypeError("Input `df_paired` must be a pandas DataFrame.")
    # Verify the type and value of the min_samples parameter.
    if not isinstance(min_samples_per_group, int):
        raise TypeError("Input `min_samples_per_group` must be an integer.")
    if min_samples_per_group <= 0:
        raise ValueError("Input `min_samples_per_group` must be a positive integer.")

    # --- Initialization ---
    # Record the initial number of unique estimation groups.
    grouping_levels = ['timestamp', 'asset', 'expiry_date']
    initial_group_count = df_paired.groupby(grouping_levels).ngroups

    # --- Step 1: Call-Put Pair Extraction via Pivoting ---
    # Reset the index to make 'strike_price' and 'option_type' regular columns.
    df_long = df_paired.reset_index()

    # Pivot the table to place call and put prices side-by-side for each strike.
    # This is the most efficient structure for the subsequent vector calculations.
    df_pivoted = df_long.pivot_table(
        index=['timestamp', 'asset', 'expiry_date', 'strike_price'],
        columns='option_type',
        values=['best_bid_price', 'best_ask_price']
    )

    # --- Step 2: RANSAC Input Array Construction ---
    # Construct the X and Y vectors for the RANSAC regression.
    # These equations are derived from the parity relationship in Equation (4).
    # Equation Ref: X_i = C_{i,t}^{ask} - P_{i,t}^{bid}
    X = (
        df_pivoted[('best_ask_price', 'call')] -
        df_pivoted[('best_bid_price', 'put')]
    )
    # Equation Ref: Y_i = P_{i,t}^{ask} - C_{i,t}^{bid}
    Y = (
        df_pivoted[('best_ask_price', 'put')] -
        df_pivoted[('best_bid_price', 'call')]
    )

    # --- Step 3: Minimum Sample Size Validation and Final Structuring ---
    # Count the number of samples (strikes) in each estimation group.
    group_sizes = df_pivoted.groupby(grouping_levels).transform('size')

    # Filter the pivoted DataFrame to keep only groups with enough samples.
    df_valid_groups = df_pivoted[group_sizes >= min_samples_per_group]

    # Initialize the dictionary to hold the final NumPy array inputs.
    ransac_inputs: Dict[Tuple, Dict[str, np.ndarray]] = {}

    # Iterate over the valid groups to package the data.
    for group_name, group_df in df_valid_groups.groupby(grouping_levels):
        # Extract the strike prices from the group's index.
        strikes = group_df.index.get_level_values('strike_price').to_numpy()

        # Extract the corresponding X and Y values for the group.
        # We use the group's index to select from the original X and Y Series.
        x_values = X.loc[group_df.index].to_numpy()
        y_values = Y.loc[group_df.index].to_numpy()

        # Store the prepared NumPy arrays in the output dictionary.
        ransac_inputs[group_name] = {
            'X': x_values,
            'Y': y_values,
            'strikes': strikes
        }

    # --- Diagnostic Reporting ---
    # Record the final number of groups that are valid for RANSAC.
    final_group_count = len(ransac_inputs)

    # Calculate the number and percentage of groups excluded.
    groups_excluded = initial_group_count - final_group_count
    percentage_excluded = (
        (groups_excluded / initial_group_count * 100)
        if initial_group_count > 0
        else 0.0
    )

    # Assemble the diagnostic report.
    diagnostics = {
        "process_name": "RANSAC Input Preparation",
        "min_samples_per_group": min_samples_per_group,
        "initial_group_count": initial_group_count,
        "final_group_count": final_group_count,
        "groups_excluded_insufficient_samples": groups_excluded,
        "percentage_groups_excluded": round(percentage_excluded, 4)
    }

    # Return the prepared inputs and the diagnostic report.
    return ransac_inputs, diagnostics


In [None]:
# Task 8: RANSAC Algorithm Execution

# ======================================================================================
# Custom Data Types
# ======================================================================================

# A structured type hint for the output of a single RANSAC execution.
RansacResult = Dict[str, Any]

# ======================================================================================
# Task 8, Steps 1-3: Core RANSAC Helper Function
# ======================================================================================

def _execute_ransac_single_group(
    X: np.ndarray,
    Y: np.ndarray,
    residual_sq_threshold: float,
    max_iterations: int,
    min_samples: int,
    seed: Optional[int] = None
) -> Optional[RansacResult]:
    """
    Executes the RANSAC algorithm for a single group of data points.

    This function implements the core RANSAC logic from first principles to
    find the largest subset of data (inliers) that fits a linear model
    Y = intercept + slope * X.

    Args:
        X (np.ndarray):
            The independent variable data (1D array).
        Y (np.ndarray):
            The dependent variable data (1D array).
        residual_sq_threshold (float):
            The squared residual threshold. A point is considered an inlier if its
            squared distance to the model is less than this value.
        max_iterations (int):
            The maximum number of iterations to perform.
        min_samples (int):
            The minimum number of data points required to fit the model. For a
            line, this is 2.
        seed (Optional[int]):
            An optional seed for the random number generator to ensure
            reproducibility.

    Returns:
        Optional[RansacResult]:
            A dictionary containing the results ('slope', 'intercept',
            'inlier_mask', 'inlier_count') if a valid model with a consensus
            set is found. Returns None if no consensus set larger than the
            initial sample size can be established.
    """
    # --- Initialization ---
    # Initialize a random number generator for reproducible sampling.
    rng = np.random.default_rng(seed)

    # Get the total number of data points.
    n_total_samples = X.shape[0]

    # Initialize variables to track the best model found so far.
    best_inlier_count = 0
    best_inlier_mask = np.zeros(n_total_samples, dtype=bool)

    # --- Core RANSAC Iteration Loop ---
    for _ in range(max_iterations):
        # Step 1: Randomly sample a minimal subset of the data.
        sample_indices = rng.choice(
            n_total_samples, size=min_samples, replace=False
        )
        X_sample, Y_sample = X[sample_indices], Y[sample_indices]

        # Step 2: Fit a linear model to the minimal sample.
        # Using np.linalg.lstsq is numerically more stable than direct inversion.
        # We fit Y = A * p, where A = [X, 1] and p = [slope, intercept].
        A_sample = np.vstack([X_sample, np.ones(min_samples)]).T

        # Defensively check for degenerate samples (e.g., two points with the same X).
        if np.linalg.matrix_rank(A_sample) < min_samples:
            continue # Skip this iteration if the sample is degenerate.

        try:
            params, _, _, _ = np.linalg.lstsq(A_sample, Y_sample, rcond=None)
            slope, intercept = params
        except np.linalg.LinAlgError:
            continue # Skip if the linear algebra solver fails.

        # Step 3: Calculate squared residuals for all data points.
        # Equation Ref: residual_i^2 = (Y_i - (intercept + slope * X_i))^2
        Y_pred = intercept + slope * X
        squared_residuals = (Y - Y_pred)**2

        # Step 4: Identify inliers based on the residual threshold.
        current_inlier_mask = squared_residuals < residual_sq_threshold
        current_inlier_count = np.sum(current_inlier_mask)

        # Step 5: Update the best model if a better consensus set is found.
        if current_inlier_count > best_inlier_count:
            best_inlier_count = current_inlier_count
            best_inlier_mask = current_inlier_mask

    # --- Final Model Fitting ---
    # If a consensus set was found (more inliers than the initial sample size),
    # refit the model using all identified inliers for a more robust estimate.
    if best_inlier_count > min_samples:
        X_inliers = X[best_inlier_mask]
        Y_inliers = Y[best_inlier_mask]

        A_inliers = np.vstack([X_inliers, np.ones(best_inlier_count)]).T

        try:
            final_params, _, _, _ = np.linalg.lstsq(A_inliers, Y_inliers, rcond=None)
            final_slope, final_intercept = final_params
        except np.linalg.LinAlgError:
            return None # Final fit failed, return None.

        # Package and return the final results.
        return {
            "slope": final_slope,
            "intercept": final_intercept,
            "inlier_mask": best_inlier_mask,
            "inlier_count": best_inlier_count
        }

    # If no sufficient consensus set was ever found, return None.
    return None

# ======================================================================================
# Task 8: Fused Orchestrator Function
# ======================================================================================

def execute_ransac_on_all_groups(
    ransac_inputs: Dict[Tuple, Dict[str, np.ndarray]],
    hyperparameters: Dict[str, Any],
    seed: Optional[int] = None
) -> Tuple[Dict[Tuple, RansacResult], Dict[str, Any]]:
    """
    Orchestrates the execution of the RANSAC algorithm on all prepared groups.

    This function iterates through the dictionary of prepared RANSAC inputs,
    executes the core RANSAC algorithm for each group, and compiles the results.
    It provides a diagnostic report on the success rate of the procedure.

    Args:
        ransac_inputs (Dict[Tuple, Dict[str, np.ndarray]]):
            The dictionary of prepared inputs from `prepare_ransac_inputs`.
            Keys are group identifiers, values are dicts of NumPy arrays.
        hyperparameters (Dict[str, Any]):
            A dictionary containing the necessary hyperparameters for RANSAC:
            'ransac_residual_sq_threshold', 'ransac_num_iterations',
            'ransac_min_sample_size'.
        seed (Optional[int]):
            An optional seed for the random number generator to ensure
            reproducibility across all groups.

    Returns:
        Tuple[Dict[Tuple, RansacResult], Dict[str, Any]]:
            A tuple containing:
            - Dict[Tuple, RansacResult]: A dictionary where keys are the group
              identifiers and values are the structured RansacResult dictionaries
              for successfully processed groups.
            - Dict[str, Any]: A diagnostic dictionary summarizing the execution.
    """
    # --- Initialization ---
    # Initialize the dictionary to store results.
    ransac_results: Dict[Tuple, RansacResult] = {}

    # Initialize counters for the diagnostic report.
    groups_processed = 0
    groups_succeeded = 0

    # Extract hyperparameters for clarity.
    residual_sq_threshold = hyperparameters['ransac_residual_sq_threshold']
    max_iterations = hyperparameters['ransac_num_iterations']
    min_samples = hyperparameters['ransac_min_sample_size']

    # --- Group Iteration ---
    # Iterate through each estimation group and its prepared data.
    for group_name, data_arrays in ransac_inputs.items():
        groups_processed += 1

        # Execute the core RANSAC algorithm for the current group.
        result = _execute_ransac_single_group(
            X=data_arrays['X'],
            Y=data_arrays['Y'],
            residual_sq_threshold=residual_sq_threshold,
            max_iterations=max_iterations,
            min_samples=min_samples,
            seed=seed
        )

        # If the algorithm returned a valid result, store it.
        if result is not None:
            ransac_results[group_name] = result
            groups_succeeded += 1

    # --- Diagnostic Reporting ---
    # Calculate the success rate.
    success_rate = (
        (groups_succeeded / groups_processed * 100)
        if groups_processed > 0
        else 0.0
    )

    # Assemble the diagnostic report.
    diagnostics = {
        "process_name": "RANSAC Algorithm Execution",
        "groups_processed": groups_processed,
        "groups_succeeded": groups_succeeded,
        "groups_failed": groups_processed - groups_succeeded,
        "success_rate_pct": round(success_rate, 4)
    }

    # Return the compiled results and the diagnostic report.
    return ransac_results, diagnostics


In [None]:
# Task 9: RANSAC Validation and Filtering

# ======================================================================================
# Custom Data Types (re-defined for clarity in this standalone context)
# ======================================================================================
RansacResult = Dict[str, Any]

# ======================================================================================
# Task 9, Steps 1-2: Core Validation Helper Function
# ======================================================================================

def _validate_ransac_result(
    result: RansacResult,
    slope_tolerance: float,
    min_inliers: int
) -> Tuple[bool, str]:
    """
    Validates a single RANSAC result against specified criteria.

    This function applies a series of checks based on economic theory and
    statistical robustness to a single RansacResult object.

    Args:
        result (RansacResult):
            The dictionary containing the output of a single RANSAC execution.
        slope_tolerance (float):
            The maximum allowable percentage deviation of the slope from -1.0.
        min_inliers (int):
            The minimum number of inliers required for the result to be valid.

    Returns:
        Tuple[bool, str]:
            A tuple containing:
            - bool: True if the result is valid, False otherwise.
            - str: A string indicating the status ('VALID') or the specific
              reason for rejection.
    """
    # --- Validation Check 1: Slope Condition ---
    # The slope (ξ) should be theoretically close to -1.
    # Equation Ref: |ξ̂ + 1| <= ransac_slope_tolerance_pct
    slope = result['slope']
    if not (abs(slope + 1.0) <= slope_tolerance):
        return False, "Invalid Slope"

    # --- Validation Check 2: Intercept Positivity ---
    # The intercept (ζ) represents market friction and should be positive.
    intercept = result['intercept']
    if not (intercept > 0):
        return False, "Negative Intercept"

    # --- Validation Check 3: Minimum Inlier Count ---
    # The model must be supported by a minimum number of data points.
    inlier_count = result['inlier_count']
    if not (inlier_count >= min_inliers):
        return False, "Insufficient Inliers"

    # If all checks pass, the result is considered valid.
    return True, "VALID"

# ======================================================================================
# Task 9, Step 3: Fused Orchestrator Function
# ======================================================================================

def validate_and_filter_ransac_results(
    ransac_results: Dict[Tuple, RansacResult],
    hyperparameters: Dict[str, Any],
    min_inliers_for_validation: int = 5
) -> Tuple[Dict[Tuple, RansacResult], Dict[str, Any]]:
    """
    Filters RANSAC results based on economic and statistical criteria.

    This function iterates through a dictionary of RANSAC results, applying a
    series of validation checks to each. It filters out any results that fail
    these checks and returns a new dictionary containing only the valid results,
    along with a detailed diagnostic report summarizing the filtering process
    and the reasons for rejection.

    Args:
        ransac_results (Dict[Tuple, RansacResult]):
            A dictionary of RANSAC results, keyed by group identifier tuples.
        hyperparameters (Dict[str, Any]):
            A dictionary containing the necessary hyperparameters, specifically
            'ransac_slope_tolerance_pct'.
        min_inliers_for_validation (int, optional):
            The minimum number of inliers a RANSAC result must have to be
            considered valid. Defaults to 5.

    Returns:
        Tuple[Dict[Tuple, RansacResult], Dict[str, Any]]:
            A tuple containing:
            - Dict[Tuple, RansacResult]: A new dictionary containing only the
              RANSAC results that passed all validation checks.
            - Dict[str, Any]: A diagnostic dictionary detailing the number of
              results processed, passed, and rejected, including a breakdown
              of rejection reasons.
    """
    # --- Input Validation ---
    # Verify the type of the main results input.
    if not isinstance(ransac_results, dict):
        raise TypeError("Input `ransac_results` must be a dictionary.")
    # Verify the type of the hyperparameters input.
    if not isinstance(hyperparameters, dict):
        raise TypeError("Input `hyperparameters` must be a dictionary.")
    # Check for the required hyperparameter key.
    if 'ransac_slope_tolerance_pct' not in hyperparameters:
        raise ValueError("Missing 'ransac_slope_tolerance_pct' in hyperparameters.")

    # --- Initialization ---
    # Extract the slope tolerance for use in the validation helper.
    slope_tolerance = hyperparameters['ransac_slope_tolerance_pct']

    # Initialize a dictionary to store the filtered, valid results.
    valid_results: Dict[Tuple, RansacResult] = {}

    # Initialize a Counter to tally the reasons for rejection.
    rejection_reasons = Counter()

    # --- Iteration and Validation ---
    # Iterate through each group's RANSAC result.
    for group_name, result in ransac_results.items():
        # Call the helper function to perform the validation checks.
        is_valid, reason = _validate_ransac_result(
            result=result,
            slope_tolerance=slope_tolerance,
            min_inliers=min_inliers_for_validation
        )

        # If the result is valid, add it to the output dictionary.
        if is_valid:
            valid_results[group_name] = result
        # Otherwise, increment the counter for the specific rejection reason.
        else:
            rejection_reasons[reason] += 1

    # --- Diagnostic Reporting ---
    # Get the total number of results processed.
    total_processed = len(ransac_results)
    # Get the number of results that passed validation.
    total_valid = len(valid_results)
    # Calculate the total number of rejected results.
    total_rejected = total_processed - total_valid

    # Calculate the validation pass rate.
    pass_rate = (total_valid / total_processed * 100) if total_processed > 0 else 0.0

    # Assemble the final diagnostic report.
    diagnostics = {
        "process_name": "RANSAC Result Validation and Filtering",
        "total_results_processed": total_processed,
        "results_passed_validation": total_valid,
        "results_rejected": total_rejected,
        "validation_pass_rate_pct": round(pass_rate, 4),
        "rejection_reason_counts": dict(rejection_reasons)
    }

    # Return the dictionary of valid results and the diagnostic report.
    return valid_results, diagnostics


In [None]:
# Task 10: Feature Vector Construction

# ======================================================================================
# Custom Data Types (re-defined for clarity in this standalone context)
# ======================================================================================
RansacResult = Dict[str, Any]
# The structured output for a single estimation group's prepared vectors.
FeatureVectorGroup = Dict[str, Any]

# ======================================================================================
# Task 10: Fused Orchestrator Function
# ======================================================================================

def construct_feature_vectors(
    df_analysis_ready: pd.DataFrame,
    valid_ransac_results: Dict[Tuple, RansacResult]
) -> Tuple[Dict[Tuple, FeatureVectorGroup], Dict[str, Any]]:
    """
    Constructs the final feature vectors for the least-squares estimation.

    This function takes the analysis-ready DataFrame and the validated RANSAC
    results to produce the precise inputs for the closed-form solution. For each
    valid estimation group, it performs the following:
    1.  Extracts the inlier options identified by the RANSAC procedure.
    2.  Constructs the call-put difference vector 'y' using mid-prices.
    3.  Constructs the moneyness vector 'm'.
    4.  Integrates the corresponding futures and spot prices.
    5.  Packages these components into a structured dictionary.

    Args:
        df_analysis_ready (pd.DataFrame):
            The fully prepared DataFrame from Task 6, containing mid-price,
            moneyness, and complete call-put pairs.
        valid_ransac_results (Dict[Tuple, RansacResult]):
            The dictionary of validated RANSAC results from Task 9, containing
            the inlier masks for each successful group.

    Returns:
        Tuple[Dict[Tuple, FeatureVectorGroup], Dict[str, Any]]:
            A tuple containing:
            - Dict[Tuple, FeatureVectorGroup]: A dictionary where keys are the
              group identifiers and values are `FeatureVectorGroup` dictionaries
              containing the final NumPy arrays and scalar values for estimation.
            - Dict[str, Any]: A diagnostic dictionary summarizing the process.
    """
    # --- Input Validation ---
    if not isinstance(df_analysis_ready, pd.DataFrame):
        raise TypeError("Input `df_analysis_ready` must be a pandas DataFrame.")
    if not isinstance(valid_ransac_results, dict):
        raise TypeError("Input `valid_ransac_results` must be a dictionary.")

    # --- Initialization ---
    # Initialize the dictionary to hold the final feature vectors.
    feature_vectors: Dict[Tuple, FeatureVectorGroup] = {}

    # --- Group Iteration and Vector Construction ---
    # Iterate through each group that passed RANSAC validation.
    for group_name, ransac_data in valid_ransac_results.items():
        try:
            # --- Step 1: Inlier Data Extraction ---
            # Select all data for the current estimation group.
            group_df = df_analysis_ready.loc[group_name]

            # Retrieve the boolean mask identifying the inliers for this group.
            inlier_mask = ransac_data['inlier_mask']

            # Apply the mask to get the DataFrame of inliers only.
            # Note: The mask is a NumPy array, which works correctly with .iloc
            # because the order is preserved from the initial .groupby().
            inlier_df = group_df.iloc[inlier_mask]

            # --- Step 2: Call-Put Difference and Moneyness Vector Construction ---
            # Pivot the inlier data to align call and put information.
            inlier_pivoted = inlier_df.reset_index().pivot_table(
                index='strike_price',
                columns='option_type',
                values=['mid_price', 'moneyness']
            )

            # Construct the 'y' vector from the difference in mid-prices.
            # Equation Ref: y_t^i = C_t(K^i,T) - P_t(K^i,T)
            y_vector = (
                inlier_pivoted[('mid_price', 'call')] -
                inlier_pivoted[('mid_price', 'put')]
            ).to_numpy()

            # Construct the 'm' vector from the moneyness values.
            # Moneyness is identical for the call and put of a given strike.
            # Equation Ref: m_t^i = K^i / S_t
            m_vector = inlier_pivoted[('moneyness', 'call')].to_numpy()

            # --- Step 3: Futures and Spot Price Integration ---
            # Extract the futures and spot prices. They are constant for the group.
            # We can safely take the value from the first inlier row.
            F_value = inlier_df['futures_price'].iloc[0]
            S_value = inlier_df['spot_price'].iloc[0]

            # The number of contracts is the number of inlier pairs.
            n_contracts = len(inlier_pivoted)

            # --- Final Packaging ---
            # Store the constructed vectors and scalars in the output dictionary.
            feature_vectors[group_name] = {
                'y_vector': y_vector,
                'm_vector': m_vector,
                'F_value': F_value,
                'S_value': S_value,
                'n_contracts': n_contracts
            }

        except (KeyError, IndexError) as e:
            # This block catches potential alignment errors, which would indicate
            # a flaw in the upstream data processing pipeline.
            print(f"Warning: Could not process group {group_name}. Error: {e}. Skipping.")
            continue

    # --- Diagnostic Reporting ---
    diagnostics = {
        "process_name": "Feature Vector Construction",
        "num_input_groups": len(valid_ransac_results),
        "num_output_groups": len(feature_vectors),
        "num_failed_groups": len(valid_ransac_results) - len(feature_vectors)
    }

    # Return the dictionary of feature vectors and the diagnostic report.
    return feature_vectors, diagnostics


In [None]:
# Task 11: Summation Computations

# ======================================================================================
# Custom Data Types (re-defined for clarity in this standalone context)
# ======================================================================================
FeatureVectorGroup = Dict[str, Any]
# The structured output for a single group's computed summations.
SummationGroup = Dict[str, float]

# ======================================================================================
# Task 11, Steps 1-3: Core Computation and Validation Helper
# ======================================================================================

def _calculate_summations(
    feature_vectors: FeatureVectorGroup,
    estimation_lambda_weight: float
) -> SummationGroup:
    """
    Calculates all scalar summations required for the closed-form solution.

    This function takes the prepared feature vectors for a single estimation
    group and computes the necessary scalar aggregates (S1, S2, S3, S4), the
    lambda weight, and the futures-spot price ratio. It includes validation
    to ensure all computed values are numerically sound.

    Args:
        feature_vectors (FeatureVectorGroup):
            A dictionary containing the NumPy arrays 'y_vector', 'm_vector' and
            scalar values 'F_value', 'S_value', 'n_contracts'.
        estimation_lambda_weight (float):
            The hyperparameter that scales the regularization term.

    Returns:
        SummationGroup:
            A dictionary containing all the computed scalar values required for
            the final estimation in Task 12.

    Raises:
        AssertionError: If any computed value is not finite.
    """
    # --- Extract Inputs ---
    # Unpack the necessary arrays and scalars from the input dictionary.
    y_vec = feature_vectors['y_vector']
    m_vec = feature_vectors['m_vector']
    n_contracts = feature_vectors['n_contracts']
    F_value = feature_vectors['F_value']
    S_value = feature_vectors['S_value']

    # --- Step 1: Basic Summation Implementation ---
    # These calculations directly implement the summation terms required for the
    # analytical solution of the weighted least-squares problem.

    # Equation Ref: S_1 = Σ y_t^i
    S1 = np.sum(y_vec)

    # Equation Ref: S_2 = Σ m_t^i
    S2 = np.sum(m_vec)

    # Equation Ref: S_3 = Σ (m_t^i)^2
    S3 = np.sum(np.square(m_vec))

    # Equation Ref: S_4 = Σ m_t^i * y_t^i
    S4 = np.dot(m_vec, y_vec)

    # --- Step 2: Lambda Weight and Auxiliary Computation ---
    # Defensive assertion to prevent division by zero.
    assert S_value > 0, "Spot price must be positive."

    # Calculate the lambda weighting term for the regularization component.
    # Equation Ref: λ_{n_t^T} in the paper, scaled by n_contracts.
    lambda_value = estimation_lambda_weight * n_contracts

    # Calculate the futures-to-spot price ratio.
    # Equation Ref: F_t / S_t
    F_over_S = F_value / S_value

    # --- Assemble and Validate Output ---
    # Package all computed scalars into a result dictionary.
    summations = {
        'S1': S1, 'S2': S2, 'S3': S3, 'S4': S4,
        'lambda_value': lambda_value,
        'F_over_S': F_over_S,
        'n_contracts': float(n_contracts) # Cast to float for consistency
    }

    # --- Step 3: Computation Validation and Quality Control ---
    # Ensure all computed values are finite (not NaN or infinity). This is a
    # critical sanity check against numerical issues or invalid inputs.
    for key, value in summations.items():
        assert np.isfinite(value), f"Computed summation '{key}' is not finite: {value}"

    return summations

# ======================================================================================
# Task 11: Fused Orchestrator Function
# ======================================================================================

def compute_summations_for_all_groups(
    feature_vectors: Dict[Tuple, FeatureVectorGroup],
    hyperparameters: Dict[str, Any]
) -> Tuple[Dict[Tuple, SummationGroup], Dict[str, Any]]:
    """
    Orchestrates the computation of summations for all valid estimation groups.

    This function iterates through the dictionary of prepared feature vectors,
    calculates the required scalar summations for each group, and compiles the
    results into a new dictionary, ready for the final solution step.

    Args:
        feature_vectors (Dict[Tuple, FeatureVectorGroup]):
            The dictionary of prepared inputs from `construct_feature_vectors`.
        hyperparameters (Dict[str, Any]):
            A dictionary containing the 'estimation_lambda_weight' hyperparameter.

    Returns:
        Tuple[Dict[Tuple, SummationGroup], Dict[str, Any]]:
            A tuple containing:
            - Dict[Tuple, SummationGroup]: A dictionary where keys are the group
              identifiers and values are `SummationGroup` dictionaries containing
              the scalar inputs for the closed-form solution.
            - Dict[str, Any]: A diagnostic dictionary summarizing the process.
    """
    # --- Input Validation ---
    if not isinstance(feature_vectors, dict):
        raise TypeError("Input `feature_vectors` must be a dictionary.")
    if not isinstance(hyperparameters, dict):
        raise TypeError("Input `hyperparameters` must be a dictionary.")
    if 'estimation_lambda_weight' not in hyperparameters:
        raise ValueError("Missing 'estimation_lambda_weight' in hyperparameters.")

    # --- Initialization ---
    # Extract the lambda weight hyperparameter.
    lambda_weight = hyperparameters['estimation_lambda_weight']

    # Initialize the dictionary to store the results.
    all_summations: Dict[Tuple, SummationGroup] = {}

    # Initialize counters for diagnostics.
    groups_processed = 0
    groups_failed = 0

    # --- Group Iteration and Computation ---
    # Iterate through each group and its feature vectors.
    for group_name, fv_group in feature_vectors.items():
        groups_processed += 1
        try:
            # Call the helper function to compute the summations for the group.
            summation_group = _calculate_summations(
                feature_vectors=fv_group,
                estimation_lambda_weight=lambda_weight
            )
            # Store the successful result.
            all_summations[group_name] = summation_group
        except AssertionError as e:
            # Catch validation errors from the helper and log them.
            print(f"Warning: Summation calculation failed for group {group_name}. Error: {e}. Skipping.")
            groups_failed += 1
            continue

    # --- Diagnostic Reporting ---
    diagnostics = {
        "process_name": "Summation Computation",
        "num_input_groups": len(feature_vectors),
        "num_groups_succeeded": len(all_summations),
        "num_groups_failed": groups_failed
    }

    # Return the dictionary of computed summations and the diagnostic report.
    return all_summations, diagnostics


In [None]:
# Task 12: Closed-Form Solution Computation

# ======================================================================================
# Custom Data Types (re-defined for clarity in this standalone context)
# ======================================================================================
SummationGroup = Dict[str, float]
# The structured output for a single group's estimated bond prices.
SolutionResult = Dict[str, float]

# ======================================================================================
# Task 12, Steps 1-3: Core Solver and Validation Helper
# ======================================================================================

def _solve_closed_form(
    summations: SummationGroup,
    determinant_tolerance: float = 1e-12
) -> Optional[SolutionResult]:
    """
    Computes the closed-form solution for zero-coupon bond prices (α*, β*).

    This function implements the analytical solution derived from the weighted
    least-squares problem. It first calculates the system's determinant to
    ensure solvability and then computes the bond prices, followed by a final
    economic validation check. This revised version also passes through the
    'n_contracts' value for the final audit trail.

    Args:
        summations (SummationGroup):
            A dictionary containing all pre-computed scalar summations and
            auxiliary values for a single estimation group.
        determinant_tolerance (float):
            The numerical tolerance below which the system's determinant is
            considered zero, indicating an unsolvable (singular) system.

    Returns:
        Optional[SolutionResult]:
            A dictionary containing the estimated bond prices 'alpha_star',
            'beta_star', and the supporting 'n_contracts' if the solution is
            valid. Returns None if the system is unsolvable or if the results
            are economically invalid.
    """
    # --- Unpack Inputs ---
    # Extract all necessary scalar values from the input dictionary.
    S1, S2, S3, S4 = summations['S1'], summations['S2'], summations['S3'], summations['S4']
    lambda_val = summations['lambda_value']
    F_over_S = summations['F_over_S']
    n = summations['n_contracts']

    # --- Step 1: Determinant Calculation and Solvability Verification ---
    # These terms correspond to the elements of the 2x2 matrix of the linear system.
    term_A = lambda_val + S3
    term_B = lambda_val * F_over_S + S2
    term_C = n + lambda_val * (F_over_S**2)

    # Equation Ref: d = (λ + S3)(n + λ(F/S)²) - (λ(F/S) + S2)²
    determinant = term_A * term_C - term_B**2

    # Check if the system is singular or ill-conditioned.
    if abs(determinant) < determinant_tolerance:
        return None # System is unsolvable.

    # --- Step 2: Zero-Coupon Bond Price Calculation ---
    # This is the direct implementation of the closed-form solution.
    # Equation Ref: α* = (1/d) * [(λ + S3)S1 - (λ(F/S) + S2)S4]
    alpha_star = (1 / determinant) * (term_A * S1 - term_B * S4)

    # Equation Ref: β* = (1/d) * [(λ(F/S) + S2)S1 - (n + λ(F/S)²)S4]
    beta_star = (1 / determinant) * (term_B * S1 - term_C * S4)

    # --- Step 3: Solution Validation and Quality Assessment ---
    # A critical economic sanity check: zero-coupon bond prices must be positive.
    if not (alpha_star > 0 and beta_star > 0):
        return None # Result is economically nonsensical.

    # If all checks pass, return the valid solution, now including n_contracts.
    return {
        "alpha_star": alpha_star,
        "beta_star": beta_star,
        "n_contracts": n
    }

# ======================================================================================
# Task 12: Fused Orchestrator Function
# ======================================================================================

def compute_closed_form_solutions(
    all_summations: Dict[Tuple, SummationGroup]
) -> Tuple[Dict[Tuple, SolutionResult], Dict[str, Any]]:
    """
    Orchestrates the computation of closed-form solutions for all valid groups.

    This function iterates through the dictionary of pre-computed summations,
    calls the core solver for each group, and compiles the valid solutions.
    It provides a detailed diagnostic report on the success and failure rates
    of the estimation process.

    Args:
        all_summations (Dict[Tuple, SummationGroup]):
            The dictionary of summation groups from `compute_summations_for_all_groups`.

    Returns:
        Tuple[Dict[Tuple, SolutionResult], Dict[str, Any]]:
            A tuple containing:
            - Dict[Tuple, SolutionResult]: A dictionary where keys are the group
              identifiers and values are `SolutionResult` dictionaries containing
              the valid estimated bond prices.
            - Dict[str, Any]: A diagnostic dictionary summarizing the process.
    """
    # --- Input Validation ---
    if not isinstance(all_summations, dict):
        raise TypeError("Input `all_summations` must be a dictionary.")

    # --- Initialization ---
    # Initialize the dictionary to store the final valid solutions.
    all_solutions: Dict[Tuple, SolutionResult] = {}

    # Initialize a Counter to tally the reasons for failure.
    failure_reasons = Counter()

    # --- Group Iteration and Computation ---
    # Iterate through each group and its computed summations.
    for group_name, summation_group in all_summations.items():
        # Call the helper function to solve the system for the current group.
        solution = _solve_closed_form(summations=summation_group)

        # If the solver returned a valid solution, store it.
        if solution is not None:
            all_solutions[group_name] = solution
        else:
            # If the solver returned None, it's due to a singular system or an
            # economically invalid result. For diagnostics, we can distinguish,
            # but for now, we'll group them as "Estimation Failure".
            # A more granular check could be added inside the helper if needed.
            failure_reasons["Estimation Failure (Singular or Invalid Result)"] += 1

    # --- Diagnostic Reporting ---
    total_processed = len(all_summations)
    total_succeeded = len(all_solutions)
    total_failed = total_processed - total_succeeded
    success_rate = (total_succeeded / total_processed * 100) if total_processed > 0 else 0.0

    diagnostics = {
        "process_name": "Closed-Form Solution Computation",
        "num_input_groups": total_processed,
        "num_solutions_succeeded": total_succeeded,
        "num_solutions_failed": total_failed,
        "success_rate_pct": round(success_rate, 4),
        "failure_reason_counts": dict(failure_reasons)
    }

    # Return the dictionary of valid solutions and the diagnostic report.
    return all_solutions, diagnostics


In [None]:
# Task 13: Logarithmic Interest Rate Conversion

# ======================================================================================
# Custom Data Types (re-defined for clarity in this standalone context)
# ======================================================================================
SolutionResult = Dict[str, float]
# The structured output for a single group's converted interest rates.
InterestRateResult = Dict[str, Any]

# ======================================================================================
# Task 13, Steps 1-3: Core Conversion and Validation Helper
# ======================================================================================

def _convert_prices_to_rates(
    solution: SolutionResult,
    group_name: Tuple,
    annualization_factor: float
) -> Optional[InterestRateResult]:
    """
    Converts zero-coupon bond prices to continuously compounded interest rates.

    This function takes the estimated bond prices for a single group and
    calculates the corresponding annualized interest rates. This revised version
    also passes through the 'n_contracts' value from the solution dictionary.

    Args:
        solution (SolutionResult):
            A dictionary containing the estimated bond prices 'alpha_star',
            'beta_star', and the supporting 'n_contracts'.
        group_name (Tuple):
            The identifier tuple for the group, containing (timestamp, asset,
            expiry_date).
        annualization_factor (float):
            The factor to convert days to years (e.g., 365.25).

    Returns:
        Optional[InterestRateResult]:
            A dictionary containing the computed rates and metadata if the
            conversion is successful. Returns None if any numerical error occurs.
    """
    # --- Unpack Inputs ---
    # Extract bond prices and the number of contracts from the solution.
    alpha_star = solution['alpha_star']
    beta_star = solution['beta_star']
    n_contracts = solution['n_contracts']
    # Extract temporal information from the group name.
    timestamp, _, expiry_date = group_name

    # --- Step 1: Time-to-Expiry Calculation ---
    # Calculate the time to expiry in days.
    time_to_expiry_days = (expiry_date - timestamp) / pd.Timedelta(days=1)

    # Defensively check that time-to-expiry is positive.
    if time_to_expiry_days <= 0:
        return None

    # Convert the duration to years using the specified annualization factor.
    time_to_expiry_years = time_to_expiry_days / annualization_factor

    # --- Step 2: Logarithmic Rate Computation ---
    # Equation Ref: r_crypto = - (1 / (T-t)) * ln(α*)
    crypto_rate = -np.log(alpha_star) / time_to_expiry_years

    # Equation Ref: r_ref = - (1 / (T-t)) * ln(β*)
    ref_rate = -np.log(beta_star) / time_to_expiry_years

    # --- Step 3: Rate Validation ---
    # A final sanity check to ensure the computed rates are finite numbers.
    if not (np.isfinite(crypto_rate) and np.isfinite(ref_rate)):
        return None

    # --- Final Packaging ---
    # Assemble the complete result, now including n_contracts.
    return {
        "crypto_rate": crypto_rate,
        "ref_rate": ref_rate,
        "time_to_expiry_days": time_to_expiry_days,
        "alpha_star": alpha_star,
        "beta_star": beta_star,
        "n_contracts": n_contracts
    }

# ======================================================================================
# Task 13: Fused Orchestrator Function
# ======================================================================================

def convert_all_solutions_to_rates(
    all_solutions: Dict[Tuple, SolutionResult],
    hyperparameters: Dict[str, Any]
) -> Tuple[Dict[Tuple, InterestRateResult], Dict[str, Any]]:
    """
    Orchestrates the conversion of all bond price solutions to interest rates.

    This function iterates through the dictionary of valid closed-form solutions,
    converts each pair of bond prices into their corresponding continuously
    compounded interest rates, and compiles the results.

    Args:
        all_solutions (Dict[Tuple, SolutionResult]):
            The dictionary of valid solutions from `compute_closed_form_solutions`.
        hyperparameters (Dict[str, Any]):
            A dictionary containing the 'annualization_factor' hyperparameter.

    Returns:
        Tuple[Dict[Tuple, InterestRateResult], Dict[str, Any]]:
            A tuple containing:
            - Dict[Tuple, InterestRateResult]: A dictionary where keys are group
              identifiers and values are `InterestRateResult` dictionaries.
            - Dict[str, Any]: A diagnostic dictionary summarizing the conversion.
    """
    # --- Input Validation ---
    if not isinstance(all_solutions, dict):
        raise TypeError("Input `all_solutions` must be a dictionary.")
    if not isinstance(hyperparameters, dict):
        raise TypeError("Input `hyperparameters` must be a dictionary.")
    if 'annualization_factor' not in hyperparameters:
        raise ValueError("Missing 'annualization_factor' in hyperparameters.")

    # --- Initialization ---
    # Extract the annualization factor.
    annualization_factor = hyperparameters['annualization_factor']

    # Initialize the dictionary to store the final interest rate results.
    all_rates: Dict[Tuple, InterestRateResult] = {}

    # Initialize counters for diagnostics.
    groups_processed = 0
    groups_failed = 0

    # --- Group Iteration and Conversion ---
    # Iterate through each group and its solved bond prices.
    for group_name, solution in all_solutions.items():
        groups_processed += 1

        # Call the helper function to perform the conversion and validation.
        rate_result = _convert_prices_to_rates(
            solution=solution,
            group_name=group_name,
            annualization_factor=annualization_factor
        )

        # If the conversion was successful, store the result.
        if rate_result is not None:
            all_rates[group_name] = rate_result
        else:
            groups_failed += 1

    # --- Diagnostic Reporting ---
    success_rate = (
        ((groups_processed - groups_failed) / groups_processed * 100)
        if groups_processed > 0
        else 0.0
    )

    diagnostics = {
        "process_name": "Interest Rate Conversion",
        "num_input_solutions": groups_processed,
        "num_conversions_succeeded": len(all_rates),
        "num_conversions_failed": groups_failed,
        "success_rate_pct": round(success_rate, 4)
    }

    # Return the dictionary of computed rates and the diagnostic report.
    return all_rates, diagnostics


In [None]:
# Task 14: Rate Validation and Quality Control

# ======================================================================================
# Custom Data Types (re-defined for clarity in this standalone context)
# ======================================================================================
InterestRateResult = Dict[str, Any]

# ======================================================================================
# Task 14, Steps 1-3: Core Validation Helper Function
# ======================================================================================

def _validate_single_rate_result(
    rate_result: InterestRateResult,
    asset: str,
    validation_bounds: Dict[str, Tuple[float, float]]
) -> Tuple[bool, str]:
    """
    Validates a single interest rate result against economic bounds.

    This function applies a series of asset-specific and cross-validation checks
    to ensure the computed interest rates are economically plausible.

    Args:
        rate_result (InterestRateResult):
            The dictionary containing the computed rates for a single group.
        asset (str):
            The crypto asset for the group (e.g., 'BTC', 'ETH').
        validation_bounds (Dict[str, Tuple[float, float]]):
            A dictionary defining the plausible [min, max] ranges for each
            asset and the reference currency, plus the max spread.

    Returns:
        Tuple[bool, str]:
            A tuple containing:
            - bool: True if the result is valid, False otherwise.
            - str: A string indicating 'VALID' or the reason for rejection.
    """
    # --- Unpack Inputs ---
    crypto_rate = rate_result['crypto_rate']
    ref_rate = rate_result['ref_rate']

    # --- Step 1: Crypto Rate Validation ---
    # Retrieve the valid range for the specific crypto asset.
    crypto_min, crypto_max = validation_bounds.get(asset, (-float('inf'), float('inf')))
    # Check if the crypto rate is within its defined plausible range.
    if not (crypto_min <= crypto_rate <= crypto_max):
        return False, f"Crypto Rate Out of Bounds ({asset})"

    # --- Step 2: Reference Rate Validation ---
    # Retrieve the valid range for the reference currency (assumed USD).
    ref_min, ref_max = validation_bounds.get('USD', (-float('inf'), float('inf')))
    # Check if the reference rate is within its defined plausible range.
    if not (ref_min <= ref_rate <= ref_max):
        return False, "Reference Rate Out of Bounds (USD)"

    # --- Step 3: Cross-Validation Checks ---
    # Check if the spread between the two rates is excessively large.
    max_spread = validation_bounds.get('MAX_SPREAD', float('inf'))
    rate_spread = abs(crypto_rate - ref_rate)
    if rate_spread > max_spread:
        return False, "Extreme Rate Spread"

    # If all checks pass, the result is considered valid.
    return True, "VALID"

# ======================================================================================
# Task 14: Fused Orchestrator Function
# ======================================================================================

def validate_and_filter_rates(
    all_rates: Dict[Tuple, InterestRateResult]
) -> Tuple[Dict[Tuple, InterestRateResult], Dict[str, Any]]:
    """
    Orchestrates the validation and filtering of computed interest rates.

    This function applies a final layer of quality control by filtering the
    computed interest rates based on predefined, economically reasonable bounds.
    It checks asset-specific rates, reference currency rates, and the spread
    between them.

    Args:
        all_rates (Dict[Tuple, InterestRateResult]):
            The dictionary of computed interest rates from the previous task.

    Returns:
        Tuple[Dict[Tuple, InterestRateResult], Dict[str, Any]]:
            A tuple containing:
            - Dict[Tuple, InterestRateResult]: A new dictionary containing only
              the interest rate results that passed all validation checks.
            - Dict[str, Any]: A diagnostic dictionary summarizing the filtering
              process and the reasons for rejection.
    """
    # --- Input Validation ---
    if not isinstance(all_rates, dict):
        raise TypeError("Input `all_rates` must be a dictionary.")

    # --- Initialization ---
    # Define the validation schema with plausible economic bounds.
    validation_bounds: Dict[str, Any] = {
        'BTC': (-0.5, 0.5),   # Annual rate range for BTC
        'ETH': (-0.5, 0.5),   # Annual rate range for ETH
        'USD': (-0.2, 1.0),   # Annual rate range for the reference currency
        'MAX_SPREAD': 2.0     # Maximum absolute spread between crypto and ref rates
    }

    # Initialize a dictionary to store the filtered, valid results.
    valid_rates: Dict[Tuple, InterestRateResult] = {}

    # Initialize a Counter to tally the reasons for rejection.
    rejection_reasons = Counter()

    # --- Iteration and Validation ---
    # Iterate through each group's computed interest rate result.
    for group_name, rate_result in all_rates.items():
        # The asset is the second element in the group name tuple.
        asset = group_name[1]

        # Call the helper function to perform the validation checks.
        is_valid, reason = _validate_single_rate_result(
            rate_result=rate_result,
            asset=asset,
            validation_bounds=validation_bounds
        )

        # If the result is valid, add it to the output dictionary.
        if is_valid:
            valid_rates[group_name] = rate_result
        # Otherwise, increment the counter for the specific rejection reason.
        else:
            rejection_reasons[reason] += 1

    # --- Diagnostic Reporting ---
    total_processed = len(all_rates)
    total_valid = len(valid_rates)
    total_rejected = total_processed - total_valid
    pass_rate = (total_valid / total_processed * 100) if total_processed > 0 else 0.0

    diagnostics = {
        "process_name": "Economic Validation of Interest Rates",
        "validation_bounds": validation_bounds,
        "total_rates_processed": total_processed,
        "rates_passed_validation": total_valid,
        "rates_rejected": total_rejected,
        "validation_pass_rate_pct": round(pass_rate, 4),
        "rejection_reason_counts": dict(rejection_reasons)
    }

    # Return the dictionary of valid rates and the diagnostic report.
    return valid_rates, diagnostics


In [None]:
# Task 15: Results Aggregation and Storage

# ======================================================================================
# Custom Data Types (re-defined for clarity in this standalone context)
# ======================================================================================
InterestRateResult = Dict[str, Any]

# ======================================================================================
# Task 15: Fused Orchestrator Function
# ======================================================================================

def aggregate_results_to_dataframe(
    all_rates: Dict[Tuple, InterestRateResult]
) -> pd.DataFrame:
    """
    Aggregates the final interest rate results into a structured DataFrame.

    This function transforms the nested dictionary of estimation results into a
    single, flat pandas DataFrame. This revised version is more complete, as it
    now includes the 'n_contracts' column, which represents the number of
    inlier option pairs that supported each individual estimate, providing a
    crucial measure of reliability.

    Args:
        all_rates (Dict[Tuple, InterestRateResult]):
            The dictionary of validated interest rate results. It is now expected
            that each `InterestRateResult` dictionary contains the 'n_contracts' key.

    Returns:
        pd.DataFrame:
            A DataFrame where each row corresponds to a successful estimation
            for a given timestamp and maturity. The DataFrame contains all key
            information from the estimation process in a clean, tabular format.

    Raises:
        TypeError:
            If `all_rates` is not a dictionary.
        ValueError:
            If the input dictionary is empty, or if the inner result dictionaries
            are missing the required 'n_contracts' key.
    """
    # --- Input Validation ---
    # Check the main input type.
    if not isinstance(all_rates, dict):
        raise TypeError("Input `all_rates` must be a dictionary.")
    # Check if the input is empty.
    if not all_rates:
        raise ValueError("Input dictionary `all_rates` is empty. No results to aggregate.")
    # Check the structure of the first inner dictionary as a representative sample.
    first_result = next(iter(all_rates.values()))
    if 'n_contracts' not in first_result:
        raise ValueError("Incomplete data propagation: 'n_contracts' key is missing from result dictionaries.")

    # --- Step 1: Results DataFrame Construction ---
    # Initialize a list to hold the "flattened" records for the DataFrame.
    records: List[Dict[str, Any]] = []

    # Iterate through the input dictionary to create a list of records.
    for group_name, rate_result in all_rates.items():
        # Unpack the group name tuple into its components.
        timestamp, asset, expiry_date = group_name

        # Create a new dictionary for the current record, combining the group
        # identifier with the result data (which now includes n_contracts).
        record = {
            'timestamp': timestamp,
            'asset': asset,
            'expiry_date': expiry_date,
            **rate_result
        }
        # Append the fully constructed record to the list.
        records.append(record)

    # Create the DataFrame from the list of records.
    results_df = pd.DataFrame.from_records(records)

    # --- Schema Enforcement and Finalization ---
    # Define the exact column order for the final, more complete DataFrame.
    final_column_order = [
        'timestamp',
        'asset',
        'expiry_date',
        'time_to_expiry_days',
        'n_contracts', # Added for completeness
        'crypto_rate',
        'ref_rate',
        'alpha_star',
        'beta_star'
    ]

    # Reorder the columns to match the specified schema.
    results_df = results_df[final_column_order]

    # Explicitly cast data types for correctness and memory efficiency.
    dtype_mapping = {
        'timestamp': 'datetime64[ns]',
        'asset': 'category',
        'expiry_date': 'datetime64[ns]',
        'time_to_expiry_days': 'float64',
        'n_contracts': 'int32', # Use an efficient integer type
        'crypto_rate': 'float64',
        'ref_rate': 'float64',
        'alpha_star': 'float64',
        'beta_star': 'float64'
    }
    results_df = results_df.astype(dtype_mapping)

    # Sort the final DataFrame for clean, predictable ordering.
    results_df = results_df.sort_values(by=['timestamp', 'asset', 'expiry_date']).reset_index(drop=True)

    # Return the final, polished, and more complete DataFrame.
    return results_df



In [None]:
# Task 16: Daily Aggregation from Hourly Data

# ======================================================================================
# Task 16: Fused Orchestrator Function
# ======================================================================================

def aggregate_rates_daily(
    results_df: pd.DataFrame,
    min_hourly_obs: int = 2
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Aggregates high-frequency interest rate estimates to a daily frequency.

    This function transforms the granular, point-in-time interest rate estimates
    into a daily panel dataset. It performs three main steps:
    1.  Assigns each estimate to a calendar date.
    2.  Buckets each estimate into a predefined maturity range.
    3.  Groups the data by date, asset, and maturity bucket, then computes
        statistical aggregates (mean, std, count) for the rates.
    4.  Filters out any daily aggregates that are based on an insufficient
        number of hourly observations.

    Args:
        results_df (pd.DataFrame):
            The DataFrame of successful estimation results from Task 15.
        min_hourly_obs (int, optional):
            The minimum number of hourly observations required within a day for
            an aggregated data point to be considered valid. Defaults to 2.

    Returns:
        Tuple[pd.DataFrame, Dict[str, Any]]:
            A tuple containing:
            - pd.DataFrame: A new DataFrame containing the daily aggregated
              yield curve data, indexed by date, asset, and maturity bucket.
            - Dict[str, Any]: A diagnostic dictionary summarizing the aggregation.

    Raises:
        TypeError:
            If `results_df` is not a pandas DataFrame or `min_hourly_obs` is not an int.
        ValueError:
            If `results_df` is empty or `min_hourly_obs` is not positive.
    """
    # --- Input Validation ---
    if not isinstance(results_df, pd.DataFrame):
        raise TypeError("Input `results_df` must be a pandas DataFrame.")
    if not isinstance(min_hourly_obs, int):
        raise TypeError("Input `min_hourly_obs` must be an integer.")
    if results_df.empty:
        raise ValueError("Input `results_df` is empty. Cannot perform aggregation.")
    if min_hourly_obs <= 0:
        raise ValueError("Input `min_hourly_obs` must be a positive integer.")

    # --- Initialization ---
    df_agg = results_df.copy()
    initial_point_count = len(df_agg)

    # --- Step 1: Temporal Grouping and Maturity Bucketing ---
    # Extract the calendar date from the timestamp for daily grouping.
    df_agg['date'] = df_agg['timestamp'].dt.date

    # Define the maturity buckets (in days).
    maturity_bins = [30, 60, 120, 240, 400]
    maturity_labels = ['30-60d', '60-120d', '120-240d', '240-400d']

    # Use pd.cut to assign each observation to a maturity bucket.
    df_agg['maturity_bucket'] = pd.cut(
        df_agg['time_to_expiry_days'],
        bins=maturity_bins,
        labels=maturity_labels,
        right=True, # Bins are (30, 60], (60, 120], etc.
        include_lowest=True # Ensures the lowest bound (30) is included.
    )

    # Drop rows that fall outside the defined maturity buckets.
    df_agg.dropna(subset=['maturity_bucket'], inplace=True)
    rows_after_bucketing = len(df_agg)

    # --- Step 2: Statistical Aggregation and Quality Control ---
    # Define the aggregation operations to be performed on each group.
    agg_functions = {
        'crypto_rate': ['mean', 'std'],
        'ref_rate': ['mean', 'std'],
        'timestamp': 'count' # Use a non-null column to count observations
    }

    # Group by date, asset, and bucket, then apply the aggregations.
    daily_aggregated = df_agg.groupby(
        ['date', 'asset', 'maturity_bucket']
    ).agg(agg_functions)

    # Clean up the hierarchical column names created by .agg().
    daily_aggregated.columns = [
        '_'.join(col).strip() for col in daily_aggregated.columns.values
    ]
    daily_aggregated.rename(columns={'timestamp_count': 'n_observations'}, inplace=True)

    # Filter out aggregated points that are based on too few observations.
    initial_agg_points = len(daily_aggregated)
    daily_aggregated_filtered = daily_aggregated[
        daily_aggregated['n_observations'] >= min_hourly_obs
    ]
    final_agg_points = len(daily_aggregated_filtered)

    # --- Diagnostic Reporting ---
    diagnostics = {
        "process_name": "Daily Aggregation of Rates",
        "min_hourly_obs_per_day": min_hourly_obs,
        "initial_point_count": initial_point_count,
        "points_after_bucketing": rows_after_bucketing,
        "initial_aggregated_points": initial_agg_points,
        "final_aggregated_points": final_agg_points,
        "points_rejected_insufficient_obs": initial_agg_points - final_agg_points
    }

    # Return the final aggregated DataFrame and the diagnostic report.
    return daily_aggregated_filtered, diagnostics


In [None]:
# Task 17: Maturity Interpolation to Target Tenors

# ======================================================================================
# Task 17, Steps 1-3: Core Interpolation Helper Function
# ======================================================================================

def _interpolate_single_curve(
    daily_slice: pd.DataFrame,
    target_tenors: List[int]
) -> pd.DataFrame:
    """
    Performs linear interpolation on a single day's yield curve data.

    This function takes the aggregated rate data for a single date and asset,
    and interpolates/extrapolates to find the rates at the specified target tenors.

    Args:
        daily_slice (pd.DataFrame):
            A DataFrame slice containing the aggregated rates for one date/asset.
            Must contain 'maturity_midpoint', 'crypto_rate_mean', 'ref_rate_mean'.
        target_tenors (List[int]):
            A list of integer tenors (in days) to interpolate to.

    Returns:
        pd.DataFrame:
            A DataFrame with an index of the target tenors and columns for the
            interpolated crypto and reference rates, along with a status flag.
    """
    # --- Step 1: Feasibility Analysis ---
    # Drop any potential duplicates in maturity points and sort by maturity.
    curve_data = daily_slice.drop_duplicates(
        subset=['maturity_midpoint']
    ).sort_values('maturity_midpoint')

    # At least two points are required for linear interpolation.
    if len(curve_data) < 2:
        return pd.DataFrame() # Return empty DataFrame if not feasible.

    # Extract the known x (maturities) and y (rates) values.
    x_known = curve_data['maturity_midpoint'].to_numpy()
    y_known_crypto = curve_data['crypto_rate_mean'].to_numpy()
    y_known_ref = curve_data['ref_rate_mean'].to_numpy()

    # --- Step 2: Linear Interpolation ---
    # Use numpy.interp for efficient and robust 1D linear interpolation.
    # This single function handles both interpolation and flat extrapolation.
    # Equation Ref: r(τ_target) = r(τ₁) + [ (r(τ₂) - r(τ₁))/(τ₂ - τ₁) ] * (τ_target - τ₁)
    interpolated_crypto_rates = np.interp(target_tenors, x_known, y_known_crypto)
    interpolated_ref_rates = np.interp(target_tenors, x_known, y_known_ref)

    # --- Step 3: Extrapolation Handling and Quality Assessment ---
    # Determine the status (Interpolated vs. Extrapolated) for each target tenor.
    min_known_maturity = x_known[0]
    max_known_maturity = x_known[-1]

    status = [
        'INTERPOLATED' if min_known_maturity <= tenor <= max_known_maturity else 'EXTRAPOLATED'
        for tenor in target_tenors
    ]

    # --- Final Packaging ---
    # Create a results DataFrame for this single curve.
    result_df = pd.DataFrame({
        'tenor_days': target_tenors,
        'crypto_rate': interpolated_crypto_rates,
        'ref_rate': interpolated_ref_rates,
        'status': status
    })

    return result_df

# ======================================================================================
# Task 17: Fused Orchestrator Function
# ======================================================================================

def interpolate_yield_curves(
    daily_aggregated_df: pd.DataFrame,
    target_tenors_days: List[int]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Interpolates daily yield curves to a set of standardized target tenors.

    This function transforms the daily aggregated data from maturity buckets
    into a time series of rates at fixed maturity points (e.g., 90, 180, 360 days).
    It uses linear interpolation where possible and flat extrapolation for points
    outside the available maturity range, flagging each point accordingly.

    Args:
        daily_aggregated_df (pd.DataFrame):
            The DataFrame of daily aggregated rates from Task 16.
        target_tenors_days (List[int]):
            A list of integer tenors (in days) to which the curves will be
            standardized.

    Returns:
        Tuple[pd.DataFrame, Dict[str, Any]]:
            A tuple containing:
            - pd.DataFrame: A new DataFrame indexed by date and asset, with
              columns for each target tenor's rates and status.
            - Dict[str, Any]: A diagnostic dictionary summarizing the process.
    """
    # --- Input Validation ---
    if not isinstance(daily_aggregated_df, pd.DataFrame):
        raise TypeError("Input `daily_aggregated_df` must be a pandas DataFrame.")
    if not isinstance(target_tenors_days, list) or not all(isinstance(i, int) for i in target_tenors_days):
        raise TypeError("Input `target_tenors_days` must be a list of integers.")
    if daily_aggregated_df.empty:
        raise ValueError("Input `daily_aggregated_df` is empty.")

    # --- Initialization ---
    df_interp = daily_aggregated_df.reset_index().copy()

    # --- Step 1 (Prep): Calculate Maturity Midpoints ---
    # Define the midpoints for each maturity bucket.
    bucket_midpoints = {
        '30-60d': (30 + 60) / 2,
        '60-120d': (60 + 120) / 2,
        '120-240d': (120 + 240) / 2,
        '240-400d': (240 + 400) / 2
    }
    df_interp['maturity_midpoint'] = df_interp['maturity_bucket'].map(bucket_midpoints)

    # --- Main Interpolation Step ---
    # Group by date and asset, then apply the interpolation helper to each group.
    interpolated_results = df_interp.groupby(['date', 'asset']).apply(
        _interpolate_single_curve,
        target_tenors=target_tenors_days
    ).reset_index()

    # --- Reshape Final DataFrame ---
    # Pivot the results to create the final time-series format.
    if not interpolated_results.empty:
        final_df = interpolated_results.pivot_table(
            index=['date', 'asset'],
            columns='tenor_days',
            values=['crypto_rate', 'ref_rate', 'status']
        )
        # Clean up the hierarchical column names.
        final_df.columns = [f"{val}_{tenor}d" for val, tenor in final_df.columns]
    else:
        final_df = pd.DataFrame() # Handle case where no interpolation was possible.

    # --- Diagnostic Reporting ---
    total_curves = len(df_interp.groupby(['date', 'asset']))
    successful_curves = len(final_df) if not final_df.empty else 0

    diagnostics = {
        "process_name": "Maturity Interpolation",
        "target_tenors_days": target_tenors_days,
        "total_curves_processed": total_curves,
        "curves_successfully_interpolated": successful_curves,
        "curves_failed_interpolation": total_curves - successful_curves
    }

    return final_df, diagnostics


In [None]:
# Task 18: Time Series Construction and Gap Handling

# ======================================================================================
# Task 18: Fused Orchestrator Function
# ======================================================================================

def construct_final_time_series(
    interpolated_df: pd.DataFrame,
    max_interpolation_gap: int = 7
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Constructs a complete, daily time series by handling gaps in the data.

    This function takes the daily, maturity-interpolated yield curve data and
    ensures its temporal continuity. It performs three main steps:
    1.  Reindexes the data for each asset to a full daily calendar range, making
        any missing days explicit with NaNs.
    2.  Applies a tiered gap-filling strategy: linear interpolation for small
        gaps (up to `max_interpolation_gap` days).
    3.  Creates a quality flag for each data point, indicating whether it was
        originally observed, interpolated over time, or extrapolated over maturity.

    Args:
        interpolated_df (pd.DataFrame):
            The DataFrame of daily, maturity-interpolated rates from Task 17.
            Must have a MultiIndex of (date, asset).
        max_interpolation_gap (int, optional):
            The maximum number of consecutive missing days to fill using linear
            interpolation. Gaps larger than this will be left as NaN. Defaults to 7.

    Returns:
        Tuple[pd.DataFrame, Dict[str, Any]]:
            A tuple containing:
            - pd.DataFrame: The final, dense time-series DataFrame with gaps
              filled and quality flags.
            - Dict[str, Any]: A diagnostic dictionary summarizing the process.
    """
    # --- Input Validation ---
    if not isinstance(interpolated_df, pd.DataFrame):
        raise TypeError("Input `interpolated_df` must be a pandas DataFrame.")
    if not isinstance(max_interpolation_gap, int) or max_interpolation_gap < 0:
        raise ValueError("`max_interpolation_gap` must be a non-negative integer.")
    if interpolated_df.empty:
        return pd.DataFrame(), {"process_name": "Time Series Construction", "status": "Input empty, no action taken."}

    # --- Initialization ---
    df_final = interpolated_df.copy()
    # Identify the rate columns that need to be interpolated.
    rate_cols = [col for col in df_final.columns if 'rate' in col]
    # Identify the status columns that need to be forward-filled.
    status_cols = [col for col in df_final.columns if 'status' in col]

    all_filled_dfs = []
    initial_rows = len(df_final)

    # --- Main Loop: Process each asset's time series independently ---
    for asset, asset_df in df_final.groupby('asset'):
        # --- Step 1: Temporal Reindexing ---
        # Prepare the asset-specific DataFrame for reindexing.
        asset_df = asset_df.droplevel('asset').reset_index().set_index('date')

        # Create a complete daily date range for this asset.
        start_date, end_date = asset_df.index.min(), asset_df.index.max()
        full_date_range = pd.date_range(start=start_date, end=end_date, freq='D')

        # Reindex the DataFrame to the full date range, introducing NaNs for missing days.
        asset_reindexed = asset_df.reindex(full_date_range)

        # --- Step 2: Gap Classification and Systematic Filling ---
        # Interpolate the numeric rate columns.
        # The 'limit' parameter elegantly handles the tiered gap-filling rule.
        asset_reindexed[rate_cols] = asset_reindexed[rate_cols].interpolate(
            method='linear',
            limit=max_interpolation_gap,
            limit_direction='forward' # Fill forward up to the limit
        )

        # Forward-fill the categorical status columns to propagate status.
        asset_reindexed[status_cols] = asset_reindexed[status_cols].ffill(
            limit=max_interpolation_gap
        )

        # --- Step 3: Time Series Quality Assessment and Flagging ---
        # Create a new 'source' column to track interpolated points.
        # We identify rows that were originally NaN but are now filled.
        original_nan_mask = asset_df.reindex(full_date_range)[rate_cols[0]].isna()
        filled_mask = asset_reindexed[rate_cols[0]].notna()

        # Assign the source based on the masks.
        asset_reindexed['source'] = 'OBSERVED'
        asset_reindexed.loc[original_nan_mask & filled_mask, 'source'] = 'INTERPOLATED_TIME'

        # Add the asset back as a column and append to our list.
        asset_reindexed['asset'] = asset
        all_filled_dfs.append(asset_reindexed.reset_index().rename(columns={'index': 'date'}))

    # --- Final Assembly and Cleanup ---
    # Concatenate the processed DataFrames for all assets.
    final_df_filled = pd.concat(all_filled_dfs, ignore_index=True)

    # Drop any rows that still have NaNs (from gaps > max_interpolation_gap).
    final_df_cleaned = final_df_filled.dropna(subset=rate_cols).reset_index(drop=True)

    # Set a clean final index.
    final_df_cleaned = final_df_cleaned.set_index(['date', 'asset']).sort_index()

    # --- Diagnostic Reporting ---
    final_rows = len(final_df_cleaned)
    rows_added_by_interp = final_df_cleaned['source'].eq('INTERPOLATED_TIME').sum()

    diagnostics = {
        "process_name": "Time Series Construction and Gap Handling",
        "max_interpolation_gap_days": max_interpolation_gap,
        "initial_observed_points": initial_rows,
        "final_total_points": final_rows,
        "points_added_by_interpolation": int(rows_added_by_interp),
        "points_dropped_large_gaps": (initial_rows + int(rows_added_by_interp)) - final_rows
    }

    return final_df_cleaned, diagnostics


In [None]:
# Task 19: Pipeline Orchestrator Function Design

# ======================================================================================
# Task 19: Fused Orchestrator Function
# ======================================================================================

def orchestrate_yield_curve_estimation(
    df_master: pd.DataFrame,
    fused_input_state: Dict[str, Any],
    verbose: bool = False
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Executes the complete end-to-end pipeline for yield curve estimation.

    This master orchestrator function manages the entire workflow, from initial
    data validation to the final construction of a continuous, daily time series
    of yield curves. It calls a sequence of modular, single-responsibility
    functions, handling the flow of data, extraction of parameters, and
    aggregation of diagnostic reports at each stage.

    The pipeline consists of the following major phases:
    1.  Input Validation: Rigorous checks on the input DataFrame and hyperparameters.
    2.  Data Filtering: Removal of data based on maturity and liquidity.
    3.  Feature Engineering: Computation of derived features and pairing of options.
    4.  RANSAC Outlier Detection: Identification and removal of outliers.
    5.  Closed-Form Estimation: Calculation of zero-coupon bond prices.
    6.  Rate Conversion: Transformation of bond prices to interest rates.
    7.  Aggregation & Interpolation: Creation of a continuous daily time series.

    Args:
        df_master (pd.DataFrame):
            The primary input DataFrame containing raw, high-frequency option
            market data with the specified MultiIndex structure.
        fused_input_state (Dict[str, Any]):
            The configuration dictionary containing all metadata and hyperparameters
            required for the pipeline execution.
        verbose (bool, optional):
            If True, the diagnostic report from each major step will be printed
            to the console. Defaults to False.

    Returns:
        Tuple[pd.DataFrame, Dict[str, Any]]:
            A tuple containing:
            - pd.DataFrame: The final, analysis-ready DataFrame of daily,
              standardized yield curve time series.
            - Dict[str, Any]: A comprehensive, nested dictionary containing the
              diagnostic reports from every stage of the pipeline, providing a
              full audit trail of the process.

    Raises:
        ValueError/TypeError: If initial input validation fails.
        Exception: For any unexpected errors during the pipeline execution.
    """
    # --- Initialization ---
    # Initialize a master dictionary to store all diagnostic reports.
    master_diagnostics: Dict[str, Any] = {}

    def log_step(step_name: str, diagnostics: Dict[str, Any]):
        """Helper to store and optionally print diagnostics."""
        master_diagnostics[step_name] = diagnostics
        if verbose:
            print(f"\n--- {step_name} ---")
            pprint(diagnostics)

    try:
        # --- PHASE 1: INPUT VALIDATION ---
        # Step 1: Validate the structure and schema of the input DataFrame.
        validate_dataframe_structure(df_master=df_master)
        # Step 2: Validate the hyperparameter dictionary.
        validate_hyperparameter_dictionary(fused_input_state=fused_input_state)
        # Step 3: Validate the completeness and consistency of the data.
        validate_data_completeness(df_master=df_master)
        log_step("Input Validation", {"status": "All checks passed."})

        # Extract hyperparameters for easier access.
        hyperparams = fused_input_state['hyperparameters']

        # --- PHASE 2: DATA QUALITY FILTERING AND CLEANSING ---
        # Step 4: Filter out options with short maturities.
        df_filtered_maturity, diag_maturity = filter_by_maturity(
            df_master=df_master,
            maturity_filter_days=hyperparams['maturity_filter_days']
        )
        log_step(diag_maturity['filter_type'], diag_maturity)
        if df_filtered_maturity.empty:
            print("Warning: All data removed by maturity filter. Exiting.")
            return pd.DataFrame(), master_diagnostics

        # Step 5: Filter out illiquid options with wide bid-ask spreads.
        df_filtered_liquidity, diag_liquidity = filter_by_liquidity(
            df_master=df_filtered_maturity,
            liquidity_filter_spread_pct=hyperparams['liquidity_filter_spread_pct']
        )
        log_step(diag_liquidity['filter_type'], diag_liquidity)
        if df_filtered_liquidity.empty:
            print("Warning: All data removed by liquidity filter. Exiting.")
            return pd.DataFrame(), master_diagnostics

        # --- PHASE 3: RANSAC OUTLIER DETECTION IMPLEMENTATION ---
        # Step 6: Compute derived features and filter for complete call-put pairs.
        df_paired, diag_pairing = prepare_features_and_pairs(df_master=df_filtered_liquidity)
        log_step(diag_pairing['process_name'], diag_pairing)
        if df_paired.empty:
            print("Warning: No complete call-put pairs found. Exiting.")
            return pd.DataFrame(), master_diagnostics

        # Step 7: Prepare the numerical inputs (X, Y arrays) for the RANSAC algorithm.
        ransac_inputs, diag_ransac_prep = prepare_ransac_inputs(df_paired=df_paired)
        log_step(diag_ransac_prep['process_name'], diag_ransac_prep)

        # Step 8: Execute the RANSAC algorithm on all prepared groups.
        ransac_results, diag_ransac_exec = execute_ransac_on_all_groups(
            ransac_inputs=ransac_inputs, hyperparameters=hyperparams
        )
        log_step(diag_ransac_exec['process_name'], diag_ransac_exec)

        # Step 9: Validate the RANSAC results based on economic/statistical criteria.
        valid_ransac_results, diag_ransac_val = validate_and_filter_ransac_results(
            ransac_results=ransac_results, hyperparameters=hyperparams
        )
        log_step(diag_ransac_val['process_name'], diag_ransac_val)
        if not valid_ransac_results:
            print("Warning: No valid RANSAC results after validation. Exiting.")
            return pd.DataFrame(), master_diagnostics

        # --- PHASE 4 & 5: JOINT OPTIMIZATION AND RATE CONVERSION ---
        # Step 10: Construct the final feature vectors (y, m) using only inlier data.
        feature_vectors, diag_fv = construct_feature_vectors(
            df_analysis_ready=df_paired, valid_ransac_results=valid_ransac_results
        )
        log_step(diag_fv['process_name'], diag_fv)

        # Step 11: Compute the scalar summations required for the closed-form solution.
        summations, diag_sums = compute_summations_for_all_groups(
            feature_vectors=feature_vectors, hyperparameters=hyperparams
        )
        log_step(diag_sums['process_name'], diag_sums)

        # Step 12: Compute the closed-form solution for the zero-coupon bond prices.
        solutions, diag_solve = compute_closed_form_solutions(all_summations=summations)
        log_step(diag_solve['process_name'], diag_solve)

        # Step 13: Convert the bond prices to continuously compounded interest rates.
        rates, diag_rates = convert_all_solutions_to_rates(
            all_solutions=solutions, hyperparameters=hyperparams
        )
        log_step(diag_rates['process_name'], diag_rates)

        # Step 14: Apply final economic validation checks to the computed rates.
        valid_rates, diag_rate_val = validate_and_filter_rates(all_rates=rates)
        log_step(diag_rate_val['process_name'], diag_rate_val)
        if not valid_rates:
            print("Warning: No valid rates after economic validation. Exiting.")
            return pd.DataFrame(), master_diagnostics

        # --- PHASE 6: TEMPORAL AGGREGATION AND INTERPOLATION ---
        # Step 15: Aggregate the high-frequency results into a structured DataFrame.
        df_aggregated_results = aggregate_results_to_dataframe(all_rates=valid_rates)
        log_step("Results Aggregation", {"status": f"Aggregated {len(df_aggregated_results)} valid points into DataFrame."})

        # Step 16: Aggregate the hourly estimates to a daily frequency by maturity bucket.
        df_daily_agg, diag_daily_agg = aggregate_rates_daily(results_df=df_aggregated_results)
        log_step(diag_daily_agg['process_name'], diag_daily_agg)

        # Step 17: Interpolate the bucketed daily rates to standardized target tenors.
        df_interpolated, diag_interp = interpolate_yield_curves(
            daily_aggregated_df=df_daily_agg,
            target_tenors_days=hyperparams['target_tenors_days']
        )
        log_step(diag_interp['process_name'], diag_interp)

        # Step 18: Construct the final, continuous time series by handling gaps.
        df_final, diag_final_ts = construct_final_time_series(interpolated_df=df_interpolated)
        log_step(diag_final_ts['process_name'], diag_final_ts)

        # --- Final Return ---
        # Return the final time series and the master diagnostic report.
        return df_final, master_diagnostics

    except (ValueError, TypeError) as e:
        # Catch validation errors and other explicit exceptions.
        print(f"Pipeline execution failed. Error: {e}")
        # Return empty results but include diagnostics up to the point of failure.
        return pd.DataFrame(), master_diagnostics
    except Exception as e:
        # Catch any other unexpected errors during execution.
        print(f"An unexpected error occurred during pipeline execution: {e}")
        import traceback
        traceback.print_exc()
        return pd.DataFrame(), master_diagnostics


In [None]:
# Task 20: Parameter Sensitivity Analysis

# ======================================================================================
# Task 20, Step 1: Helper function for a single pipeline run
# ======================================================================================
# ======================================================================================
# NOTE: We assume the master orchestrator from Task 19,
# `orchestrate_yield_curve_estimation`, is available in the execution scope.
# ======================================================================================

def _run_single_pipeline_instance(
    params: Tuple,
    param_names: List[str],
    df_master: pd.DataFrame,
    base_config: Dict[str, Any]
) -> Tuple[Tuple, pd.DataFrame, Dict[str, Any]]:
    """
    Executes a single, complete pipeline run with a specific parameter set.

    This function serves as a self-contained wrapper around the main pipeline
    orchestrator (`orchestrate_yield_curve_estimation`). It is specifically
    designed to be the target function for a parallel processing pool (like
    `multiprocessing.Pool`). It takes a tuple of parameter values, updates a
    copy of the base configuration, runs the entire pipeline, and returns the
    results along with the parameters that generated them. This structure is
    essential for conducting the parameter sensitivity analysis in parallel.

    Args:
        params (Tuple):
            A tuple containing the specific hyperparameter values for this
            single run (e.g., (0.004, 0.10)). The order must correspond to
            `param_names`.
        param_names (List[str]):
            A list of the hyperparameter names being varied in this run
            (e.g., ['ransac_residual_sq_threshold', 'ransac_slope_tolerance_pct']).
        df_master (pd.DataFrame):
            The primary, raw input DataFrame containing the market data. This is
            passed to each worker process.
        base_config (Dict[str, Any]):
            The base configuration dictionary, which serves as a template for
            the run-specific configuration.

    Returns:
        Tuple[Tuple, pd.DataFrame, Dict[str, Any]]:
            A tuple containing:
            - Tuple: The input `params` tuple, used to identify which parameter
              combination produced these results.
            - pd.DataFrame: The final yield curve DataFrame produced by the
              pipeline for this parameter set.
            - Dict[str, Any]: The comprehensive diagnostic dictionary from the
              pipeline run.
    """
    # --- Configuration Setup ---
    # Create a deep copy of the base configuration. This is a critical step in
    # parallel processing to ensure that each worker operates on an independent
    # copy of the configuration object, preventing race conditions or unintended
    # modifications of the shared base configuration.
    config = copy.deepcopy(base_config)

    # --- Parameter Update ---
    # Iterate through the provided parameter names and their corresponding values.
    for name, value in zip(param_names, params):
        # Update the 'hyperparameters' section of the copied configuration
        # with the specific values for this experimental run.
        config['hyperparameters'][name] = value

    # --- Pipeline Execution ---
    # Execute the main end-to-end pipeline orchestrator using the newly
    # configured state. The `verbose` flag is explicitly set to False to
    # prevent interleaved printing from multiple parallel worker processes,
    # which would corrupt the console output.
    yield_curve_df, diagnostics = orchestrate_yield_curve_estimation(
        df_master=df_master,
        fused_input_state=config,
        verbose=False
    )

    # --- Result Packaging ---
    # Return the results as a tuple. Including the input `params` in the
    # output is essential for linking the results back to the specific
    # hyperparameter combination that generated them during the aggregation phase.
    return params, yield_curve_df, diagnostics


# ======================================================================================
# Task 20: Fused Orchestrator Function
# ======================================================================================

def run_parameter_sensitivity_analysis(
    df_master: pd.DataFrame,
    base_fused_input_state: Dict[str, Any],
    parameter_grid: Dict[str, List[Any]]
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Performs a sensitivity analysis by running the pipeline across a grid of hyperparameters.

    This function systematically evaluates the robustness of the yield curve
    estimation pipeline by executing it for every combination of parameters
    defined in the input grid. It leverages multiprocessing to accelerate the
    computation. The results are then aggregated to compute key robustness
    metrics, such as the stability of the estimated rates.

    Args:
        df_master (pd.DataFrame):
            The primary input DataFrame containing raw market data.
        base_fused_input_state (Dict[str, Any]):
            The base configuration dictionary. This will be used as a template,
            with hyperparameter values being updated for each run.
        parameter_grid (Dict[str, List[Any]]):
            A dictionary where keys are hyperparameter names (e.g.,
            'ransac_residual_sq_threshold') and values are lists of the
            values to test for that parameter.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]:
            A tuple containing:
            - pd.DataFrame: A DataFrame summarizing the rate stability. It is
              indexed by date and asset, with columns for the mean and standard
              deviation of each tenor's rate across all parameter runs.
            - pd.DataFrame: A DataFrame summarizing the coverage stability,
              showing the mean and standard deviation of key diagnostic
              metrics (e.g., success rates) across all parameter runs.
    """
    # --- Step 1: Parameter Grid and Testing Framework ---
    # Extract the names and value lists from the parameter grid.
    param_names = list(parameter_grid.keys())
    param_values = list(parameter_grid.values())

    # Create the Cartesian product of all parameter values.
    param_combinations = list(itertools.product(*param_values))

    print(f"Starting sensitivity analysis for {len(param_combinations)} parameter combinations...")

    # Prepare arguments for the parallel execution.
    args_for_pool = [
        (params, param_names, df_master, base_fused_input_state)
        for params in param_combinations
    ]

    # Use a multiprocessing pool to run the pipeline in parallel.
    with Pool(processes=max(1, cpu_count() - 1)) as pool:
        all_results = pool.starmap(_run_single_pipeline_instance, args_for_pool)

    # --- Step 2: Robustness Metrics Computation and Analysis ---
    # --- Rate Stability Analysis ---
    all_rate_dfs = []
    for params, yield_curve_df, _ in all_results:
        if not yield_curve_df.empty:
            # Add parameter identifiers to each result DataFrame.
            for i, name in enumerate(param_names):
                yield_curve_df[name] = params[i]
            all_rate_dfs.append(yield_curve_df.reset_index())

    if not all_rate_dfs:
        print("Warning: No successful pipeline runs in sensitivity analysis.")
        return pd.DataFrame(), pd.DataFrame()

    # Concatenate all results into a single DataFrame.
    combined_rates_df = pd.concat(all_rate_dfs, ignore_index=True)

    # Identify the rate columns to analyze.
    rate_cols = [col for col in combined_rates_df.columns if 'rate' in col and 'status' not in col]

    # Group by the original index and compute mean and std dev for each rate.
    rate_stability_summary = combined_rates_df.groupby(['date', 'asset'])[rate_cols].agg(['mean', 'std'])

    # --- Coverage Stability Analysis ---
    all_diagnostics = [diag for _, _, diag in all_results]

    # Extract key success metrics from each diagnostic report.
    coverage_data = []
    for params, diag in zip(param_combinations, all_diagnostics):
        record = dict(zip(param_names, params))
        # Example: Extracting the RANSAC validation pass rate.
        pass_rate = diag.get('RANSAC Result Validation and Filtering', {}).get('validation_pass_rate_pct', np.nan)
        record['ransac_pass_rate_pct'] = pass_rate
        coverage_data.append(record)

    coverage_df = pd.DataFrame.from_records(coverage_data)

    # Summarize the stability of the coverage metrics.
    coverage_stability_summary = coverage_df.describe().transpose()

    # --- Step 3: Reporting and Recommendations (Return Summaries) ---
    print("Sensitivity analysis complete.")

    return rate_stability_summary, coverage_stability_summary


In [None]:
# Task 21: Bootstrap Confidence Interval Construction

# ======================================================================================
# NOTE: We assume the full suite of pipeline functions from Tasks 7-13 are
# available in the execution scope.
# ======================================================================================

def _run_condensed_pipeline_for_bootstrap(
    df_sample: pd.DataFrame,
    hyperparameters: Dict[str, Any],
    group_name: Tuple
) -> Optional[Dict[str, float]]:
    """
    Executes a condensed, single-group run of the core estimation pipeline.

    This helper function serves as the computational engine for a single bootstrap
    iteration. It takes a DataFrame representing one bootstrap sample for a
    specific estimation group and processes it through the essential pipeline
    stages: RANSAC outlier detection, feature vector construction, closed-form
    solution, and rate conversion. It is designed to fail gracefully, returning
    None if any stage of the pipeline does not yield a valid result for the given
    sample.

    Args:
        df_sample (pd.DataFrame):
            A DataFrame containing a bootstrap sample of paired option data for a
            single estimation group.
        hyperparameters (Dict[str, Any]):
            The configuration dictionary containing all necessary hyperparameters
            for the RANSAC and estimation stages.
        group_name (Tuple):
            The unique identifier `(timestamp, asset, expiry_date)` for the
            estimation group being processed.

    Returns:
        Optional[Dict[str, float]]:
            A dictionary containing the final estimated 'crypto_rate' and
            'ref_rate' if the pipeline run is successful. Returns None if any
            step fails (e.g., RANSAC finds no valid consensus, the linear
            system is singular, or results are economically invalid).
    """
    try:
        # --- Tasks 7-9: RANSAC Outlier Detection ---
        # Prepare the numerical inputs (X, Y arrays) for the RANSAC algorithm.
        ransac_inputs, _ = prepare_ransac_inputs(df_paired=df_sample, min_samples_per_group=5)
        # If the sample is too small or invalid, exit.
        if not ransac_inputs: return None

        # Execute the RANSAC algorithm on the prepared group.
        ransac_results, _ = execute_ransac_on_all_groups(ransac_inputs, hyperparameters)
        # If RANSAC fails to find a consensus model, exit.
        if not ransac_results: return None

        # Validate the RANSAC results based on economic and statistical criteria.
        valid_ransac, _ = validate_and_filter_ransac_results(ransac_results, hyperparameters)
        # If the found model is not valid (e.g., wrong slope), exit.
        if not valid_ransac: return None

        # --- Tasks 10-13: Estimation and Rate Conversion ---
        # Construct the final feature vectors (y, m) using only the inlier data.
        feature_vectors, _ = construct_feature_vectors(df_sample, valid_ransac)
        # Compute the scalar summations required for the closed-form solution.
        summations, _ = compute_summations_for_all_groups(feature_vectors, hyperparameters)
        # Compute the closed-form solution for the zero-coupon bond prices.
        solutions, _ = compute_closed_form_solutions(summations)
        # If the system is singular or the solution is economically invalid, exit.
        if not solutions: return None

        # Convert the bond prices to continuously compounded interest rates.
        rates, _ = convert_all_solutions_to_rates(solutions, hyperparameters)
        # If the conversion fails (e.g., numerical error), exit.
        if not rates: return None

        # Since we are processing a single group, the result dictionary will have one key.
        # Return the final rate estimates for this successful bootstrap iteration.
        return rates[group_name]

    except Exception:
        # This broad exception catch ensures that any unexpected error within the
        # complex pipeline (e.g., a rare numerical issue) results in a graceful
        # failure for this single bootstrap iteration, rather than crashing the
        # entire analysis.
        return None

# ======================================================================================
# Task 21, Step 1: Helper for a single bootstrap iteration
# ======================================================================================

def _run_single_bootstrap_iteration(
    iteration_seed: int,
    df_group_pivoted: pd.DataFrame,
    hyperparameters: Dict[str, Any],
    group_name: Tuple
) -> Optional[Dict[str, float]]:
    """
    Executes a single bootstrap iteration: resample, reformat, and re-estimate.

    This function encapsulates the logic for one bootstrap trial. It takes the
    original data for a group (in a pivoted format for efficient resampling),
    creates a new sample by drawing with replacement, reconstructs the sample
    into the long format expected by the pipeline, and then calls the condensed
    pipeline function to get a new rate estimate.

    Args:
        iteration_seed (int):
            The seed for the random number generator for this specific iteration,
            ensuring that each parallel worker produces a different and
            reproducible sample.
        df_group_pivoted (pd.DataFrame):
            The original, clean data for the estimation group, pivoted so that
            each row represents a unique strike price with columns for call and
            put data.
        hyperparameters (Dict[str, Any]):
            The configuration dictionary for the pipeline.
        group_name (Tuple):
            The unique identifier `(timestamp, asset, expiry_date)` for the
            estimation group.

    Returns:
        Optional[Dict[str, float]]:
            The estimated rates for the bootstrap sample, or None if the
            estimation fails for this sample.
    """
    # --- Resampling ---
    # Initialize a dedicated random number generator for this iteration to ensure
    # that parallel processes are statistically independent.
    rng = np.random.default_rng(iteration_seed)

    # Determine the number of unique strike pairs in the original sample.
    n_pairs = len(df_group_pivoted)
    # Generate a new set of indices by sampling with replacement from the original indices.
    resampled_indices = rng.choice(n_pairs, size=n_pairs, replace=True)

    # Create the bootstrap sample by selecting rows from the pivoted DataFrame
    # using the newly generated indices.
    df_sample_pivoted = df_group_pivoted.iloc[resampled_indices]

    # --- Reformatting ---
    # Un-pivot (or "melt") the data to transform it from the wide format back to
    # the long format required by the start of the estimation pipeline.
    df_sample_long = df_sample_pivoted.stack(level='option_type').reset_index()
    # Restore the original MultiIndex structure.
    df_sample_long = df_sample_long.set_index(
        ['timestamp', 'asset', 'expiry_date', 'strike_price', 'option_type']
    )

    # --- Data Enrichment ---
    # The pivoted format loses some of the original columns that are constant
    # across the group (e.g., spot_price). This step adds them back to the
    # resampled DataFrame to ensure it has the correct schema for the pipeline.
    # We can safely take these constant values from any row of the original data.
    original_row = df_group_pivoted.iloc[0]

    # Define the list of columns that were constant and need to be restored.
    # Note: 'mid_price' is not included as it's specific to each option.
    # It will be recalculated inside the condensed pipeline.
    cols_to_restore = ['spot_price', 'futures_price', 'instrument_name', 'moneyness']

    # Iterate and restore each required column.
    for col in cols_to_restore:
        # Check if the column exists in the hierarchical index of the pivoted frame.
        if col in original_row.index.get_level_values(0):
             # Assign the constant value to the new column. The value is the same
             # for the call and put, so we can arbitrarily take it from the 'call' sub-column.
             df_sample_long[col] = original_row[(col, 'call')]

    # --- Re-estimation ---
    # Run the condensed estimation pipeline on the fully prepared bootstrap sample.
    return _run_condensed_pipeline_for_bootstrap(df_sample_long, hyperparameters, group_name)


# ======================================================================================
# Task 21: Fused Orchestrator Function
# ======================================================================================

def construct_bootstrap_confidence_intervals(
    df_paired: pd.DataFrame,
    fused_input_state: Dict[str, Any],
    group_to_analyze: Tuple,
    n_bootstrap: int = 1000
) -> Dict[str, Any]:
    """
    Constructs bootstrap confidence intervals for a single estimation group.

    This function performs a non-parametric bootstrap procedure to quantify the
    uncertainty in the estimated interest rates for a specific timestamp, asset,
    and expiry date. It repeatedly resamples the underlying option pairs, re-runs
    the core estimation pipeline, and computes confidence intervals from the
    resulting distribution of estimates.

    Args:
        df_paired (pd.DataFrame):
            The analysis-ready DataFrame of clean, paired options from Task 6.
        fused_input_state (Dict[str, Any]):
            The full configuration dictionary for the pipeline.
        group_to_analyze (Tuple):
            The specific group identifier `(timestamp, asset, expiry_date)` for
            which to construct confidence intervals.
        n_bootstrap (int, optional):
            The number of bootstrap iterations to perform. Defaults to 1000.

    Returns:
        Dict[str, Any]:
            A dictionary containing the original point estimate, the computed
            confidence intervals, and diagnostic information about the
            bootstrap procedure.
    """
    # --- Step 1: Bootstrap Resampling Framework ---
    # Extract the data for the specific group to be analyzed.
    try:
        df_group = df_paired.loc[group_to_analyze]
    except KeyError:
        raise ValueError(f"Group {group_to_analyze} not found in the input DataFrame.")

    # Get the original point estimate for comparison.
    point_estimate_result = _run_condensed_pipeline_for_bootstrap(
        df_group, fused_input_state['hyperparameters'], group_to_analyze
    )
    if point_estimate_result is None:
        return {"status": "FAILED", "reason": "Original point estimate could not be computed."}

    # Pivot the group data once for efficient resampling.
    df_group_pivoted = df_group.reset_index().pivot_table(
        index=['timestamp', 'asset', 'expiry_date', 'strike_price'],
        columns='option_type'
    )

    # Prepare arguments for the parallel bootstrap execution.
    # Generate independent seeds for each worker process.
    master_seed = 42
    rng = np.random.default_rng(master_seed)
    iteration_seeds = rng.integers(low=0, high=2**32 - 1, size=n_bootstrap)

    args_for_pool = [
        (seed, df_group_pivoted, fused_input_state['hyperparameters'], group_to_analyze)
        for seed in iteration_seeds
    ]

    # Run bootstrap iterations in parallel.
    with Pool(processes=max(1, cpu_count() - 1)) as pool:
        bootstrap_results = pool.starmap(_run_single_bootstrap_iteration, args_for_pool)

    # --- Step 2: Bootstrap Distribution Analysis and CI Computation ---
    # Filter out failed iterations and collect the successful rate estimates.
    successful_rates = [res for res in bootstrap_results if res is not None]

    if not successful_rates:
        return {
            "status": "FAILED",
            "reason": "No bootstrap iterations were successful.",
            "point_estimate": point_estimate_result,
            "bootstrap_runs": n_bootstrap,
            "successful_runs": 0
        }

    crypto_rate_dist = [res['crypto_rate'] for res in successful_rates]
    ref_rate_dist = [res['ref_rate'] for res in successful_rates]

    # Compute percentile-based confidence intervals.
    ci_95_crypto = np.percentile(crypto_rate_dist, [2.5, 97.5])
    ci_68_crypto = np.percentile(crypto_rate_dist, [16, 84])
    ci_95_ref = np.percentile(ref_rate_dist, [2.5, 97.5])
    ci_68_ref = np.percentile(ref_rate_dist, [16, 84])

    # --- Step 3: Bootstrap Validation and Quality Assessment ---
    final_report = {
        "status": "SUCCESS",
        "group": group_to_analyze,
        "point_estimate": {
            "crypto_rate": point_estimate_result['crypto_rate'],
            "ref_rate": point_estimate_result['ref_rate']
        },
        "confidence_intervals": {
            "crypto_rate_95_ci": list(ci_95_crypto),
            "crypto_rate_68_ci": list(ci_68_crypto),
            "ref_rate_95_ci": list(ci_95_ref),
            "ref_rate_68_ci": list(ci_68_ref)
        },
        "diagnostics": {
            "bootstrap_runs": n_bootstrap,
            "successful_runs": len(successful_rates),
            "success_rate_pct": 100 * len(successful_rates) / n_bootstrap
        }
    }

    return final_report


In [None]:
# Task 22: Economic Validation and Benchmarking

# ======================================================================================
# Task 22, Step 1: Theoretical Consistency Helper
# ======================================================================================

def _validate_theoretical_consistency(
    yield_curves_df: pd.DataFrame
) -> Dict[str, Any]:
    """
    Analyzes term structure shapes and cross-asset consistency of rates.

    This function performs two key theoretical checks on the final yield curve data:
    1.  Term Structure Analysis: It calculates the slope of the yield curve
        (e.g., 360-day rate vs. 90-day rate) for each day and asset, then
        computes the historical distribution of curve shapes (Normal, Inverted, Flat).
    2.  Cross-Asset Consistency: It compares the reference currency rates (ref_rate)
        derived independently from different crypto assets (e.g., BTC and ETH).
        Theoretically, these should be similar. The function calculates and
        summarizes the spread between these rates.

    Args:
        yield_curves_df (pd.DataFrame):
            The final time-series DataFrame, indexed by (date, asset), with
            columns for rates at different tenors.

    Returns:
        Dict[str, Any]:
            A dictionary containing the analysis results, including the
            distribution of curve shapes and descriptive statistics of the
            cross-asset rate spread.
    """
    # Initialize the report dictionary to store results.
    report: Dict[str, Any] = {}

    # --- Term Structure Shape Analysis ---
    # Identify the column names for the short (90d) and long (360d) ends of the curve.
    # This approach is robust to the exact naming convention (e.g., 'crypto_rate_90d').
    rate_cols_90d = [c for c in yield_curves_df.columns if '90d' in str(c) and 'rate' in str(c)]
    rate_cols_360d = [c for c in yield_curves_df.columns if '360d' in str(c) and 'rate' in str(c)]

    # Proceed only if both short and long tenor columns are found.
    if rate_cols_90d and rate_cols_360d:
        # Create a copy to avoid modifying the original DataFrame.
        df = yield_curves_df.copy()
        # Calculate the slope of the crypto and reference yield curves.
        df['crypto_slope'] = df[rate_cols_360d[0]] - df[rate_cols_90d[0]]
        df['ref_slope'] = df[rate_cols_360d[1]] - df[rate_cols_90d[1]]

        # Define conditions for classifying the curve shape based on its slope.
        # A small tolerance (0.001 or 10bps) is used to classify a curve as 'Flat'.
        shape_conditions = [
            (df['crypto_slope'] > 0.001),   # Steeply upward sloping
            (df['crypto_slope'] < -0.001),  # Steeply inverted
        ]
        # Define the corresponding labels for each condition.
        shape_choices = ['Normal/Upward', 'Inverted']
        # Use numpy.select for efficient conditional assignment.
        df['crypto_curve_shape'] = np.select(shape_conditions, shape_choices, default='Flat')

        # Calculate the normalized frequency of each curve shape and store it.
        report['term_structure_shape_dist'] = df['crypto_curve_shape'].value_counts(normalize=True).to_dict()

    # --- Cross-Asset Consistency Analysis ---
    # Check if data for both 'BTC' and 'ETH' exists to perform a comparison.
    available_assets = set(yield_curves_df.index.get_level_values('asset'))
    if {'BTC', 'ETH'}.issubset(available_assets):
        # Unstack the DataFrame to place BTC and ETH data in adjacent columns.
        df_unstacked = yield_curves_df.unstack(level='asset')
        # Find all reference rate columns.
        ref_rate_cols = [c for c in df_unstacked.columns if 'ref_rate' in c[0]]

        # Proceed if reference rate columns for at least two assets are found.
        if len(ref_rate_cols) > 1:
            # Assume the first two ref_rate columns correspond to BTC and ETH.
            # This relies on pandas' default sorting of the 'asset' level.
            btc_ref_col = ref_rate_cols[0]
            eth_ref_col = ref_rate_cols[1]

            # Calculate the spread between the two independently derived reference rates.
            spread = df_unstacked[btc_ref_col] - df_unstacked[eth_ref_col]
            # Compute descriptive statistics for this spread and add to the report.
            report['cross_asset_ref_rate_spread_stats'] = spread.describe().to_dict()

    # Return the completed analysis report.
    return report

# ======================================================================================
# Task 22, Step 2: External Benchmark Comparison Helper
# ======================================================================================

def _compare_with_external_benchmark(
    yield_curves_df: pd.DataFrame,
    benchmark_df: pd.DataFrame
) -> Dict[str, Any]:
    """
    Compares estimated rates with an external benchmark time series.

    This function aligns the estimated yield curve data with a provided
    benchmark time series (e.g., Aave lending rates) on their date index.
    It then computes the correlation matrix between the estimated rates and
    the benchmark rates to quantify their co-movement.

    Args:
        yield_curves_df (pd.DataFrame):
            The final time-series DataFrame of estimated rates.
        benchmark_df (pd.DataFrame):
            A DataFrame containing the external benchmark series, indexed by date.

    Returns:
        Dict[str, Any]:
            A dictionary containing the correlation matrix, or a status message
            if there is no overlapping data between the inputs.
    """
    # Align the two DataFrames using an inner join on their date indices.
    # This ensures that only dates present in both datasets are included.
    df_merged = yield_curves_df.join(benchmark_df, how='inner')

    # If the merged DataFrame is empty, there's no overlapping data to analyze.
    if df_merged.empty:
        return {"status": "No overlapping data with benchmark."}

    # Identify all columns that represent a rate for the correlation analysis.
    # This includes estimated rates and benchmark rates.
    rate_cols = [c for c in df_merged.columns if 'rate' in str(c)]
    # Calculate the pairwise correlation matrix for all rate columns.
    correlation_matrix = df_merged[rate_cols].corr()

    # Return the correlation matrix as a dictionary for easy serialization.
    return {"correlation_matrix": correlation_matrix.to_dict()}

# ======================================================================================
# Task 22, Step 3: Statistical Significance Test Helper
# ======================================================================================

def _perform_statistical_tests(
    yield_curves_df: pd.DataFrame
) -> Dict[str, Any]:
    """
    Performs stationarity (ADF) and cointegration tests on the rate series.

    This function applies formal econometric tests to the time series of rates:
    1.  Unit Root Test: Uses the Augmented Dickey-Fuller (ADF) test to check
        if each rate series is stationary.
    2.  Cointegration Test: Uses the Engle-Granger test to check for a long-run
        equilibrium relationship between the reference rates derived from
        BTC and ETH.

    Args:
        yield_curves_df (pd.DataFrame):
            The final time-series DataFrame of estimated rates.

    Returns:
        Dict[str, Any]:
            A nested dictionary containing the results and interpretations of
            the statistical tests performed.
    """
    # Initialize the report dictionary with sections for each test type.
    report: Dict[str, Dict] = {"unit_root_tests": {}, "cointegration_tests": {}}

    # --- Unit Root (ADF) Tests ---
    # Identify all columns containing estimated rates.
    rate_cols = [c for c in yield_curves_df.columns if 'rate' in str(c) and 'status' not in str(c)]
    # Iterate through each rate column to test it for stationarity.
    for col in rate_cols:
        # Drop any missing values from the series.
        series = yield_curves_df[col].dropna()
        # Ensure there are enough data points for a meaningful statistical test.
        if len(series) > 20:
            # Perform the Augmented Dickey-Fuller test.
            adf_result = adfuller(series)
            # Extract the p-value from the test results.
            p_value = adf_result[1]
            # Interpret the result based on a 5% significance level.
            interpretation = "Stationary (Reject H0)" if p_value < 0.05 else "Non-Stationary (Fail to Reject H0)"
            # Store the p-value and interpretation in the report.
            report["unit_root_tests"][str(col)] = {"p_value": p_value, "interpretation": interpretation}

    # --- Cointegration Tests ---
    # Check if data for both 'BTC' and 'ETH' is available for comparison.
    available_assets = set(yield_curves_df.index.get_level_values('asset'))
    if {'BTC', 'ETH'}.issubset(available_assets):
        # Unstack the DataFrame to align the BTC and ETH series.
        df_unstacked = yield_curves_df.unstack(level='asset')
        # Dynamically find the column names for the BTC and ETH reference rates.
        btc_ref_cols = [c for c in df_unstacked.columns if 'ref_rate' in c[0] and 'BTC' in c[1]]
        eth_ref_cols = [c for c in df_unstacked.columns if 'ref_rate' in c[0] and 'ETH' in c[1]]

        # Proceed if both series were found.
        if btc_ref_cols and eth_ref_cols:
            btc_ref_col, eth_ref_col = btc_ref_cols[0], eth_ref_cols[0]
            # Create a DataFrame with just the two series and drop any rows with missing data.
            aligned_series = df_unstacked[[btc_ref_col, eth_ref_col]].dropna()
            # Ensure there are enough data points for a meaningful test.
            if len(aligned_series) > 20:
                # Perform the Engle-Granger cointegration test.
                coint_result = coint(aligned_series[btc_ref_col], aligned_series[eth_ref_col])
                # Extract the p-value.
                p_value = coint_result[1]
                # Interpret the result based on a 5% significance level.
                interpretation = "Cointegrated (Reject H0)" if p_value < 0.05 else "Not Cointegrated (Fail to Reject H0)"
                # Store the result in the report.
                report["cointegration_tests"]["BTC_vs_ETH_ref_rate"] = {"p_value": p_value, "interpretation": interpretation}

    # Return the completed report.
    return report


In [None]:
# Top-Level Orchestrator

def run_full_analysis(
    df_master: pd.DataFrame,
    fused_input_state: Dict[str, Any],
    parameter_grid: Optional[Dict[str, List[Any]]] = None,
    bootstrap_group: Optional[Tuple] = None,
    n_bootstrap: int = 1000,
    benchmark_df: Optional[pd.DataFrame] = None,
    verbose: bool = False
) -> Dict[str, Any]:
    """
    Executes the entire yield curve analysis, from estimation to robustness checks.

    This top-level orchestrator serves as the single entry point for a complete
    research run. It coordinates the execution of the core estimation pipeline
    and all subsequent, optional robustness and validation analyses.

    The workflow is as follows:
    1.  Runs the core estimation pipeline (Task 19) to generate the primary
        yield curve time series.
    2.  If a `parameter_grid` is provided, it runs the sensitivity analysis (Task 20).
    3.  If a `bootstrap_group` is specified, it runs the bootstrap confidence
        interval analysis (Task 21) for that specific group.
    4.  If the core pipeline succeeds, it runs the economic and statistical
        validation analysis (Task 22).

    Args:
        df_master (pd.DataFrame):
            The primary input DataFrame containing raw market data.
        fused_input_state (Dict[str, Any]):
            The base configuration dictionary for the analysis.
        parameter_grid (Optional[Dict[str, List[Any]]], optional):
            The grid of parameters for the sensitivity analysis (Task 20).
            If None, this analysis is skipped. Defaults to None.
        bootstrap_group (Optional[Tuple], optional):
            The specific group `(timestamp, asset, expiry_date)` for which to
            construct bootstrap confidence intervals (Task 21). If None, this
            analysis is skipped. Defaults to None.
        n_bootstrap (int, optional):
            The number of bootstrap iterations to perform if `bootstrap_group`
            is specified. Defaults to 1000.
        benchmark_df (Optional[pd.DataFrame], optional):
            An external benchmark DataFrame for economic validation (Task 22).
            If None, the benchmark comparison is skipped. Defaults to None.
        verbose (bool, optional):
            If True, prints the diagnostic report from the core pipeline.
            Defaults to False.

    Returns:
        Dict[str, Any]:
            A comprehensive, nested dictionary containing all results and
            diagnostics from the entire analysis run.
    """
    # --- Initialization ---
    # Initialize the master dictionary to store all results and diagnostics.
    full_analysis_report: Dict[str, Any] = {}

    # --- 1. Core Estimation Pipeline Execution (Task 19) ---
    # This is the foundational step. All subsequent analyses depend on its output.
    print("--- Starting Core Estimation Pipeline ---")
    yield_curves_df, core_diagnostics = orchestrate_yield_curve_estimation(
        df_master=df_master,
        fused_input_state=fused_input_state,
        verbose=verbose
    )
    # Store the primary results and diagnostics.
    full_analysis_report['core_pipeline_results'] = {
        "yield_curves_df": yield_curves_df,
        "diagnostics": core_diagnostics
    }
    print("--- Core Estimation Pipeline Complete ---")

    # --- 2. Parameter Sensitivity Analysis (Task 20) ---
    # This analysis is optional and runs only if a parameter grid is provided.
    if parameter_grid:
        print("\n--- Starting Parameter Sensitivity Analysis ---")
        try:
            # Run the sensitivity analysis orchestrator.
            rate_stability, coverage_stability = run_parameter_sensitivity_analysis(
                df_master=df_master,
                base_fused_input_state=fused_input_state,
                parameter_grid=parameter_grid
            )
            # Store the results.
            full_analysis_report['sensitivity_analysis'] = {
                "rate_stability_summary": rate_stability,
                "coverage_stability_summary": coverage_stability
            }
            print("--- Parameter Sensitivity Analysis Complete ---")
        except Exception as e:
            # Ensure failure in an optional analysis does not stop the entire process.
            print(f"Parameter Sensitivity Analysis failed with error: {e}")
            full_analysis_report['sensitivity_analysis'] = {"status": "FAILED", "error": str(e)}

    # --- 3. Bootstrap Confidence Interval Analysis (Task 21) ---
    # This analysis is optional and runs only if a specific group is targeted.
    if bootstrap_group:
        print(f"\n--- Starting Bootstrap Analysis for group {bootstrap_group} ---")
        try:
            # The bootstrap analysis requires the `df_paired` DataFrame, which is an
            # intermediate result of the core pipeline. We regenerate it here to
            # keep the main orchestrator's interface clean.
            hyperparams = fused_input_state['hyperparameters']
            df_temp, _ = filter_by_maturity(df_master, hyperparams['maturity_filter_days'])
            df_temp, _ = filter_by_liquidity(df_temp, hyperparams['liquidity_filter_spread_pct'])
            df_paired_for_bootstrap, _ = prepare_features_and_pairs(df_temp)

            # Run the bootstrap orchestrator.
            bootstrap_report = construct_bootstrap_confidence_intervals(
                df_paired=df_paired_for_bootstrap,
                fused_input_state=fused_input_state,
                group_to_analyze=bootstrap_group,
                n_bootstrap=n_bootstrap
            )
            # Store the results.
            full_analysis_report['bootstrap_analysis'] = bootstrap_report
            print("--- Bootstrap Analysis Complete ---")
        except Exception as e:
            print(f"Bootstrap Analysis failed with error: {e}")
            full_analysis_report['bootstrap_analysis'] = {"status": "FAILED", "error": str(e)}

    # --- 4. Economic Validation and Benchmarking (Task 22) ---
    # This analysis runs if the core pipeline produced a valid, non-empty result.
    if not yield_curves_df.empty:
        print("\n--- Starting Economic Validation and Benchmarking ---")
        try:
            # Run the economic validation orchestrator.
            economic_report = run_economic_validation(
                yield_curves_df=yield_curves_df,
                benchmark_df=benchmark_df
            )
            # Store the results.
            full_analysis_report['economic_validation'] = economic_report
            print("--- Economic Validation and Benchmarking Complete ---")
        except Exception as e:
            print(f"Economic Validation failed with error: {e}")
            full_analysis_report['economic_validation'] = {"status": "FAILED", "error": str(e)}
    else:
        # If the core pipeline failed, skip this step.
        print("\n--- Skipping Economic Validation (No valid yield curves produced) ---")
        full_analysis_report['economic_validation'] = {"status": "SKIPPED"}

    # Return the final, comprehensive report.
    return full_analysis_report
