# **`README.md`**

# Constrained Portfolio Optimization via Quantum Approximate Optimization Algorithm (QAOA) with XY-Mixers and Trotterized Initialization

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2602.14827-b31b1b.svg)](https://arxiv.org/abs/2602.14827)
[![Journal](https://img.shields.io/badge/Journal-ArXiv%20Preprint-003366)](https://arxiv.org/abs/2602.14827)
[![Year](https://img.shields.io/badge/Year-2026-purple)](https://github.com/chirindaopensource/constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm)
[![Discipline](https://img.shields.io/badge/Discipline-Financial%20Engineering%20%7C%20Quantum%20Computing-00529B)](https://github.com/chirindaopensource/constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm)
[![Data Sources](https://img.shields.io/badge/Data-Yahoo%20Finance%20%7C%20Bloomberg-lightgrey)](https://finance.yahoo.com/)
[![Core Method](https://img.shields.io/badge/Method-QAOA%20%7C%20Combinatorial%20Optimization-orange)](https://github.com/chirindaopensource/constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm)
[![Analysis](https://img.shields.io/badge/Analysis-Walk--Forward%20Backtesting-red)](https://github.com/chirindaopensource/constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm)
[![Validation](https://img.shields.io/badge/Validation-Out--of--Sample%20Performance-green)](https://github.com/chirindaopensource/constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm)
[![Robustness](https://img.shields.io/badge/Robustness-Trotterized%20Initialization-yellow)](https://github.com/chirindaopensource/constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%230C55A5.svg?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![PennyLane](https://img.shields.io/badge/PennyLane-%23000000.svg?style=flat&logo=Xanadu&logoColor=white)](https://pennylane.ai/)
[![D-Wave](https://img.shields.io/badge/D--Wave%20Ocean-%2300AEEF.svg?style=flat&logo=D-Wave&logoColor=white)](https://docs.ocean.dwavesys.com/)
[![YAML](https://img.shields.io/badge/YAML-%23CB171E.svg?style=flat&logo=yaml&logoColor=white)](https://yaml.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
[![Open Source](https://img.shields.io/badge/Open%20Source-%E2%9D%A4-brightgreen)](https://github.com/chirindaopensource/constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm)

**Repository:** `https://github.com/chirindaopensource/constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm`

**Owner:** 2026 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2026 paper entitled **"Constrained Portfolio Optimization via Quantum Approximate Optimization Algorithm (QAOA) with XY-Mixers and Trotterized Initialization: A Hybrid Approach for Direct Indexing"** by:

*   **Javier Mancilla** (SquareOne Capital)
*   **Theodoros D. Bouloumis** (Aristotle University of Thessaloniki)
*   **Frederic Goguikian** (SquareOne Capital)

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire hybrid quantum-classical workflow: from the ingestion and rigorous validation of financial market data to the formulation and simulation of constraint-preserving quantum circuits, culminating in comprehensive out-of-sample evaluation against classical heuristics.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `execute_quantum_hybrid_portfolio_optimization`](#key-callable-execute_quantum_hybrid_portfolio_optimization)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the analytical framework presented in Mancilla et al. (2026). The core of this repository is the iPython Notebook `constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm_draft.ipynb`, which contains a comprehensive suite of 27+ functions to replicate the paper's findings.

The pipeline addresses the critical challenge of **Cardinality Constrained Portfolio Optimization** in the context of "Direct Indexing." Selecting exactly $K$ assets from a universe of $N$ transforms standard convex Markowitz optimization into an NP-hard combinatorial problem.

The paper proposes a **Hard-Constraint QAOA** formulation. Unlike standard QAOA implementations that rely on soft penalty terms (which distort the energy landscape), this approach enforces constraints strictly via the quantum ansatz itself using Dicke states and XY-mixers. This codebase operationalizes the proposed solution:
-   **Validates** data integrity using strict schema checks and temporal causality enforcement.
-   **Engineers** the quantum state using Dicke state initialization to confine evolution to the feasible subspace.
-   **Simulates** the quantum circuit using PennyLane, employing a Trotterized parameter initialization to mitigate Barren Plateaus.
-   **Benchmarks** the quantum solver against Simulated Annealing (SA) and Hierarchical Risk Parity (HRP).
-   **Evaluates** performance via rigorous out-of-sample walk-forward backtesting, computing Sharpe Ratios, Drawdowns, and Turnover net of transaction costs.

## Theoretical Background

The implemented methods combine techniques from Financial Econometrics, Quantum Information Science, and Convex Optimization.

**1. The Combinatorial Objective:**
The objective is to select exactly $K$ assets to minimize the risk-return trade-off:
$$ \min_{x \in \{0,1\}^N} \left( q x^\top \Sigma x - (1-q) \mu^\top x \right) \quad \text{s.t.} \quad \sum_{i=1}^N x_i = K $$

**2. Constraint-Preserving Quantum Ansatz:**
*   **Dicke State Initialization:** The system begins in an equal superposition of all valid portfolios:
    $$ |\psi_0\rangle = |D^K_N\rangle = \binom{N}{K}^{-1/2} \sum_{|x|=K} |x\rangle $$
*   **XY-Mixer Hamiltonian:** The evolution operator performs partial SWAPs, commuting with the number operator to preserve the Hamming weight:
    $$ H_{XY} = \sum_{(i,j) \in E} (X_i X_j + Y_i Y_j) $$

**3. Trotterized Initialization:**
To avoid vanishing gradients in deep circuits, parameters are initialized via an adiabatic linear ramp:
$$ \gamma_l = \frac{l}{p}\Delta t, \quad \beta_l = \left(1 - \frac{l}{p}\right)\Delta t $$

## Features

The provided iPython Notebook implements the full research pipeline, including:

-   **Modular, 27-Task Architecture:** The pipeline is decomposed into highly specialized, mathematically rigorous functions.
-   **Configuration-Driven Design:** All study parameters (risk aversion, circuit depths, transaction costs) are managed in an external `config.yaml` file.
-   **Rigorous Data Validation:** A multi-stage validation process checks schema integrity, timezone consistency, and strict causality (zero look-ahead bias).
-   **Advanced Quantum Simulation:** Uses `PennyLane` for statevector simulation, featuring a highly optimized single-pass `value_and_grad` Adam training loop.
-   **Classical Baselines:** Integrates `dimod` and `neal` for Simulated Annealing via QUBO, and `PyPortfolioOpt` for Hierarchical Risk Parity.
-   **Institutional Fidelity Assertions:** The pipeline concludes with mathematical proofs asserting that all generated portfolios strictly obeyed the $K$-hot and sum-to-one constraints.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Data Engineering (Tasks 1-4):** Validates the configuration, cleanses the raw price matrix (ffill/dropna), and freezes a deterministic, causal rebalance calendar.
2.  **Temporal & Return Infrastructure (Tasks 5-6):** Extracts strict $[t-L, t)$ causal windows and computes daily logarithmic returns.
3.  **Econometric Estimation (Tasks 7-9):** Computes annualized expected returns ($\mu$) and Ledoit-Wolf shrinkage covariance matrices ($\Sigma$), and manages the temporal continuity state.
4.  **Simulated Annealing Baseline (Tasks 10-12):** Constructs the penalized QUBO matrix, executes the `neal` sampler, and filters for feasible candidates.
5.  **QAOA Ansatz Construction (Tasks 13-15):** Prepares the Dicke statevector, builds the complete-graph XY-mixer, and maps the financial moments to the Ising Cost Hamiltonian.
6.  **QAOA Training & Selection (Tasks 16-18):** Executes the Trotterized Adam optimization loop across depths $p \in \{1 \dots 6\}$, extracts exact statevector probabilities, filters via bitwise operations, and selects the global optimum.
7.  **Continuous Allocation (Tasks 19-21):** Solves the Sharpe-max problem via SLSQP on the selected subsets, with a robust fallback to HRP.
8.  **Performance Accounting (Tasks 22-24):** Computes holding-period returns, $L_1$ turnover, applies 5 bps transaction costs, and aggregates final financial metrics and depth-scaling diagnostics.
9.  **Orchestration & Verification (Tasks 25-27):** Serializes all artifacts to disk and executes the final mathematical fidelity assertions.

## Core Components (Notebook Structure)

The project is contained within a single, comprehensive Jupyter Notebook: `constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm_draft.ipynb`. The notebook is structured as a logical pipeline with modular orchestrator functions for each of the 27 major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `execute_quantum_hybrid_portfolio_optimization`

The project is designed around a single, top-level user-facing interface function:

-   **`execute_quantum_hybrid_portfolio_optimization`:** This master orchestrator function runs the entire automated research pipeline from end-to-end. A single call to this function reproduces the entire computational portion of the project, managing data flow between validation, econometric estimation, quantum simulation, classical allocation, and final fidelity verification.

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `scipy`, `pennylane`, `dimod`, `dwave-neal`, `PyPortfolioOpt`, `pyyaml`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm.git
    cd constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy scipy pennylane dimod dwave-neal PyPortfolioOpt pyyaml
    ```

## Input Data Structure

The pipeline requires a single primary DataFrame (`raw_price_df`):

-   **Index:** `DatetimeIndex` (monotonically increasing trading days).
-   **Columns:** Exactly 10 string identifiers matching the configured universe (e.g., `AAPL`, `MSFT`, `GOOGL`, `AMZN`, `JPM`, `V`, `TSLA`, `UNH`, `LLY`, `XOM`).
-   **Values:** `float64` representing auto-adjusted closing prices (strictly positive).

*Note: The usage example below includes a synthetic data generator using Geometric Brownian Motion for testing purposes if access to live Yahoo Finance data is unavailable.*

## Usage

The following snippet demonstrates how to generate synthetic data, load the configuration, and execute the top-level orchestrator.

```python
import pandas as pd
import numpy as np
import yaml
import os

# Assuming all pipeline callables are loaded in the current namespace

# 1. Generate Synthetic Price Data (Geometric Brownian Motion)
np.random.seed(42)
dates = pd.date_range(start="2024-01-01", end="2025-12-31", freq='B')
universe = ["AAPL", "MSFT", "GOOGL", "AMZN", "JPM", "V", "TSLA", "UNH", "LLY", "XOM"]
prices = np.zeros((len(dates), len(universe)))
prices[0] = np.random.uniform(100, 200, size=len(universe))
dt = 1.0 / 252.0
for t in range(1, len(dates)):
    Z = np.random.standard_normal(len(universe))
    prices[t] = prices[t-1] * np.exp((0.10 - 0.5 * 0.20**2) * dt + 0.20 * np.sqrt(dt) * Z)

raw_price_df = pd.DataFrame(prices, index=dates, columns=universe).astype(np.float64)

# 2. Load Configuration
with open("config.yaml", "r") as file:
    config = yaml.safe_load(file)

# 3. Execute the Pipeline
output_directory = "./quantum_portfolio_artifacts"
study_artifacts = execute_quantum_hybrid_portfolio_optimization(
    raw_price_df=raw_price_df,
    raw_config=config,
    output_dir=output_directory
)

# 4. Inspect Results
print("\n[Fidelity Audit]")
print(f"Mathematical Constraints Verified: {study_artifacts['fidelity_verified']}")

print("\n[Financial Performance Metrics (2025)]")
metrics_df = pd.DataFrame.from_dict(study_artifacts["results_bundle"]["financial_metrics"], orient="index")
print(metrics_df)
```

## Output Structure

The pipeline returns a comprehensive dictionary containing:
-   **`validated_config`**: The strictly typed configuration object.
-   **`data_validation_report`**: Telemetry from the data cleansing phase.
-   **`cleaned_price_df`**: The canonical price matrix used for the study.
-   **`rebalance_dates`**: The frozen temporal schedule.
-   **`results_bundle`**: A nested dictionary containing:
    -   `financial_metrics`: Total Return, Volatility, Sharpe, MDD, Turnover for QAOA, SA, and HRP.
    -   `depth_scaling_table`: Aggregated quantum telemetry (Cost, Iterations, Gradient Norms) across depths $p=1 \dots 6$.
    -   `persisted_files`: A list of all artifacts serialized to disk (Parquet, NPY, JSON).
-   **`fidelity_verified`**: A boolean confirming all mathematical constraints were upheld.

## Project Structure

```
constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm/
│
├── constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm_draft.ipynb   # Main implementation notebook
├── config.yaml                                                                                     # Master configuration file
├── requirements.txt                                                                                # Python package dependencies
│
├── LICENSE                                                                                         # MIT Project License File
└── README.md                                                                                       # This file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can modify study parameters such as:
-   **Financial Setup:** Asset universe, lookback window ($L$), and risk aversion ($q$).
-   **Quantum Architecture:** Circuit depths ($p$), Trotterization bounds, and Adam optimizer step sizes.
-   **Classical Solvers:** SA reads/sweeps and SLSQP allocation bounds.
-   **Friction Models:** Transaction cost basis points ($\tau$) and continuity bonus ($\kappa$).

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, strict type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Hardware Execution:** Migrating the `default.qubit` statevector simulator to noisy quantum hardware (e.g., IBM, IonQ) via Amazon Braket or Azure Quantum.
-   **Multi-Period Regularization:** Incorporating explicit turnover penalties directly into the Ising Hamiltonian to optimize the net-of-fee objective natively on the QPU.
-   **Alternative Mixers:** Exploring ring-graph XY-mixers to reduce circuit depth and CNOT gate counts for near-term hardware compatibility.
-   **Expanded Universes:** Scaling the simulation beyond $N=10$ using tensor network simulators (e.g., `default.tensor`).

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{mancilla2026constrained,
  title={Constrained Portfolio Optimization via Quantum Approximate Optimization Algorithm (QAOA) with XY-Mixers and Trotterized Initialization: A Hybrid Approach for Direct Indexing},
  author={Mancilla, Javier and Bouloumis, Theodoros D. and Goguikian, Frederic},
  journal={arXiv preprint arXiv:2602.14827},
  year={2026}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2026). Constrained Portfolio Optimization via QAOA with XY-Mixers: An Open Source Implementation.
GitHub repository: https://github.com/chirindaopensource/constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm
```

## Acknowledgments

-   Credit to **Javier Mancilla, Theodoros D. Bouloumis, and Frederic Goguikian** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source quantum and scientific Python communities. Sincere thanks to the developers of **PennyLane, D-Wave Ocean, PyPortfolioOpt, Pandas, NumPy, and SciPy**.

--

*This README was generated based on the structure and content of the `constrained_portfolio_optimization_via_quantum_approximate_optimization_algorithm_draft.ipynb` notebook and follows best practices for research software documentation.*

# Paper

Title: "*Constrained Portfolio Optimization via Quantum Approximate Optimization Algorithm (QAOA) with XY-Mixers and Trotterized Initialization: A Hybrid Approach for Direct Indexing*"

Authors: Javier Mancilla, Theodoros D. Bouloumis, Frederic Goguikian

E-Journal Submission Date: 16 February 2026

Link: https://arxiv.org/abs/2602.14827

Abstract:

Portfolio optimization under strict cardinality constraints is a combinatorial challenge that defies classical convex optimization techniques, particularly in the context of "Direct Indexing" and ESG-constrained mandates. In the Noisy Intermediate-Scale Quantum (NISQ) era, the Quantum Approximate Optimization Algorithm (QAOA) offers a promising hybrid approach. However, standard QAOA implementations utilizing transverse field mixers often fail to strictly enforce hard constraints, necessitating soft penalties that distort the energy landscape. This paper presents a comprehensive analysis of a constraint-preserving QAOA formulation against Simulated Annealing (SA) and Hierarchical Risk Parity (HRP). We implement a specific QAOA ansatz utilizing a Dicke state initialization and an XY-mixer Hamiltonian that strictly preserves the Hamming weight of the solution, ensuring only valid portfolios of size K are explored. Furthermore, we introduce a Trotterized parameter initialization schedule inspired by adiabatic quantum computing to mitigate the "Barren Plateau" problem. Backtesting on a basket of 10 US equities over 2025 reveals that our QAOA approach achieves a Sharpe Ratio of 1.81, significantly outperforming Simulated Annealing (1.31) and HRP (0.98). We further analyze the operational implications of the algorithm's high turnover (76.8%), discussing the trade-offs between theoretical optimality and implementation costs in institutional settings.

# Summary

### **Executive Summary**
This paper addresses the **Cardinality Constrained Portfolio Optimization (CCPO)** problem—specifically within the context of "Direct Indexing"—where an investor must select exactly $K$ assets out of a universe of $N$ to maximize risk-adjusted returns. Mathematically, this transforms the convex Markowitz Mean-Variance Optimization (MVO) into an **NP-hard combinatorial problem**.

The authors propose a **Hard-Constraint QAOA (Quantum Approximate Optimization Algorithm)** formulation. Unlike standard QAOA implementations that rely on soft penalty terms (which distort the energy landscape), this approach enforces constraints strictly via the quantum ansatz itself. The method utilizes **Dicke State initialization** and **XY-Mixers** to confine the quantum evolution to the feasible subspace of Hamming weight $K$.

Empirically, the model is backtested on a basket of 10 US equities over a 2025 walk-forward period. The QAOA approach achieves a **Sharpe Ratio of 1.81**, significantly outperforming Simulated Annealing (1.31) and Hierarchical Risk Parity (0.98), albeit with significantly higher portfolio turnover.


### **Mathematical Formulation & The "Combinatorial Cliff"**
The authors identify the core computational bottleneck in Direct Indexing: selecting a subset of assets is discrete, while weight allocation is continuous.

*   **The Objective:** Minimize the Ising Hamiltonian representing the risk-return trade-off:
    $$ \min_{x \in \{0,1\}^N} \left( q x^\top \Sigma x - (1-q) \mu^\top x \right) $$
    Subject to the hard constraint:
    $$ \sum_{i=1}^N x_i = K $$
    Where $\mu$ is the expected return vector, $\Sigma$ is the covariance matrix, and $q$ is the risk aversion parameter (set to $0.3$).

*   **The Challenge:** As $N$ grows, the search space $\binom{N}{K}$ expands factorially. Classical heuristics (Simulated Annealing) often trap in local minima, while standard QAOA with transverse field mixers requires penalty terms ($+ P(\sum x_i - K)^2$) that complicate convergence.

### **The Quantum Methodology: Constraint-Preserving QAOA**
The paper’s primary contribution is a specific QAOA ansatz designed to strictly preserve the cardinality constraint $K$ without penalty terms.

#### **A. Dicke State Initialization**
Instead of the standard superposition of all states $|+\rangle^{\otimes N}$, the system is initialized in a **Dicke State** $|D^K_N\rangle$:
$$ |\psi_0\rangle = |D^K_N\rangle = \binom{N}{K}^{-1/2} \sum_{|x|=K} |x\rangle $$
This ensures the starting state is an equal superposition of *only* valid portfolios with Hamming weight $K$.

#### **B. The XY-Mixer Hamiltonian**
To evolve the system without leaving the feasible subspace, the authors replace the standard transverse field mixer ($H_X = \sum X_i$) with a complete-graph **XY-Mixer**:
$$ H_{XY} = \sum_{(i,j) \in E} (X_i X_j + Y_i Y_j) $$
*   **Mechanism:** This operator performs a partial SWAP ($|01\rangle \leftrightarrow |10\rangle$), exchanging excitation between qubits $i$ and $j$.
*   **Commutation:** Crucially, $[H_{XY}, \sum Z_i] = 0$. The mixer commutes with the number operator, guaranteeing that the unitary evolution $U(\beta) = e^{-i\beta H_{XY}}$ never alters the number of selected assets.

#### **C. Trotterized Parameter Initialization**
To mitigate the **Barren Plateau** problem (vanishing gradients in deep circuits), the authors employ a **Trotterized schedule** inspired by Adiabatic Quantum Computing (AQC).
*   Instead of random initialization, parameters $(\gamma, \beta)$ are initialized via a linear ramp:
    $$ \gamma_l = \frac{l}{p}\Delta t, \quad \beta_l = \left(1 - \frac{l}{p}\right)\Delta t $$
*   **Result:** Diagnostics confirm that gradient norms do *not* vanish even at circuit depth $p=6$, proving the trainability of the circuit.

### **The Hybrid Algorithmic Pipeline**
The authors implement a two-stage hybrid process:
1.  **Discrete Selection (Quantum):** The QAOA-XY circuit (simulated via PennyLane) identifies the optimal bitstring $x$ (the subset of 5 assets).
2.  **Continuous Allocation (Classical):** Once the subset $S$ is fixed, a classical solver (SLSQP) optimizes the weights $w_i$ to maximize the Sharpe Ratio, subject to box constraints ($0.05 \le w_i \le 0.50$).

**Benchmarks:**
*   **Simulated Annealing (SA):** Uses a QUBO formulation with penalty terms.
*   **Hierarchical Risk Parity (HRP):** A clustering-based approach that avoids matrix inversion (used as a baseline and fallback).

### **Empirical Results (2025 Walk-Forward)**
The experiment utilized a rolling window of 180 days to rebalance monthly throughout 2025.

#### **A. Financial Performance**
| Metric | QAOA (XY) | Simulated Annealing | HRP |
| :--- | :---: | :---: | :---: |
| **Total Return** | **30.09%** | 24.17% | 10.88% |
| **Sharpe Ratio** | **1.81** | 1.31 | 0.98 |
| **Max Drawdown** | -8.27% | -9.26% | -8.40% |
| **Monthly Turnover**| **76.8%** | 21.0% | 21.6% |

*   **Analysis:** QAOA consistently found lower-energy states (better risk-return portfolios) than SA. The quantum solver was more aggressive in reallocating capital to capture short-term alpha.

#### **B. Quantum Diagnostics**
*   **Depth Scaling:** Performance improved monotonically from $p=1$ to $p=6$. The final cost decreased from -0.1488 to -0.5355.
*   **Barren Plateaus:** Gradient magnitudes remained healthy ($5.47 \times 10^{-2}$ at $p=6$), validating the Trotterized initialization strategy.


### **Critical Discussion: The Turnover Trade-off**
While QAOA achieved the highest theoretical Sharpe Ratio, the paper highlights a critical operational constraint: **Turnover**.

*   **The Observation:** QAOA exhibited 76.8% monthly turnover, compared to ~21% for classical methods.
*   **Interpretation:** The quantum algorithm, unburdened by "warm-start" biases inherent in SA, aggressively re-optimized the portfolio every month to find the global minimum. While this generates higher gross returns, in a high-transaction-cost environment (e.g., illiquid markets), the net alpha could be eroded.
*   **Conclusion:** The method is currently best suited for liquid, low-fee environments (like US Large Caps).

### **Verdict**
This paper successfully demonstrates that **constraint-preserving QAOA** is not merely a theoretical construct but a viable solver for NP-hard financial problems. By eliminating penalty terms via **XY-mixers** and **Dicke states**, the authors provide a robust framework for Direct Indexing. The superior Sharpe Ratio confirms the quantum algorithm's ability to navigate complex energy landscapes better than thermal heuristics. However, future iterations must incorporate **multi-period coupling** or explicit turnover penalties into the Hamiltonian to make the strategy deployable for institutional capital with strict transaction cost mandates.

# Import Essential Modules

In [None]:
#!/usr/bin/env python3
# ==============================================================================#
#
#  Constrained Portfolio Optimization via QAOA with XY-Mixers and Trotterization
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "Constrained Portfolio Optimization via
#  Quantum Approximate Optimization Algorithm (QAOA) with XY-Mixers and
#  Trotterized Initialization: A Hybrid Approach for Direct Indexing" by
#  Javier Mancilla, Theodoros D. Bouloumis, and Frederic Goguikian (2026).
#  It delivers a computationally tractable, hybrid quantum-classical system
#  for solving NP-hard cardinality-constrained portfolio selection problems,
#  specifically tailored for Direct Indexing and ESG-constrained mandates.
#
#  Core Methodological Components:
#  • Constraint-preserving QAOA formulation for exact K-of-N asset selection
#  • Dicke-state initialization to confine evolution to the feasible subspace
#  • Complete-graph XY-mixer Hamiltonian to strictly preserve Hamming weight
#  • Trotterized parameter initialization schedule to mitigate Barren Plateaus
#  • Simulated Annealing (SA) baseline via QUBO with adaptive penalty scaling
#  • Hierarchical Risk Parity (HRP) baseline and robust fallback mechanism
#  • Continuous Sharpe-max weight allocation via SLSQP with box constraints
#  • Walk-forward backtesting framework with explicit transaction cost modeling
#
#  Technical Implementation Features:
#  • PennyLane state-vector simulation for high-fidelity quantum circuit execution
#  • D-Wave Neal simulated annealing sampler for classical combinatorial optimization
#  • PyPortfolioOpt for Ledoit-Wolf covariance shrinkage and HRP clustering
#  • SciPy SLSQP for constrained convex optimization and weight allocation
#  • Strict causal slicing and timezone-aware temporal alignment
#  • Single-pass value_and_grad optimization to halve quantum simulation overhead
#  • Comprehensive artifact persistence and institutional-grade fidelity assertions
#
#  Paper Reference:
#  Mancilla, J., Bouloumis, T. D., & Goguikian, F. (2026). Constrained Portfolio
#  Optimization via Quantum Approximate Optimization Algorithm (QAOA) with
#  XY-Mixers and Trotterized Initialization: A Hybrid Approach for Direct Indexing.
#  arXiv preprint arXiv:2602.14827. https://arxiv.org/abs/2602.14827
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================#

# Standard Library Imports
import os
import json
import math
import random
import logging
import datetime
import platform
import importlib.metadata
from pathlib import Path
from typing import Dict, Any, List, Tuple, Optional, Union, Set

# Third-Party Library Imports
import numpy as np
import pandas as pd
import pennylane as qml
import dimod
import neal
from scipy.optimize import minimize, OptimizeResult
from pypfopt import risk_models
from pypfopt.hierarchical_risk_parity import HRPOpt

# Configure module-level logger
logger = logging.getLogger(__name__)


# Implementation

# Draft 1

## **Discussion of Inputs-Processes-Outputs (IPO) of Key Callables**

This section defines the Inputs, Processes, Outputs, Data Transformations, and the exact mathematical and methodological roles each key callable plays in implementing the "*Constrained Portfolio Optimization via Quantum Approximate Optimization Algorithm (QAOA) with XY-Mixers and Trotterized Initialization: A Hybrid Approach for Direct Indexing*" research methodology:

### **Phase 1: Data Engineering & Validation**

**1. `validate_master_config`**
*   **Inputs:** Raw, unvalidated configuration dictionary.
*   **Processes:** Executes a recursive Depth-First Search to enforce a closed-world schema, mathematically asserts cross-parameter invariants (e.g., $1 \le K \le N$), and pins categorical conventions.
*   **Outputs:** A strictly typed, mathematically feasible configuration dictionary.
*   **Data Transformation:** Transforms an untrusted user payload into a mathematically verified state object.
*   **Role in Pipeline:** Ensures the foundational parameters of the study are exact. It implements the constraints: *"We set $K = 5$ and $q = 0.3$ in all experiments."*

**2. `validate_raw_price_df`**
*   **Inputs:** Raw price DataFrame and validated configuration.
*   **Processes:** Asserts `DatetimeIndex` monotonicity, timezone consistency, exact universe alignment, and quantifies missingness ratios.
*   **Outputs:** A diagnostic validation report dictionary.
*   **Data Transformation:** Transforms raw tabular data into structural metadata.
*   **Role in Pipeline:** Validates the integrity of the empirical inputs, fulfilling: *"Data source & preprocessing. We use Yahoo Finance auto-adjusted Close prices..."*

**3. `cleanse_price_data`**
*   **Inputs:** Raw price DataFrame and validated configuration.
*   **Processes:** Dynamically flattens MultiIndex structures, reorders columns to the canonical universe, applies forward-filling, and drops remaining NaNs.
*   **Outputs:** A pristine, `float64` canonical price matrix.
*   **Data Transformation:** Transforms a potentially noisy, multi-dimensional API response into a strictly 2D, dense, positive-definite matrix.
*   **Role in Pipeline:** Implements the exact data hygiene protocol: *"Missing values are forward-filled, and non-trading rows dropped."*

**4. `build_rebalance_calendar`**
*   **Inputs:** Cleansed price matrix and validated configuration.
*   **Processes:** Generates theoretical monthly anchors, maps them to realized trading days, and mathematically asserts the availability of $L=180$ prior days.
*   **Outputs:** An immutable tuple of 12 `pd.Timestamp` objects.
*   **Data Transformation:** Transforms configuration rules into a concrete, causal chronological schedule.
*   **Role in Pipeline:** Establishes the temporal backbone of the study, implementing the *"monthly walk-forward protocol... across calendar year 2025."*


### **Phase 2: Temporal & Return Infrastructure**

**5. `extract_lookback_window`**
*   **Inputs:** Cleansed price matrix, rebalance timestamp $t$, lookback $L$, and universe list.
*   **Processes:** Applies a strict inequality mask ($< t$) to slice the history, extracting exactly the last $L$ rows.
*   **Outputs:** A causal window DataFrame of shape $(L, N)$.
*   **Data Transformation:** Transforms the full historical matrix into a localized, causal cross-section.
*   **Role in Pipeline:** Prevents look-ahead bias by implementing: *"estimated from the preceding $L = 180$ trading days of returns, i.e., a rolling window $[t - L, t)$."*

**6. `compute_daily_log_returns`**
*   **Inputs:** Causal window DataFrame and validated configuration.
*   **Processes:** Computes element-wise logarithmic returns and truncates the shift-induced NaN row.
*   **Outputs:** A daily log-returns DataFrame of shape $(L-1, N)$.
*   **Data Transformation:** Transforms non-stationary prices $P_{\tau,i}$ into stationary returns $R_{\tau,i} = \ln(P_{\tau,i}/P_{\tau-1,i})$.
*   **Role in Pipeline:** Prepares the data for econometric estimation, fulfilling: *"compute daily returns; drop NA"*.


### **Phase 3: Econometric Estimation & State Management**

**7. `estimate_mu_annualized`**
*   **Inputs:** Daily log-returns DataFrame and validated configuration.
*   **Processes:** Computes the arithmetic sample mean along the time axis and applies the linear annualization scalar (252).
*   **Outputs:** The annualized expected return vector $\mu^{(\text{ann})} \in \mathbb{R}^N$.
*   **Data Transformation:** Collapses a 2D returns matrix into a 1D expected return vector.
*   **Role in Pipeline:** Generates the linear parameters for the objective functions, implementing: *"Estimate $\mu$ (mean)... and annualize both"*.

**8. `estimate_sigma_annualized`**
*   **Inputs:** Daily log-returns DataFrame and validated configuration.
*   **Processes:** Computes the Ledoit-Wolf shrinkage covariance, annualizes it, and asserts symmetry and positive-definiteness.
*   **Outputs:** The annualized covariance matrix $\Sigma^{(\text{ann})} \in \mathbb{R}^{N \times N}$.
*   **Data Transformation:** Transforms the returns matrix into a well-conditioned, symmetric positive-definite risk matrix.
*   **Role in Pipeline:** Generates the quadratic parameters, implementing: *"Covariance is estimated with Ledoit–Wolf shrinkage... and annualized"*.

**9. `update_continuity_state`**
*   **Inputs:** Previous month's binary selection vector, configuration, and solver identifier.
*   **Processes:** Initializes a zero vector for the first month or casts the previous selection to a strict integer indicator vector.
*   **Outputs:** The continuity indicator vector $s_{\text{prev}} \in \{0,1\}^N$.
*   **Data Transformation:** Transforms historical algorithmic state into a binary regularization mask.
*   **Role in Pipeline:** Implements the temporal regularization mechanism: *"If asset $i$ was held last month, we apply a continuity discount... to reduce churn."*


### **Phase 4: Simulated Annealing Discrete Selection**

**10. `build_qubo_matrix`**
*   **Inputs:** $\mu^{(\text{ann})}, \Sigma^{(\text{ann})}$, configuration, and $s_{\text{prev}}$.
*   **Processes:** Computes the adaptive penalty $P$, constructs the upper-triangular QUBO dictionary, and subtracts $\kappa$ from the diagonals of previously held assets.
*   **Outputs:** A sparse dictionary representing the QUBO matrix.
*   **Data Transformation:** Transforms financial moments into a classical quadratic energy landscape.
*   **Role in Pipeline:** Implements the classical baseline formulation (Eq. 7):
    $$ Q_{ii} = q \Sigma_{ii}^{(\text{ann})} - (1 - q) \mu_i^{(\text{ann})} + P (1 - 2K) $$
    $$ Q_{ij} = 2q \Sigma_{ij}^{(\text{ann})} + 2P \quad (i < j) $$

**11. `run_sa_sampling`**
*   **Inputs:** QUBO dictionary, $N$, and configuration.
*   **Processes:** Converts the QUBO to a `dimod.BinaryQuadraticModel`, instantiates the `neal` sampler, and executes the stochastic heuristic using pinned seeds.
*   **Outputs:** A raw `dimod.SampleSet`.
*   **Data Transformation:** Transforms the deterministic energy landscape into a stochastic distribution of binary samples.
*   **Role in Pipeline:** Executes the classical heuristic: *"Sample with neal: num\_reads=5000, num\_sweeps=1000"*.

**12. `filter_sa_samples`**
*   **Inputs:** Raw `SampleSet` and configuration.
*   **Processes:** Iterates over aggregated records, filters for exact Hamming weight $K$, and selects the minimum energy candidate via lexicographic tie-breaking.
*   **Outputs:** The optimal classical binary vector $x^{SA} \in \{0,1\}^N$.
*   **Data Transformation:** Collapses 5000 stochastic samples into a single, deterministic, feasible selection.
*   **Role in Pipeline:** Enforces the hard constraint classically: *"keep best feasible ($\sum_i x_i = K$)"*.


### **Phase 5: QAOA Ansatz Construction**

**13. `prepare_dicke_state_vector`**
*   **Inputs:** Universe size $N$ and cardinality $K$.
*   **Processes:** Enumerates the $2^N$ Hilbert space, identifies indices with Hamming weight $K$, and assigns a uniform real amplitude.
*   **Outputs:** A dense statevector array of length $2^N$.
*   **Data Transformation:** Transforms scalar constraints into a quantum probability amplitude vector.
*   **Role in Pipeline:** Implements the constraint-preserving initialization (Eq. 2):
    $$ |\psi_0\rangle = |D^K_N\rangle = \binom{N}{K}^{-1/2} \sum_{|x|=K} |x\rangle $$

**14. `build_xy_mixer_hamiltonian`**
*   **Inputs:** Universe size $N$ and configuration.
*   **Processes:** Generates complete graph edges and constructs the $X_i X_j + Y_i Y_j$ Pauli tensor products.
*   **Outputs:** A `qml.Hamiltonian` object.
*   **Data Transformation:** Transforms graph topology into a quantum mixing operator.
*   **Role in Pipeline:** Implements the particle-preserving mixer (Eq. 3):
    $$ H_{XY} = \sum_{(i,j) \in E} (X_i X_j + Y_i Y_j) $$

**15. `build_cost_hamiltonian`**
*   **Inputs:** $\mu^{(\text{ann})}, \Sigma^{(\text{ann})}$, configuration, and $s_{\text{prev}}$.
*   **Processes:** Computes $\alpha_i$ (applying continuity) and $\beta_{ij}$, mapping them to Pauli-Z observables.
*   **Outputs:** A `qml.Hamiltonian` object.
*   **Data Transformation:** Transforms financial moments into a quantum phase separator operator.
*   **Role in Pipeline:** Implements the Ising mapping (Eq. 5):
    $$ H_C = \sum_i \alpha_i Z_i + \sum_{i<j} \beta_{ij} Z_i Z_j $$


### **Phase 6: QAOA Training & Selection**

**16. `train_qaoa_single_depth`**
*   **Inputs:** Depth $p$, Dicke vector, $H_C, H_{XY}$, and configuration.
*   **Processes:** Generates Trotterized parameters, constructs the QNode, and executes a single-pass `value_and_grad` Adam optimization with early stopping.
*   **Outputs:** Optimized parameters $\gamma^*, \beta^*$ and a telemetry log.
*   **Data Transformation:** Transforms heuristic initial parameters into optimized variational parameters via gradient descent.
*   **Role in Pipeline:** Implements the Barren Plateau mitigation (Eq. 6) and training:
    $$ \gamma_l = \frac{l}{p}\Delta t, \quad \beta_l = \left(1 - \frac{l}{p}\right)\Delta t $$

**17. `qaoa_readout_and_rescore`**
*   **Inputs:** Trained parameters, quantum invariants, $\mu, \Sigma$, and configuration.
*   **Processes:** Computes exact statevector probabilities, filters via bitwise operations ($|x|=K, \Pr \ge 0.01$), and rescores using the true Markowitz objective.
*   **Outputs:** The optimal quantum binary vector $x^*$, its classical cost, and the diagnostic table.
*   **Data Transformation:** Collapses a continuous quantum probability distribution into a discrete, deterministic selection.
*   **Role in Pipeline:** Bridges the quantum and classical domains: *"filter weight-$K$, prob $\ge 1\%$... Score candidates by classical cost Eq. (1); keep best"*.

**18. `qaoa_select_best_across_depths`**
*   **Inputs:** Quantum invariants, $\mu, \Sigma$, and configuration.
*   **Processes:** Iterates through depths $p \in \{1 \dots 6\}$, aggregates results, and selects the global minimum classical cost.
*   **Outputs:** The final $x^{QAOA}$ vector and the depth-scaling table.
*   **Data Transformation:** Reduces multiple depth evaluations into a single optimal portfolio.
*   **Role in Pipeline:** Implements the depth-scaling logic: *"Choose final QAOA selection as best across $p = 1, \dots, 6$"*.


### **Phase 7: Continuous Allocation & Baseline**

**19. `allocate_sharpe_max`**
*   **Inputs:** Selection vector $x_t, \mu, \Sigma$, and configuration.
*   **Processes:** Extracts the $K$-dimensional subset, executes SLSQP to minimize negative Sharpe, validates bounds, and expands back to $N$ dimensions.
*   **Outputs:** The continuous weight vector $w_t \in \mathbb{R}^N$.
*   **Data Transformation:** Transforms a discrete binary selection into an optimal continuous capital allocation.
*   **Role in Pipeline:** Implements the classical convex allocation: *"Solve Sharpe-max (SLSQP) on $S$ with bounds $0.05 \le w_i \le 0.50$ and $\sum_i w_i = 1$"*.

**20. `compute_hrp_weights`**
*   **Inputs:** Daily log-returns DataFrame and configuration.
*   **Processes:** Instantiates `HRPOpt`, executes recursive bisection clustering, and aligns the output to the canonical universe.
*   **Outputs:** The HRP weight vector $w_t \in \mathbb{R}^N$.
*   **Data Transformation:** Transforms a returns matrix into a hierarchical, risk-parity weighted vector.
*   **Role in Pipeline:** Implements the robust baseline: *"Compute HRP baseline weights (PyPortfolioOpt)"*.

**21. `dispatch_allocation`**
*   **Inputs:** Selection $x_t, \mu, \Sigma$, returns DataFrame, configuration, and metadata.
*   **Processes:** Attempts primary SLSQP allocation, triggers HRP fallback on failure, and mathematically asserts final invariants (sum-to-one, non-negativity).
*   **Outputs:** The final, validated weight vector $w_t$ and a fallback flag.
*   **Data Transformation:** Routes and resolves intermediate allocations into a guaranteed valid portfolio state.
*   **Role in Pipeline:** Implements the fault-tolerance mechanism: *"If the selector fails, we fall back to HRP weights"*.


### **Phase 8: Performance Accounting & Evaluation**

**22. `compute_holding_returns_and_turnover`**
*   **Inputs:** Price DataFrame, timestamps $t, t^+$, weights $w_t, w_{t-1}$, and configuration.
*   **Processes:** Computes price-relative asset returns, aggregates gross portfolio return via dot product, and calculates $L_1$ turnover.
*   **Outputs:** Gross return, asset returns vector, and scalar turnover.
*   **Data Transformation:** Transforms prices and weights into realized performance scalars.
*   **Role in Pipeline:** Implements the operational accounting: *"turnover = $\sum_i |w_{i,t} - w_{i,t-1}|$"*.

**23. `compute_net_return_and_value`**
*   **Inputs:** Gross return, turnover, previous value $V_{t-1}$, and configuration.
*   **Processes:** Applies the transaction cost penalty, geometrically compounds the portfolio value, and asserts $V_t > 0$.
*   **Outputs:** Net return and updated value $V_t$.
*   **Data Transformation:** Transforms gross metrics into a net wealth path.
*   **Role in Pipeline:** Implements the friction model: *"Net return = gross - $\tau \times$ turnover; $V_t \leftarrow V_{t-1} (1 + \text{net return})$"*.

**24. `compute_all_metrics`**
*   **Inputs:** Strategy time-series dictionaries, QAOA diagnostics, and configuration.
*   **Processes:** Computes Total Return, unbiased Annualized Volatility, Sharpe Ratio, Maximum Drawdown, and aggregates the depth-scaling table.
*   **Outputs:** The final comprehensive results dictionary.
*   **Data Transformation:** Transforms chronological arrays into summary statistical tables.
*   **Role in Pipeline:** Generates the data for Table II and Table III, fulfilling: *"report net performance"*.

### **Phase 9: Orchestration, Persistence & Verification**

**25. `persist_artifacts`**
*   **Inputs:** All generated data, decisions, metrics, configuration, and output directory.
*   **Processes:** Serializes DataFrames to Parquet, arrays to NPY, dicts to JSON/CSV, and generates a software version manifest.
*   **Outputs:** A list of written absolute file paths.
*   **Data Transformation:** Freezes in-memory state into immutable disk artifacts.
*   **Role in Pipeline:** Ensures institutional reproducibility and auditability of the research pipeline.

**26. `run_full_backtest`**
*   **Inputs:** Cleansed price DataFrame, rebalance dates, and configuration.
*   **Processes:** Initializes state, executes the Algorithm 1 walk-forward loop, finalizes metrics, and triggers persistence.
*   **Outputs:** The comprehensive results bundle.
*   **Data Transformation:** Orchestrates the macro-transformation of clean data into a complete backtest execution state.
*   **Role in Pipeline:** Implements the entirety of *"Algorithm 1 QAOA-XY / SA Hybrid Portfolio Construction"*.

**27. `run_fidelity_assertions`**
*   **Inputs:** Results bundle, price DataFrame, rebalance dates, and configuration.
*   **Processes:** Mathematically asserts $K$-hot feasibility, sum-to-one constraints, box bounds, strict causality ($< t$), and structural completeness.
*   **Outputs:** None (raises exceptions on failure).
*   **Data Transformation:** Transforms the results bundle into a mathematical proof of validity.
*   **Role in Pipeline:** Guarantees the pipeline strictly adhered to the *"hard constraints"* and causality requirements of the study.

**28. `execute_quantum_hybrid_portfolio_optimization` (Top-Level)**
*   **Inputs:** Raw price DataFrame, raw configuration, and output directory.
*   **Processes:** Executes Phase 1 (Data Engineering) followed by Phase 2 (Optimization & Verification).
*   **Outputs:** The final, verified, comprehensive dictionary of all artifacts.
*   **Data Transformation:** The ultimate Facade; transforms raw, unstructured inputs into verified, publication-grade artifacts.
*   **Role in Pipeline:** Serves as the master entry point for the *"end-to-end evaluation pipeline"*.


<br><br>

## **Usage Example**

Here is the granular, step-by-step guide to executing the end-to-end pipeline for **"Constrained Portfolio Optimization via Quantum Approximate Optimization Algorithm (QAOA) with XY-Mixers and Trotterized Initialization."**

*Note: This example assumes that all the callables defined in this conversation (from Task 1 through Task 27-plus, including the top-level orchestrator `execute_quantum_hybrid_portfolio_optimization`) are loaded into the current namespace of a single Jupyter Notebook. No external `.py` module imports are required for the pipeline functions.*

<br>

### **Step 1: Synthetic Data Generation (`raw_price_df`)**

The first requirement is a high-fidelity synthetic representation of the raw time-series price matrix. This dataset must adhere strictly to the schema defined in the study: a monotonically increasing `DatetimeIndex` and 10 specific asset columns containing strictly positive, auto-adjusted `float64` prices.

**Methodology:**
1.  **Date Generation:** We generate a sequence of business days (Monday-Friday) spanning from January 2024 through December 2025. This guarantees sufficient pre-history to satisfy the $L=180$ lookback window requirement prior to the first rebalance in January 2025.
2.  **Price Simulation:** We simulate asset paths using Geometric Brownian Motion (GBM). GBM is the industry standard for modeling equity prices because it mathematically guarantees strict positivity ($P_t > 0$) and log-normally distributed returns, perfectly aligning with the study's econometric assumptions.
3.  **Schema Enforcement:** We explicitly cast the index to `datetime64[ns]` and the matrix to `float64` to satisfy the strict validation gates of Task 2.

```python
import pandas as pd
import numpy as np
import yaml
import os
from typing import Dict, Any

# Set global seed for reproducible synthetic data generation
np.random.seed(42)

def generate_synthetic_price_matrix(
    start_date: str = "2024-01-01",
    end_date: str = "2025-12-31",
    universe: list = ["AAPL", "MSFT", "GOOGL", "AMZN", "JPM", "V", "TSLA", "UNH", "LLY", "XOM"]
) -> pd.DataFrame:
    """
    Generates a synthetic, high-fidelity price matrix using Geometric Brownian Motion.

    Purpose:
        To create a mathematically plausible dataset that mimics the schema and statistical
        properties of the yfinance auto-adjusted close prices. This allows for the execution
        of the quantum-hybrid pipeline without requiring live API access.

    Inputs:
        start_date (str): The start date of the simulation. Default is "2024-01-01".
        end_date (str): The end date of the simulation. Default is "2025-12-31".
        universe (list): The canonical list of 10 asset tickers.

    Processes:
        1. Date Generation: Creates a business-day frequency DatetimeIndex.
        2. GBM Simulation: Simulates strictly positive price paths using drift (mu)
           and volatility (sigma) parameters typical of US Large Cap equities.
        3. Schema Enforcement: Constructs the DataFrame and enforces float64 types.

    Outputs:
        pd.DataFrame: A DataFrame of shape (T, 10) with a DatetimeIndex.
    """
    # 1. Generate Trading Days (Business Days)
    dates = pd.date_range(start=start_date, end=end_date, freq='B')
    num_days = len(dates)
    num_assets = len(universe)

    # 2. Simulate Prices via Geometric Brownian Motion (GBM)
    # Assume an average annualized return of 10% and annualized volatility of 20%
    mu_annual = 0.10
    sigma_annual = 0.20
    dt = 1.0 / 252.0  # Daily time step

    # Initialize price matrix with starting prices around $150
    prices = np.zeros((num_days, num_assets))
    prices[0] = np.random.uniform(100, 200, size=num_assets)

    # Generate daily shocks
    for t in range(1, num_days):
        # Standard normal shocks
        Z = np.random.standard_normal(num_assets)
        # GBM discrete exact solution: P_t = P_{t-1} * exp((mu - sigma^2/2)*dt + sigma*sqrt(dt)*Z)
        drift = (mu_annual - 0.5 * sigma_annual**2) * dt
        diffusion = sigma_annual * np.sqrt(dt) * Z
        prices[t] = prices[t-1] * np.exp(drift + diffusion)

    # 3. Construct DataFrame and Enforce Schema
    raw_price_df = pd.DataFrame(prices, index=dates, columns=universe)
    raw_price_df.index.name = "Date"
    raw_price_df = raw_price_df.astype(np.float64)

    return raw_price_df

# Generate the synthetic dataset
raw_price_df = generate_synthetic_price_matrix()

# Preview the data to verify schema compliance
print("Synthetic Price Matrix Preview:")
print(raw_price_df.head())
print(f"\nShape: {raw_price_df.shape}")
print(f"Index Type: {type(raw_price_df.index)}")
```

<br>

### **Step 2: Loading the Configuration (`config.yaml`)**

The study relies on a deterministic configuration file (`config.yaml`) that defines all hyperparameters, quantum circuit architectures, and evaluation metrics. We assume this file has been saved in the current working directory.

**Methodology:**
1.  **File I/O:** Open `config.yaml` in read mode.
2.  **Parsing:** Use `yaml.safe_load` to convert the YAML structure into a nested Python dictionary.
3.  **Validation:** The resulting dictionary will be passed to the orchestrator, which will subject it to the rigorous recursive type-checking defined in Task 1.

```python
def load_study_configuration(filepath: str = "config.yaml") -> Dict[str, Any]:
    """
    Loads the study configuration parameters from a YAML file into a Python dictionary.

    Purpose:
        To ingest the deterministic hyperparameters, quantum circuit definitions, and
        evaluation metrics defined in the external configuration file. This ensures
        institutional reproducibility by separating code from configuration state.

    Inputs:
        filepath (str): The path to the YAML configuration file. Default is "config.yaml".

    Outputs:
        Dict[str, Any]: A nested dictionary containing the study configuration.

    Raises:
        FileNotFoundError: If the configuration file is missing.
        yaml.YAMLError: If the file contains invalid YAML syntax.
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"CRITICAL: Configuration file '{filepath}' not found in the working directory.")

    try:
        with open(filepath, "r") as file:
            config = yaml.safe_load(file)
        print(f"\nSuccessfully loaded configuration from {filepath}")
        return config
    except yaml.YAMLError as e:
        print(f"\nError parsing YAML file {filepath}: {e}")
        raise

# Load the configuration
# Note: Ensure 'config.yaml' is in your working directory with the content provided previously.
config = load_study_configuration("config.yaml")
```

<br>

### **Step 3: Executing the Pipeline (`execute_quantum_hybrid_portfolio_optimization`)**

With the data (`raw_price_df`) and configuration (`config`) in memory, we invoke the top-level orchestrator. This function manages the entire lifecycle: data cleansing, causal calendar generation, quantum circuit simulation (QAOA-XY), classical simulated annealing (SA), continuous weight allocation (SLSQP), performance accounting, and the final mathematical fidelity audit.

**Methodology:**
1.  **Function Call:** Pass the DataFrame, Dictionary, and an output directory path to the orchestrator.
2.  **Output Handling:** The function returns a comprehensive dictionary containing the validated artifacts, financial metrics, depth-scaling tables, and a boolean flag confirming that all mathematical constraints (e.g., $K$-hot feasibility, zero look-ahead bias) were strictly upheld.

```python
# ==============================================================================
# Execution of the End-to-End Study Pipeline
# ==============================================================================

if __name__ == "__main__":
    # Ensure we have valid inputs before running
    if not raw_price_df.empty and config:
        
        print("\nInitiating Quantum-Hybrid Portfolio Optimization Pipeline...")
        
        # Define the output directory for serialized artifacts (Parquet, NPY, JSON)
        output_directory = "./quantum_portfolio_artifacts"
        
        # Execute the pipeline
        # This will trigger the extensive logging defined throughout the 27 tasks
        study_artifacts = execute_quantum_hybrid_portfolio_optimization(
            raw_price_df=raw_price_df,
            raw_config=config,
            output_dir=output_directory
        )
        
        # ==============================================================================
        # Inspecting the Outputs
        # ==============================================================================
        
        print("\n" + "="*80)
        print("STUDY EXECUTION COMPLETE")
        print("="*80)
        
        # 1. Verify Institutional Fidelity
        fidelity_status = study_artifacts.get("fidelity_verified", False)
        print(f"\n[Fidelity Audit]")
        print(f"Mathematical Constraints & Causality Verified: {fidelity_status}")
        
        # 2. Accessing Financial Metrics (Table III equivalent)
        print("\n[Financial Performance Metrics (2025)]")
        metrics_dict = study_artifacts["results_bundle"]["financial_metrics"]
        metrics_df = pd.DataFrame.from_dict(metrics_dict, orient="index")
        # Format for readability
        metrics_df["Total Return"] = metrics_df["Total Return"].apply(lambda x: f"{x:.2%}")
        metrics_df["Annualized Volatility"] = metrics_df["Annualized Volatility"].apply(lambda x: f"{x:.2%}")
        metrics_df["Max Drawdown"] = metrics_df["Max Drawdown"].apply(lambda x: f"{x:.2%}")
        metrics_df["Average Monthly Turnover"] = metrics_df["Average Monthly Turnover"].apply(lambda x: f"{x:.2%}")
        metrics_df["Sharpe Ratio"] = metrics_df["Sharpe Ratio"].apply(lambda x: f"{x:.2f}")
        print(metrics_df)
        
        # 3. Accessing QAOA Depth-Scaling Diagnostics (Table II equivalent)
        print("\n[QAOA-XY Depth Scaling Diagnostics]")
        depth_table = study_artifacts["results_bundle"].get("depth_scaling_table", [])
        if depth_table:
            depth_df = pd.DataFrame(depth_table)
            print(depth_df.to_string(index=False))
            
        # 4. Confirm Artifact Persistence
        persisted_files = study_artifacts["results_bundle"].get("persisted_files", [])
        print(f"\n[Artifact Persistence]")
        print(f"Successfully serialized {len(persisted_files)} artifacts to '{output_directory}'.")
        
    else:
        print("Error: Missing data or configuration. Cannot proceed.")
```

### **Summary of the Execution Flow**

1.  **Data Ingestion:** The synthetic `raw_price_df` and parsed `config` are passed to the top-level orchestrator.
2.  **Phase 1 (Data Engineering):** The pipeline recursively validates the schema, cleanses the price matrix (handling NaNs), and freezes a strictly causal 12-month rebalance calendar for 2025.
3.  **Phase 2 (Optimization Loop):** For each month, it estimates $\mu$ and $\Sigma$, constructs the QUBO and QAOA Hamiltonians, executes the quantum simulation across depths $p \in \{1 \dots 6\}$, and selects the optimal $K=5$ assets.
4.  **Allocation & Accounting:** It solves the continuous Sharpe-max problem via SLSQP, applies the 5 bps transaction cost to the realized turnover, and compounds the portfolio value.
5.  **Verification:** It mathematically proves that no look-ahead bias occurred and that all portfolios strictly adhered to the $K$-hot and sum-to-one constraints before returning the final publication-grade metrics.


<br><br>

## **Implemented Callables**

In [None]:
# Task 1 — Validate `MASTER_STUDY_CONFIGURATION` integrity and completeness

# ==============================================================================
# Task 1: Validate MASTER_STUDY_CONFIGURATION integrity and completeness
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 1: Verify required top-level and nested keys exist with correct types (closed schema).
# -------------------------------------------------------------------------------------------------------------------------------

# Define the function to verify the schema types and completeness
def _verify_schema_types_and_completeness(config: Dict[str, Any]) -> None:
    """
    Recursively verifies that the configuration dictionary strictly adheres to the exhaustive schema.

    This function enforces a "closed-world assumption" for the configuration payload. It
    traverses every node of the dictionary, mathematically asserting that no keys are missing,
    no unauthorized keys are present, and every value strictly matches its expected Python type.
    This prevents silent numerical drift caused by misconfigured solver parameters.

    Args:
        config (Dict[str, Any]): The master study configuration dictionary.

    Raises:
        ValueError: If any keys are missing or unexpected at any level of the hierarchy.
        TypeError: If any configuration value does not match the expected type.
    """
    # Define the absolute ground truth schema for the entire configuration payload.
    # Note: Free-form dictionaries (like plotting_parameters) are mapped to the `dict` type class,
    # while structured dictionaries are mapped to nested dictionary instances.
    exhaustive_schema = {
        "global_setup": {
            "universe": list, "lookback_window_L": int, "annualization_factor": int,
            "rebalance_frequency": str, "backtest_year": int
        },
        "rebalance_calendar": {
            "rebalance_rule": str, "rebalance_dates_override": (list, type(None))
        },
        "data_conventions": {
            "price_field": str, "yfinance_auto_adjust": bool, "missing_data_handling": str,
            "window_construction_policy": str, "min_observations_per_window": int
        },
        "return_conventions": {
            "return_type": str, "holding_period_return_method": str
        },
        "capital_and_bounds": {
            "initial_capital": float, "allocation_bounds": tuple
        },
        "objective_function": {
            "cardinality_K": int, "risk_aversion_q": float, "continuity_bonus_kappa": float,
            "continuity_bonus_application_rule": str, "classical_objective_rescoring_policy": str
        },
        "estimation": {
            "covariance_estimator": str, "mean_estimator": str, "annualize_mu_and_sigma": bool
        },
        "qaoa_architecture": {
            "qaoa_depths_p": list, "qaoa_device_name": str, "qaoa_measurement_mode": str,
            "qaoa_shots": (int, type(None)), "xy_mixer_graph_type": str,
            "qaoa_gamma_bounds": tuple, "qaoa_beta_bounds": tuple, "trotter_initialization_rule": str
        },
        "qaoa_optimization": {
            "qaoa_stepsize": float, "qaoa_epsilon": float, "qaoa_max_iterations": int,
            "qaoa_use_early_stopping": bool,
            "qaoa_early_stopping": {  # Explicitly defined nested schema
                "monitor": str,
                "patience": int,
                "min_delta": float
            },
            "qaoa_readout_filter_prob": float, "qaoa_readout_feasibility_filter": str
        },
        "sa_solver": {
            "sa_penalty_scalar": float, "sa_num_reads": int, "sa_num_sweeps": int,
            "sa_feasibility_filter": str
        },
        "allocation": {
            "allocation_objective": str, "risk_free_rate_annual": float, "optimizer": str,
            "full_investment_constraint": str, "bounds": tuple
        },
        "hrp": {
            "compute_hrp_baseline": bool, "use_hrp_as_fallback": bool
        },
        "evaluation_metrics": {
            "transaction_cost_bps": float, "turnover_definition": str,
            "periods_per_year": int, "sharpe_definition_policy": str
        },
        "randomness": {
            "global_seed": int, "sa_seed": int, "qaoa_seed": int
        },
        "plotting_parameters": dict,      # Leaf node: free-form dictionary
        "raw_data_schema_example": dict   # Leaf node: free-form dictionary
    }

    def _recursive_type_check(config_node: Dict[str, Any], schema_node: Dict[str, Any], path_string: str) -> None:
        """Helper function to recursively traverse and validate the configuration tree."""
        config_keys = set(config_node.keys())
        schema_keys = set(schema_node.keys())

        # Assert exact set equality of keys at the current node
        if config_keys != schema_keys:
            missing = schema_keys - config_keys
            unexpected = config_keys - schema_keys
            error_msg = f"Schema violation at '{path_string}'."
            if missing:
                error_msg += f" Missing keys: {missing}."
            if unexpected:
                error_msg += f" Unexpected keys: {unexpected}."
            raise ValueError(error_msg)

        # Iterate through keys and assert types
        for key, expected_type in schema_node.items():
            current_path = f"{path_string}.{key}" if path_string else key
            value = config_node[key]

            # Bifurcate logic to handle nested dictionaries vs primitive types
            if isinstance(expected_type, dict):
                # If the schema defines a nested dictionary, assert the value is a dict
                if not isinstance(value, dict):
                    raise TypeError(f"Type violation at '{current_path}'. Expected dict, got {type(value).__name__}.")

                # Recurse deeper into the tree
                _recursive_type_check(value, expected_type, current_path)
            else:
                # If the schema defines a primitive type (or tuple of types), assert it directly
                if not isinstance(value, expected_type):
                    raise TypeError(f"Type violation at '{current_path}'. Expected {expected_type}, got {type(value).__name__}.")

    # Initiate the recursive validation from the root of the configuration dictionary
    _recursive_type_check(config, exhaustive_schema, "")
    logger.debug("Exhaustive recursive schema validation passed.")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 2: Validate cross-parameter invariants that determine mathematical feasibility.
# -------------------------------------------------------------------------------------------------------------------------------

# Define the function to validate mathematical invariants
def _validate_cross_parameter_invariants(config: Dict[str, Any]) -> None:
    """
    Validates the mathematical feasibility of the configuration parameters.

    This function ensures that the cardinality constraint is valid given the universe size,
    that the risk aversion parameter is within the unit interval, that the continuous
    allocation bounds permit a feasible sum-to-one solution, and that solver parameters
    are strictly positive.

    Args:
        config (Dict[str, Any]): The master study configuration dictionary.

    Raises:
        ValueError: If any mathematical invariant is violated.
    """
    # Extract the universe size N
    n_assets = len(config["global_setup"]["universe"])
    # Extract the cardinality constraint K
    k_cardinality = config["objective_function"]["cardinality_K"]

    # Assert that K is between 1 and N inclusive
    if not (1 <= k_cardinality <= n_assets):
        # Raise an error if the cardinality constraint is mathematically invalid
        raise ValueError(f"Cardinality K ({k_cardinality}) must satisfy 1 <= K <= N ({n_assets})")

    # Extract the risk aversion parameter q
    risk_aversion_q = config["objective_function"]["risk_aversion_q"]

    # Assert that q is within the closed interval [0, 1]
    if not (0.0 <= risk_aversion_q <= 1.0):
        # Raise an error if the risk aversion parameter is out of bounds
        raise ValueError(f"Risk aversion q ({risk_aversion_q}) must satisfy 0 <= q <= 1")

    # Extract the minimum and maximum allocation bounds
    w_min, w_max = config["capital_and_bounds"]["allocation_bounds"]

    # Calculate the minimum possible sum of weights for K assets
    min_possible_sum = k_cardinality * w_min
    # Calculate the maximum possible sum of weights for K assets
    max_possible_sum = k_cardinality * w_max

    # Assert that the sum-to-one constraint is feasible given the bounds and cardinality
    if not (min_possible_sum <= 1.0 <= max_possible_sum):
        # Raise an error if the allocation bounds make the equality constraint infeasible
        raise ValueError(f"Allocation bounds ({w_min}, {w_max}) with K={k_cardinality} make sum=1 constraint infeasible. "
                         f"Min sum: {min_possible_sum}, Max sum: {max_possible_sum}")

    # Extract the list of QAOA depths to scan
    qaoa_depths = config["qaoa_architecture"]["qaoa_depths_p"]

    # Assert that the depths list is not empty
    if not qaoa_depths:
        # Raise an error if no QAOA depths are provided
        raise ValueError("QAOA depths list cannot be empty")

    # Assert that the maximum depth does not exceed the practical limit of 10
    if max(qaoa_depths) > 10:
        # Raise an error if the circuit depth is pathologically high
        raise ValueError(f"Maximum QAOA depth ({max(qaoa_depths)}) exceeds practical limit of 10")

    # Iterate through the provided QAOA depths
    for depth in qaoa_depths:
        # Assert that each depth is a strictly positive integer
        if not isinstance(depth, int) or depth <= 0:
            # Raise an error if an invalid depth is found
            raise ValueError(f"QAOA depths must be positive integers, got {depth}")

    # Extract the number of reads for the Simulated Annealing solver
    sa_reads = config["sa_solver"]["sa_num_reads"]
    # Extract the number of sweeps for the Simulated Annealing solver
    sa_sweeps = config["sa_solver"]["sa_num_sweeps"]

    # Assert that the number of SA reads is strictly positive
    if sa_reads <= 0:
        # Raise an error if SA reads are invalid
        raise ValueError(f"SA num_reads must be > 0, got {sa_reads}")
    # Assert that the number of SA sweeps is strictly positive
    if sa_sweeps <= 0:
        # Raise an error if SA sweeps are invalid
        raise ValueError(f"SA num_sweeps must be > 0, got {sa_sweeps}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 3: Validate that every drift-inducing convention is explicitly pinned.
# -------------------------------------------------------------------------------------------------------------------------------

# Define the function to validate categorical conventions
def _validate_drift_inducing_conventions(config: Dict[str, Any]) -> None:
    """
    Validates that all categorical and drift-inducing conventions are explicitly pinned.

    This function ensures that string-based configurations match the exact permitted
    literals, preventing silent methodological drift. It also enforces the determinism
    contract for random seeds and measurement modes.

    Args:
        config (Dict[str, Any]): The master study configuration dictionary.

    Raises:
        ValueError: If a convention string is unrecognized or a seed is invalid.
    """
    # Extract the rebalance rule string
    rebalance_rule = config["rebalance_calendar"].get("rebalance_rule")
    # Extract the explicit rebalance dates override list
    rebalance_override = config["rebalance_calendar"].get("rebalance_dates_override")

    # Assert that at least one deterministic rebalance schedule method is provided
    if not rebalance_rule and not rebalance_override:
        # Raise an error if the rebalance schedule is completely ambiguous
        raise ValueError("Must provide either a rebalance_rule or rebalance_dates_override")

    # Extract the return type convention
    return_type = config["return_conventions"]["return_type"]
    # Assert that the return type is exactly 'log' or 'simple'
    if return_type not in {"log", "simple"}:
        # Raise an error if the return type is unrecognized
        raise ValueError(f"return_type must be 'log' or 'simple', got '{return_type}'")

    # Extract the holding period return method convention
    hp_method = config["return_conventions"]["holding_period_return_method"]
    # Assert that the holding period method is exactly 'price_relative' or 'compound_daily_returns'
    if hp_method not in {"price_relative", "compound_daily_returns"}:
        # Raise an error if the holding period method is unrecognized
        raise ValueError(f"holding_period_return_method must be 'price_relative' or 'compound_daily_returns', got '{hp_method}'")

    # Extract the XY mixer graph topology convention
    mixer_type = config["qaoa_architecture"]["xy_mixer_graph_type"]
    # Assert that the mixer topology is exactly 'complete' or 'ring'
    if mixer_type not in {"complete", "ring"}:
        # Raise an error if the mixer topology is unrecognized
        raise ValueError(f"xy_mixer_graph_type must be 'complete' or 'ring', got '{mixer_type}'")

    # Extract the Trotterized initialization rule convention
    trotter_rule = config["qaoa_architecture"]["trotter_initialization_rule"]
    # Assert that the Trotter rule matches the pinned literal exactly
    if trotter_rule != "linear_ramp_between_bounds_per_depth":
        # Raise an error if the Trotter rule is unrecognized
        raise ValueError(f"trotter_initialization_rule must be 'linear_ramp_between_bounds_per_depth', got '{trotter_rule}'")

    # Extract the global random seed
    global_seed = config["randomness"]["global_seed"]
    # Extract the Simulated Annealing random seed
    sa_seed = config["randomness"]["sa_seed"]
    # Extract the QAOA random seed
    qaoa_seed = config["randomness"]["qaoa_seed"]

    # Iterate through the extracted seeds to validate them
    for seed_name, seed_val in [("global_seed", global_seed), ("sa_seed", sa_seed), ("qaoa_seed", qaoa_seed)]:
        # Assert that each seed is a non-negative integer
        if not isinstance(seed_val, int) or seed_val < 0:
            # Raise an error if a seed is invalid, breaking the determinism contract
            raise ValueError(f"{seed_name} must be a non-negative integer, got {seed_val}")

    # Extract the annual risk-free rate
    rf_rate = config["allocation"]["risk_free_rate_annual"]
    # Assert that the risk-free rate is explicitly provided as a float
    if not isinstance(rf_rate, float):
        # Raise an error if the risk-free rate is not a float
        raise TypeError(f"risk_free_rate_annual must be a float, got {type(rf_rate).__name__}")

    # Extract the QAOA measurement mode
    measurement_mode = config["qaoa_architecture"]["qaoa_measurement_mode"]
    # Extract the number of QAOA shots
    qaoa_shots = config["qaoa_architecture"]["qaoa_shots"]

    # Check if the measurement mode is set to full statevector probabilities
    if measurement_mode == "statevector_full_probs":
        # Assert that shots are explicitly set to None to prevent shot noise drift
        if qaoa_shots is not None:
            # Raise an error if shots are provided during statevector simulation
            raise ValueError("qaoa_shots must be None when qaoa_measurement_mode is 'statevector_full_probs'")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

# Define the main orchestrator function for configuration validation
def validate_master_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Orchestrates the comprehensive validation of the master study configuration.

    This function acts as a strict gatekeeper, ensuring that the provided configuration
    dictionary meets all structural, mathematical, and categorical requirements before
    allowing the research pipeline to proceed. It executes the validation steps in a
    deterministic sequence and returns the unmutated configuration upon success.

    Args:
        config (Dict[str, Any]): The raw master study configuration dictionary.

    Returns:
        Dict[str, Any]: The validated, unmutated configuration dictionary.

    Raises:
        ValueError: If any structural, mathematical, or categorical invariant is violated.
        TypeError: If any configuration value possesses an incorrect data type.
    """
    # Execute Step 1: Verify structural schema and types
    _verify_schema_types_and_completeness(config)

    # Execute Step 2: Verify mathematical feasibility invariants
    _validate_cross_parameter_invariants(config)

    # Execute Step 3: Verify categorical conventions and determinism contracts
    _validate_drift_inducing_conventions(config)

    # Return the validated configuration dictionary, confirming it is safe for downstream use
    return config


In [None]:
# Task 2 — Validate raw_price_df schema, index, and value integrity (validate_raw_price_df)

# ==============================================================================
# Task 2: Validate raw_price_df schema, index, and value integrity
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 1: Validate the index: type, monotonicity, duplicates, and horizon sufficiency.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_index_integrity(raw_price_df: pd.DataFrame) -> Dict[str, Any]:
    """
    Validates the temporal integrity and timezone consistency of the raw price DataFrame's index.

    This function ensures the index is a DatetimeIndex, is strictly monotonically
    increasing, contains no duplicate timestamps, possesses a consistent timezone state,
    and spans a sufficient chronological horizon to support the 2025 backtest.

    Args:
        raw_price_df (pd.DataFrame): The raw time-series price matrix.

    Returns:
        Dict[str, Any]: A dictionary containing index validation metrics.

    Raises:
        TypeError: If the index is not a pandas DatetimeIndex.
        ValueError: If the index is not monotonic, has duplicates, or lacks sufficient history.
    """
    idx = raw_price_df.index

    # Assert the index is of type pandas.DatetimeIndex
    if not isinstance(idx, pd.DatetimeIndex):
        raise TypeError(f"raw_price_df index must be a pd.DatetimeIndex, got {type(idx).__name__}")

    # Assert the index is strictly monotonically increasing
    if not idx.is_monotonic_increasing:
        raise ValueError("raw_price_df index is not monotonically increasing. Chronological ordering is required.")

    # Assert there are no duplicate timestamps
    if idx.has_duplicates:
        duplicates = idx[idx.duplicated()].unique()
        raise ValueError(f"raw_price_df index contains duplicate timestamps: {duplicates}")

    # Assert timezone consistency
    # A valid DatetimeIndex in pandas will have a single tz attribute. If it were mixed,
    # pandas would have cast the index to an Object dtype, which we caught above.
    # We log the timezone state for downstream alignment awareness.
    index_tz = idx.tz
    if index_tz is None:
        logger.debug("Price index is timezone-naive.")
    else:
        logger.debug(f"Price index is timezone-aware: {index_tz}")

    earliest_date = idx.min()
    latest_date = idx.max()

    # Assert the latest date is in or after the backtest year (2025)
    if latest_date < pd.Timestamp('2025-01-01', tz=index_tz):
        raise ValueError(f"Data horizon insufficient. Latest date {latest_date.date()} is before the 2025 backtest period.")

    # Enforce the heuristic: count rows strictly before 2025-02-01 to ensure L=180 is feasible
    pre_rebalance_cutoff = pd.Timestamp('2025-02-01', tz=index_tz)
    pre_history_mask = idx < pre_rebalance_cutoff
    pre_history_count = pre_history_mask.sum()

    # Assert the pre-history count meets the L=180 requirement
    if pre_history_count < 180:
        raise ValueError(f"Insufficient historical data. Found {pre_history_count} trading days before "
                         f"{pre_rebalance_cutoff.date()}, but at least L=180 are required.")

    return {
        "total_rows": len(idx),
        "earliest_date": earliest_date.isoformat(),
        "latest_date": latest_date.isoformat(),
        "timezone": str(index_tz) if index_tz else "naive",
        "pre_2025_02_01_row_count": int(pre_history_count)
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 2: Validate columns: single-level index, exact universe membership, numeric types, non-empty series.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_column_integrity(raw_price_df: pd.DataFrame, universe: List[str]) -> Dict[str, Any]:
    """
    Validates the structural and schema integrity of the raw price DataFrame's columns.

    This function ensures the columns form a single-level index, exactly match the
    canonical universe of tickers without omissions or additions, contain only numeric
    data types, and are not entirely composed of missing values.

    Args:
        raw_price_df (pd.DataFrame): The raw time-series price matrix.
        universe (List[str]): The canonical list of asset tickers.

    Returns:
        Dict[str, Any]: A dictionary containing column validation metrics.

    Raises:
        TypeError: If the columns are a MultiIndex or contain non-numeric data.
        ValueError: If the column labels do not exactly match the universe, or if a column is entirely NaN.
    """
    # Extract the columns object
    cols = raw_price_df.columns

    # Assert the column index is a single-level index (not a MultiIndex)
    if isinstance(cols, pd.MultiIndex):
        raise TypeError("raw_price_df columns must be a single-level index. MultiIndex detected. "
                        "Data must be normalized prior to this validation step.")

    # Convert columns and universe to sets for exact membership comparison
    col_set = set(cols)
    universe_set = set(universe)

    # Assert exact set equality between DataFrame columns and the canonical universe
    if col_set != universe_set:
        missing = universe_set - col_set
        unexpected = col_set - universe_set
        error_msg = "Column labels do not exactly match the canonical universe."
        if missing:
            error_msg += f" Missing tickers: {missing}."
        if unexpected:
            error_msg += f" Unexpected tickers: {unexpected}."
        raise ValueError(error_msg)

    # Iterate through each column to validate data types and non-empty status
    for ticker in universe:
        series = raw_price_df[ticker]

        # Assert the column data type is numeric (float64 or coercible)
        if not pd.api.types.is_numeric_dtype(series):
            raise TypeError(f"Column '{ticker}' must be numeric, got dtype {series.dtype}")

        # Assert the column is not entirely composed of NaN values
        if series.isna().all():
            raise ValueError(f"Column '{ticker}' is entirely NaN. Asset cannot be optimized.")

    # Return the column validation metrics for the audit report
    return {
        "column_count": len(cols),
        "universe_match": True,
        "all_numeric": True
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 3: Validate value integrity: missingness quantification, temporary forward-fill sanity, strict positivity.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_value_integrity(raw_price_df: pd.DataFrame) -> Dict[str, Any]:
    """
    Validates the numerical integrity of the raw price DataFrame's values.

    This function quantifies missing data (NaNs) per asset, logs warnings if the missingness
    ratio exceeds 5%, and performs a strict positivity check on a temporary forward-filled
    view of the data to ensure no zero or negative prices exist.

    Args:
        raw_price_df (pd.DataFrame): The raw time-series price matrix.

    Returns:
        Dict[str, Any]: A dictionary containing value integrity metrics and NaN statistics.

    Raises:
        ValueError: If any non-positive prices are detected after forward-filling.
    """
    total_rows = len(raw_price_df)
    nan_counts = {}
    nan_ratios = {}
    warnings_logged = []

    # Iterate through each column to quantify missingness
    for col in raw_price_df.columns:
        # Calculate the absolute number of NaN values
        nan_count = int(raw_price_df[col].isna().sum())
        # Calculate the ratio of NaN values to total rows
        nan_ratio = float(nan_count / total_rows)

        nan_counts[col] = nan_count
        nan_ratios[col] = nan_ratio

        # Implement the prescribed heuristic: warn if NaN ratio > 5%
        if nan_ratio > 0.05:
            warning_msg = f"Asset '{col}' has a high missing data ratio: {nan_ratio:.2%} ({nan_count} rows)."
            logger.warning(warning_msg)
            warnings_logged.append(warning_msg)

    # Create a temporary forward-filled view of the DataFrame for the positivity check
    # This propagates the last known price forward, simulating the cleansing policy
    temp_ffilled_df = raw_price_df.ffill()

    # Evaluate strict positivity: P_{t,i} > 0
    # We only check non-NaN entries, as leading NaNs will remain NaN after ffill
    # The condition (temp_ffilled_df <= 0) creates a boolean mask of violations
    if (temp_ffilled_df <= 0).any().any():
        # Identify the specific columns containing non-positive values for the error message
        violating_cols = temp_ffilled_df.columns[(temp_ffilled_df <= 0).any()].tolist()
        raise ValueError(f"Data integrity failure: Non-positive prices (<= 0) detected in assets {violating_cols} "
                         f"after temporary forward-fill.")

    # Return the value validation metrics for the audit report
    return {
        "nan_counts_per_asset": nan_counts,
        "nan_ratios_per_asset": nan_ratios,
        "positivity_check_passed": True,
        "warnings": warnings_logged
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_raw_price_df(raw_price_df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Orchestrates the comprehensive validation of the raw time-series price matrix.

    This function acts as a strict gatekeeper for the data engineering phase. It sequentially
    validates the temporal index integrity, the column schema and universe alignment, and the
    numerical value integrity of the ingested price data. It returns a consolidated audit report
    if all checks pass, or raises a fatal exception if any structural or numerical invariant is violated.

    Args:
        raw_price_df (pd.DataFrame): The raw time-series price matrix ingested from the data provider.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Dict[str, Any]: A comprehensive validation report aggregating metrics from all checks.

    Raises:
        TypeError: If the index or columns possess incorrect data types or structures.
        ValueError: If chronological, schema, or numerical invariants are violated.
    """
    logger.info("Commencing raw price DataFrame validation suite.")

    # Initialize the consolidated validation report
    validation_report: Dict[str, Any] = {}

    # Execute Step 1: Validate temporal index integrity
    index_report = _validate_index_integrity(raw_price_df)
    validation_report["index_metrics"] = index_report
    logger.info("Index integrity validation passed.")

    # Extract the canonical universe from the configuration for Step 2
    universe = config["global_setup"]["universe"]

    # Execute Step 2: Validate column schema and universe alignment
    column_report = _validate_column_integrity(raw_price_df, universe)
    validation_report["column_metrics"] = column_report
    logger.info("Column schema and universe alignment validation passed.")

    # Execute Step 3: Validate numerical value integrity and quantify missingness
    value_report = _validate_value_integrity(raw_price_df)
    validation_report["value_metrics"] = value_report
    logger.info("Numerical value integrity validation passed.")

    logger.info("Raw price DataFrame validation suite completed successfully.")

    # Return the consolidated audit artifact
    return validation_report


In [None]:
# Task 3 — Cleanse and normalize raw_price_df (cleanse_price_data)

# ==============================================================================
# Task 3: Cleanse and normalize raw_price_df
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 1: Normalize column structure: canonical ticker order, deterministic MultiIndex flattening, and float64 casting.
# -------------------------------------------------------------------------------------------------------------------------------

def _normalize_column_structure(raw_price_df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame:
    """
    Normalizes the column structure of the raw price DataFrame using dynamic resolution.

    This function creates a copy of the input data, dynamically detects and flattens a
    MultiIndex by searching all levels for the configured price field, reorders the
    columns to exactly match the canonical universe list, and casts the entire matrix
    to numpy.float64.

    Args:
        raw_price_df (pd.DataFrame): The raw time-series price matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        pd.DataFrame: A normalized DataFrame with a single-level column index.

    Raises:
        ValueError: If the MultiIndex cannot be resolved or if canonical tickers are missing.
        TypeError: If the data cannot be cast to float64.
    """
    # Create a deep copy to prevent mutating the original raw data artifact
    normalized_df = raw_price_df.copy(deep=True)

    universe = config["global_setup"]["universe"]
    price_field = config["data_conventions"]["price_field"]

    # Dynamically resolve and flatten a MultiIndex if present
    if isinstance(normalized_df.columns, pd.MultiIndex):
        logger.info("MultiIndex detected in columns. Attempting dynamic flattening.")
        field_found = False

        # Iterate through all available levels to locate the target price field
        for level_idx in range(normalized_df.columns.nlevels):
            if price_field in normalized_df.columns.get_level_values(level_idx):
                # Extract the cross-section corresponding to the target field
                normalized_df = normalized_df.xs(price_field, axis=1, level=level_idx)
                field_found = True
                logger.debug(f"Successfully flattened MultiIndex using level {level_idx}.")
                break

        # Raise a fatal error if the dynamic search fails
        if not field_found:
            raise ValueError(f"Cannot flatten MultiIndex: price_field '{price_field}' not found in any level.")

    try:
        # Reorder the columns to exactly match the canonical universe list
        normalized_df = normalized_df[universe]
    except KeyError as e:
        raise ValueError(f"Failed to reorder columns to match canonical universe. Missing tickers: {e}")

    try:
        # Cast the entire DataFrame to 64-bit floating point numbers for high precision
        normalized_df = normalized_df.astype(np.float64)
    except ValueError as e:
        raise TypeError(f"Failed to cast normalized DataFrame to float64. Non-numeric data present: {e}")

    return normalized_df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 2: Apply the configured missing-data policy: forward-fill then dropna; log row impacts.
# -------------------------------------------------------------------------------------------------------------------------------

def _apply_missing_data_policy(normalized_df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame:
    """
    Applies the deterministic missing-data handling policy to the normalized DataFrame.

    This function enforces the 'ffill_then_dropna' policy, propagating the last known
    prices forward along the time axis to preserve causality, and subsequently dropping
    any rows that still contain missing values (typically leading NaNs).

    Args:
        normalized_df (pd.DataFrame): The structurally normalized price matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        pd.DataFrame: The cleansed DataFrame with no missing values.

    Raises:
        ValueError: If an unrecognized missing-data handling policy is configured.
    """
    # Extract the pinned missing data handling policy from the configuration
    policy = config["data_conventions"]["missing_data_handling"]

    # Assert that the policy exactly matches the required manuscript convention
    if policy != "ffill_then_dropna":
        raise ValueError(f"Unrecognized missing_data_handling policy: '{policy}'. Expected 'ffill_then_dropna'.")

    # Record the initial number of rows for the audit trail
    initial_row_count = len(normalized_df)

    # Apply forward-fill along the time axis (axis=0) to propagate last known prices
    # This is a causal operation that does not introduce look-ahead bias
    cleansed_df = normalized_df.ffill(axis=0)

    # Record the number of rows after forward-filling (should be identical to initial)
    post_ffill_row_count = len(cleansed_df)

    # Apply dropna along the time axis (axis=0) to remove rows with any remaining NaNs
    # This typically removes leading rows before an asset's first trading day
    cleansed_df = cleansed_df.dropna(axis=0, how='any')

    # Record the final number of rows after dropping NaNs
    final_row_count = len(cleansed_df)
    # Calculate the exact number of rows removed during the dropna phase
    rows_removed = post_ffill_row_count - final_row_count

    # Log the row impacts for institutional auditability and reproducibility
    logger.info(f"Missing data policy '{policy}' applied.")
    logger.info(f"Initial rows: {initial_row_count} | Post-ffill rows: {post_ffill_row_count} | "
                f"Final rows: {final_row_count} | Rows removed: {rows_removed}")

    # Return the cleansed DataFrame
    return cleansed_df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 3: Post-clean integrity re-validation: no NaN, strictly positive, sufficient rows for backtest.
# -------------------------------------------------------------------------------------------------------------------------------

def _post_clean_integrity_validation(cleansed_df: pd.DataFrame, config: Dict[str, Any]) -> None:
    """
    Performs a final, rigorous integrity validation on the cleansed DataFrame.

    This function acts as a belt-and-suspenders check, mathematically asserting that
    the matrix contains zero NaNs, strictly positive prices, a monotonically increasing
    index, and a sufficient chronological horizon to execute the full backtest.

    Args:
        cleansed_df (pd.DataFrame): The cleansed and normalized price matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Raises:
        ValueError: If any post-cleansing integrity invariant is violated.
    """
    # Assert that the sum of all NaN values across the entire matrix is exactly zero
    if cleansed_df.isna().sum().sum() != 0:
        raise ValueError("Post-clean validation failed: NaN values remain in the DataFrame.")

    # Assert that every single price in the matrix is strictly greater than zero
    # This prevents undefined logarithms during return computations
    if not (cleansed_df > 0).all().all():
        raise ValueError("Post-clean validation failed: Non-positive prices (<= 0) detected in the DataFrame.")

    # Assert that the chronological index remains strictly monotonically increasing
    if not cleansed_df.index.is_monotonic_increasing:
        raise ValueError("Post-clean validation failed: Index is no longer monotonically increasing.")

    # Assert that the index contains no duplicate timestamps
    if cleansed_df.index.has_duplicates:
        raise ValueError("Post-clean validation failed: Index contains duplicate timestamps.")

    # Extract the lookback window length from the configuration
    lookback_l = config["global_setup"]["lookback_window_L"]
    # Calculate the absolute minimum required rows: lookback window + 12 months of backtesting
    min_required_rows = lookback_l + 12

    # Assert that the cleansed DataFrame contains sufficient rows to complete the study
    if len(cleansed_df) < min_required_rows:
        raise ValueError(f"Post-clean validation failed: Insufficient rows. "
                         f"Have {len(cleansed_df)}, require at least {min_required_rows} (L={lookback_l} + 12).")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def cleanse_price_data(raw_price_df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame:
    """
    Orchestrates the cleansing and normalization of the raw time-series price matrix.

    This function executes the data engineering pipeline required to transform raw API
    data into a canonical, mathematically sound artifact. It normalizes the column
    structure, applies deterministic missing-data handling, and rigorously re-validates
    the integrity of the final matrix before releasing it to downstream econometric
    and quantum optimization modules.

    Args:
        raw_price_df (pd.DataFrame): The raw time-series price matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        pd.DataFrame: The canonical, cleansed, and normalized price matrix.

    Raises:
        ValueError: If structural, policy, or numerical invariants are violated.
        TypeError: If data casting fails.
    """
    logger.info("Commencing price data cleansing and normalization pipeline.")

    # Execute Step 1: Normalize column structure, flatten MultiIndex, and cast to float64
    normalized_df = _normalize_column_structure(raw_price_df, config)
    logger.info("Column structure normalized and cast to float64.")

    # Execute Step 2: Apply the deterministic missing-data handling policy
    cleansed_df = _apply_missing_data_policy(normalized_df, config)

    # Execute Step 3: Perform rigorous post-cleansing integrity validation
    _post_clean_integrity_validation(cleansed_df, config)
    logger.info("Post-cleansing integrity validation passed.")

    logger.info("Price data cleansing and normalization pipeline completed successfully.")

    # Return the canonical data artifact for the study
    return cleansed_df


In [None]:
# Task 4 — Construct deterministic monthly rebalance calendar for 2025 (build_rebalance_calendar)

# ==============================================================================
# Task 4: Construct deterministic monthly rebalance calendar for 2025
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 1: Generate 12 candidate month anchors (rule-based) or parse override list.
# -------------------------------------------------------------------------------------------------------------------------------

def _align_timestamp_timezone(ts: pd.Timestamp, target_tz: Optional[datetime.tzinfo]) -> pd.Timestamp:
    """
    Aligns a pandas Timestamp to a target timezone deterministically.

    This helper function abstracts the verbose boilerplate required to safely compare
    or slice DatetimeIndices that may be timezone-naive or timezone-aware.

    Args:
        ts (pd.Timestamp): The timestamp to align.
        target_tz (Optional[datetime.tzinfo]): The target timezone (None if naive).

    Returns:
        pd.Timestamp: The aligned timestamp.
    """
    if target_tz is not None:
        # Target is aware
        if ts.tz is None:
            return ts.tz_localize(target_tz)
        else:
            return ts.tz_convert(target_tz)
    else:
        # Target is naive
        if ts.tz is not None:
            return ts.tz_localize(None)
        else:
            return ts


def _generate_candidate_anchors(config: Dict[str, Any], target_tz: Optional[datetime.tzinfo]) -> List[pd.Timestamp]:
    """
    Generates exactly 12 candidate month anchors for the backtest year.

    This function enforces the precedence rule: if an explicit override list is provided,
    it is parsed and used. Otherwise, it generates theoretical anchors for the first
    calendar day of each month. It utilizes the dedicated helper function to ensure
    clean, DRY timezone alignment.

    Args:
        config (Dict[str, Any]): The validated master study configuration dictionary.
        target_tz (Optional[datetime.tzinfo]): The timezone of the empirical price index.

    Returns:
        List[pd.Timestamp]: A list of exactly 12 chronological candidate anchors.

    Raises:
        ValueError: If the override list is invalid, or if an unrecognized rule is specified.
    """
    rebalance_calendar_cfg = config["rebalance_calendar"]
    override_dates = rebalance_calendar_cfg.get("rebalance_dates_override")
    backtest_year = config["global_setup"]["backtest_year"]

    anchors: List[pd.Timestamp] = []

    # Precedence 1: Explicit override list
    if override_dates is not None and len(override_dates) > 0:
        logger.info("Rebalance schedule precedence: Using explicit 'rebalance_dates_override'.")
        if len(override_dates) != 12:
            raise ValueError(f"Override list must contain exactly 12 dates, got {len(override_dates)}.")
        try:
            for date_str in override_dates:
                ts = pd.to_datetime(date_str)
                anchors.append(ts)
        except Exception as e:
            raise ValueError(f"Failed to parse override dates as ISO strings: {e}")

    # Precedence 2: Rule-based generation
    else:
        rule = rebalance_calendar_cfg.get("rebalance_rule")
        logger.info(f"Rebalance schedule precedence: Using rule '{rule}'.")
        if rule != "first_trading_day_of_month":
            raise ValueError(f"Unrecognized rebalance_rule: '{rule}'. Expected 'first_trading_day_of_month'.")

        for month in range(1, 13):
            ts = pd.Timestamp(year=backtest_year, month=month, day=1)
            anchors.append(ts)

    # Ensure timezone consistency using the abstracted helper function
    aligned_anchors = [_align_timestamp_timezone(ts, target_tz) for ts in anchors]

    return aligned_anchors

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 2: Map each anchor to an actual trading day present in cleaned_price_df.index.
# -------------------------------------------------------------------------------------------------------------------------------

def _map_anchors_to_trading_days(anchors: List[pd.Timestamp], price_index: pd.DatetimeIndex) -> List[pd.Timestamp]:
    """
    Maps theoretical calendar anchors to realized trading days in the empirical dataset.

    For each anchor, this function finds the earliest timestamp in the price index that
    is greater than or equal to the anchor. This operationalizes the 'first trading day'
    logic while strictly adhering to the realized data availability.

    Args:
        anchors (List[pd.Timestamp]): The 12 theoretical candidate anchors.
        price_index (pd.DatetimeIndex): The chronological index of the cleansed price matrix.

    Returns:
        List[pd.Timestamp]: A list of exactly 12 realized trading timestamps.

    Raises:
        ValueError: If an anchor falls beyond the available data, or if mapped dates are not strictly increasing.
    """
    mapped_dates: List[pd.Timestamp] = []

    for i, anchor in enumerate(anchors):
        # Create a boolean mask for all realized dates >= the theoretical anchor
        valid_dates_mask = price_index >= anchor

        # Extract the subset of the index that satisfies the condition
        valid_dates = price_index[valid_dates_mask]

        # Assert that at least one valid date exists
        if len(valid_dates) == 0:
            raise ValueError(f"Data coverage error: No trading days found on or after anchor {anchor.date()}. "
                             f"The dataset does not cover the full backtest horizon.")

        # The earliest date in the valid subset is the realized trading day
        realized_trading_day = valid_dates[0]
        mapped_dates.append(realized_trading_day)

    # Assert that the resulting mapped dates are strictly monotonically increasing
    # This prevents duplicate rebalance dates if the dataset has multi-month gaps
    for i in range(1, len(mapped_dates)):
        if mapped_dates[i] <= mapped_dates[i-1]:
            raise ValueError(f"Mapping error: Mapped rebalance dates are not strictly increasing. "
                             f"Conflict between {mapped_dates[i-1].date()} and {mapped_dates[i].date()}.")

    return mapped_dates

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 3: Validate causality feasibility for every rebalance date and freeze schedule.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_causality_and_freeze(mapped_dates: List[pd.Timestamp], price_index: pd.DatetimeIndex, lookback_l: int) -> Tuple[pd.Timestamp, ...]:
    """
    Validates the causal feasibility of the rebalance schedule and freezes it.

    This function mathematically asserts that for every mapped rebalance date, there
    exist at least L strictly prior trading days in the dataset. This guarantees that
    the rolling estimation windows can be constructed without look-ahead bias.

    Args:
        mapped_dates (List[pd.Timestamp]): The 12 realized trading timestamps.
        price_index (pd.DatetimeIndex): The chronological index of the cleansed price matrix.
        lookback_l (int): The required number of historical trading days (L=180).

    Returns:
        Tuple[pd.Timestamp, ...]: An immutable tuple of the 12 validated rebalance dates.

    Raises:
        ValueError: If any rebalance date lacks sufficient historical data.
    """
    for i, t in enumerate(mapped_dates):
        # Create a boolean mask for strict inequality: tau < t
        # This strictly prevents look-ahead bias by excluding the rebalance date itself
        prior_days_mask = price_index < t

        # Count the number of available historical trading days
        available_history = int(prior_days_mask.sum())

        # Assert that the available history meets or exceeds the lookback requirement
        if available_history < lookback_l:
            raise ValueError(f"Causality violation at rebalance date {t.date()} (Month {i+1}). "
                             f"Requires L={lookback_l} prior trading days, but only {available_history} are available.")

        logger.debug(f"Rebalance {i+1} ({t.date()}): Causality verified. Available pre-history: {available_history} days.")

    # Convert the validated list to an immutable tuple to freeze the schedule
    frozen_schedule = tuple(mapped_dates)
    return frozen_schedule

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def build_rebalance_calendar(cleaned_price_df: pd.DataFrame, config: Dict[str, Any]) -> Tuple[pd.Timestamp, ...]:
    """
    Orchestrates the construction of a deterministic, causal monthly rebalance calendar.

    This function generates theoretical calendar anchors based on the study configuration,
    maps them to realized trading days within the empirical price dataset, and rigorously
    validates that each resulting date possesses sufficient historical data to support
    a causal lookback window of length L. It returns an immutable schedule.

    Args:
        cleaned_price_df (pd.DataFrame): The canonical, cleansed price matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Tuple[pd.Timestamp, ...]: An immutable tuple containing exactly 12 validated rebalance timestamps.

    Raises:
        ValueError: If the schedule cannot be generated, mapped, or causally validated.
    """
    logger.info("Commencing deterministic rebalance calendar construction.")

    # Extract the timezone of the empirical data to ensure consistency
    target_tz = cleaned_price_df.index.tz

    # Execute Step 1: Generate theoretical candidate anchors
    anchors = _generate_candidate_anchors(config, target_tz)
    logger.info(f"Generated {len(anchors)} theoretical candidate anchors.")

    # Execute Step 2: Map anchors to realized trading days
    mapped_dates = _map_anchors_to_trading_days(anchors, cleaned_price_df.index)
    logger.info("Successfully mapped anchors to realized empirical trading days.")

    # Extract the lookback window length L from the configuration
    lookback_l = config["global_setup"]["lookback_window_L"]

    # Execute Step 3: Validate causality and freeze the schedule
    frozen_schedule = _validate_causality_and_freeze(mapped_dates, cleaned_price_df.index, lookback_l)

    # Log the final, authoritative schedule for institutional auditability
    logger.info("Rebalance calendar causality validated and frozen.")
    for i, date in enumerate(frozen_schedule):
        logger.info(f"  Rebalance {i+1:02d}: {date.date()}")

    return frozen_schedule


In [None]:
# Task 5 — Build causal rolling-window extraction callable (extract_lookback_window)

# ==============================================================================
# Task 5: Build causal rolling-window extraction callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 1: Implement strict causal slicing and take the last L rows prior to t.
# -------------------------------------------------------------------------------------------------------------------------------

def _slice_causal_history(cleaned_price_df: pd.DataFrame, t: pd.Timestamp, L: int) -> pd.DataFrame:
    """
    Extracts a strictly causal historical window of length L prior to timestamp t.

    This function enforces the interval (-infinity, t) by using a strict inequality
    mask on the DatetimeIndex, mathematically guaranteeing the absence of look-ahead
    bias. It then extracts the last L trading days to form the [t-L, t) window.

    Args:
        cleaned_price_df (pd.DataFrame): The canonical, cleansed price matrix.
        t (pd.Timestamp): The current rebalance timestamp.
        L (int): The required lookback window length in trading days.

    Returns:
        pd.DataFrame: A deep copy of the causally sliced historical window.
    """
    # Ensure timezone consistency between the rebalance timestamp and the DataFrame index
    target_tz = cleaned_price_df.index.tz
    if target_tz is not None:
        if t.tz is None:
            # Localize naive timestamp to match aware index
            t_aligned = t.tz_localize(target_tz)
        else:
            # Convert aware timestamp to match index timezone
            t_aligned = t.tz_convert(target_tz)
    else:
        if t.tz is not None:
            # Strip timezone from aware timestamp to match naive index
            t_aligned = t.tz_localize(None)
        else:
            t_aligned = t

    # Create a boolean mask for strict inequality: tau < t
    # This is the critical operation that prevents look-ahead bias
    causal_mask = cleaned_price_df.index < t_aligned

    # Apply the mask to filter out all data at or after the rebalance date
    causal_history = cleaned_price_df.loc[causal_mask]

    # Extract exactly the last L rows from the causal history
    # We use .copy(deep=True) to ensure the returned window is an independent artifact
    window_df = causal_history.tail(L).copy(deep=True)

    if not window_df.empty:
        window_start = window_df.index.min().date()
        window_end = window_df.index.max().date()
        logger.debug(f"Sliced causal window for rebalance {t_aligned.date()}: [{window_start}, {window_end}].")
    else:
        logger.debug(f"Sliced causal window for rebalance {t_aligned.date()} is empty.")

    return window_df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 2: Enforce the minimum observation requirement (must be exactly L rows).
# -------------------------------------------------------------------------------------------------------------------------------

def _enforce_window_length(window_df: pd.DataFrame, t: pd.Timestamp, L: int) -> None:
    """
    Mathematically asserts that the extracted window contains exactly L trading days.

    This function prevents the econometric estimators from processing undersized
    samples, which would artificially inflate variance and violate the study's methodology.

    Args:
        window_df (pd.DataFrame): The causally sliced historical window.
        t (pd.Timestamp): The current rebalance timestamp (for error reporting).
        L (int): The required lookback window length.

    Raises:
        ValueError: If the number of rows in the window is not exactly equal to L.
    """
    # Calculate the actual number of rows in the extracted window
    actual_rows = len(window_df)

    # Assert exact equality with the required lookback length
    if actual_rows != L:
        shortfall = L - actual_rows
        raise ValueError(f"Insufficient historical data for rebalance date {t.date()}. "
                         f"Expected exactly L={L} rows, but observed {actual_rows} rows "
                         f"(shortfall of {shortfall} trading days).")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 3: Return the window DataFrame with guaranteed shape and integrity.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_window_integrity(window_df: pd.DataFrame, universe: List[str]) -> None:
    """
    Performs a final integrity validation on the extracted window DataFrame.

    This function acts as a belt-and-suspenders check, asserting that the window
    maintains the canonical column ordering, contains zero NaNs, and consists
    entirely of strictly positive prices.

    Args:
        window_df (pd.DataFrame): The causally sliced and length-validated window.
        universe (List[str]): The canonical list of asset tickers.

    Raises:
        ValueError: If column ordering, NaN, or positivity invariants are violated.
    """
    # Assert that the columns exactly match the canonical universe ordering
    # This is critical for mapping matrix indices to specific assets downstream
    if window_df.columns.tolist() != universe:
        raise ValueError("Window integrity violation: Column ordering does not match the canonical universe.")

    # Assert that the window contains exactly zero NaN values
    if window_df.isna().sum().sum() != 0:
        raise ValueError("Window integrity violation: NaN values detected in the extracted window.")

    # Assert that all prices in the window are strictly positive
    # This guarantees that subsequent logarithmic return calculations are mathematically defined
    if not (window_df > 0).all().all():
        raise ValueError("Window integrity violation: Non-positive prices (<= 0) detected in the extracted window.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def extract_lookback_window(cleaned_price_df: pd.DataFrame, t: pd.Timestamp, L: int, universe: List[str]) -> pd.DataFrame:
    """
    Orchestrates the extraction of a strictly causal, validated rolling estimation window.

    This function isolates the historical price data required for a specific rebalance
    period. It mathematically guarantees the absence of look-ahead bias by strictly
    excluding data at or after timestamp t. It enforces the exact sample size requirement
    (L) and re-validates the structural and numerical integrity of the resulting matrix.

    Args:
        cleaned_price_df (pd.DataFrame): The canonical, cleansed price matrix.
        t (pd.Timestamp): The current rebalance timestamp.
        L (int): The required lookback window length in trading days (e.g., 180).
        universe (List[str]): The canonical list of asset tickers.

    Returns:
        pd.DataFrame: A deep copy of the validated, causal historical window of shape (L, N).

    Raises:
        ValueError: If the window cannot be extracted, lacks sufficient data, or fails integrity checks.
    """
    logger.debug(f"Extracting causal lookback window for rebalance date: {t.date()}")

    # Execute Step 1: Slice the causal history and extract the last L rows
    window_df = _slice_causal_history(cleaned_price_df, t, L)

    # Execute Step 2: Enforce the exact minimum observation requirement
    _enforce_window_length(window_df, t, L)

    # Execute Step 3: Re-validate the structural and numerical integrity of the window
    _validate_window_integrity(window_df, universe)

    logger.debug(f"Successfully extracted and validated window of shape {window_df.shape}.")

    # Return the canonical estimation window artifact for the current month
    return window_df


In [None]:
# Task 6 — Build daily log-return computation callable (compute_daily_log_returns)

# ==============================================================================
# Task 6: Build daily log-return computation callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 1: Compute element-wise log returns consistent with the pinned return convention.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_raw_log_returns(window_df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame:
    """
    Computes the element-wise logarithmic returns for the provided price window.

    This function enforces the pinned 'log' return convention. It calculates the
    natural logarithm of the ratio between the current day's price and the previous
    day's price for all assets simultaneously using vectorized operations.

    Args:
        window_df (pd.DataFrame): The causally sliced, strictly positive price matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        pd.DataFrame: A DataFrame containing the raw log returns (first row will be NaN).

    Raises:
        ValueError: If the configuration does not specify 'log' returns, or if non-positive prices exist.
    """
    # Extract the pinned return type convention
    return_type = config["return_conventions"]["return_type"]

    # Assert strict adherence to the logarithmic return requirement
    if return_type != "log":
        raise ValueError(f"Methodological violation: Expected return_type 'log', but config specifies '{return_type}'.")

    # Defensive check: Ensure all prices are strictly positive to prevent undefined logarithms
    if not (window_df > 0).all().all():
        raise ValueError("Mathematical violation: Non-positive prices detected. Logarithm is undefined.")

    # Compute logarithmic returns: R_{tau, i} = ln(P_{tau, i} / P_{tau-1, i})
    # The .shift(1) operation shifts the data down by one row along the DatetimeIndex
    # np.log applies the natural logarithm element-wise to the resulting ratio matrix
    raw_returns_df = np.log(window_df / window_df.shift(1))

    return raw_returns_df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 2: Remove the first row (NaN from shifting) and assert shape and NaN-free output.
# -------------------------------------------------------------------------------------------------------------------------------

def _remove_first_row_and_validate(raw_returns_df: pd.DataFrame, expected_L: int, expected_N: int) -> pd.DataFrame:
    """
    Truncates the shift-induced NaN row and mathematically asserts matrix dimensions.

    This function removes the first chronological observation (which lacks a preceding
    denominator for the return calculation) and rigorously verifies that the resulting
    matrix matches the expected (L-1, N) shape and contains absolutely no missing values.

    Args:
        raw_returns_df (pd.DataFrame): The raw log returns matrix containing a leading NaN row.
        expected_L (int): The expected lookback window length (e.g., 180).
        expected_N (int): The expected number of assets in the universe (e.g., 10).

    Returns:
        pd.DataFrame: The truncated, NaN-free returns matrix.

    Raises:
        ValueError: If the resulting shape is incorrect or if unexpected NaNs are present.
    """
    # Deterministically remove the first row using integer-location based indexing
    # This drops index 0 and retains indices 1 through L-1
    truncated_returns_df = raw_returns_df.iloc[1:].copy(deep=True)

    # Calculate the expected number of rows after truncation
    expected_rows = expected_L - 1

    # Assert the exact shape of the resulting matrix
    actual_shape = truncated_returns_df.shape
    expected_shape = (expected_rows, expected_N)

    if actual_shape != expected_shape:
        raise ValueError(f"Dimensionality violation: Expected returns matrix shape {expected_shape}, "
                         f"but observed {actual_shape}.")

    # Assert that exactly zero NaNs remain in the truncated matrix
    # Any remaining NaNs indicate missing data that bypassed the Phase 1 cleansing
    if truncated_returns_df.isna().sum().sum() != 0:
        raise ValueError("Data integrity violation: Unexpected NaN values detected in the returns matrix "
                         "after removing the initial shift-induced row.")

    return truncated_returns_df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 3: Return the daily returns DataFrame with canonical ordering and numeric integrity.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_returns_integrity_and_format(returns_df: pd.DataFrame, universe: List[str]) -> None:
    """
    Performs a final integrity validation on the truncated returns DataFrame.

    This function acts as a strict gatekeeper, ensuring the columns perfectly align
    with the canonical universe, the data type is float64, and all computed returns
    are finite (no infinity or negative infinity).

    Args:
        returns_df (pd.DataFrame): The truncated, NaN-free returns matrix.
        universe (List[str]): The canonical list of asset tickers.

    Raises:
        ValueError: If column ordering is corrupted or if infinite values are detected.
        TypeError: If the matrix is not composed of float64 data.
    """
    # Assert that the columns exactly match the canonical universe ordering
    if returns_df.columns.tolist() != universe:
        raise ValueError("Structural violation: Returns matrix column ordering does not match the canonical universe.")

    # Assert that the data type is float64 to guarantee precision for covariance shrinkage
    if not all(returns_df.dtypes == np.float64):
        raise TypeError("Type violation: Returns matrix must be strictly float64.")

    # Assert that all values are finite (no inf or -inf from extreme price movements)
    if not np.isfinite(returns_df.values).all():
        raise ValueError("Mathematical violation: Infinite values (inf or -inf) detected in the returns matrix.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_daily_log_returns(window_df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame:
    """
    Orchestrates the computation of the daily logarithmic returns matrix.

    This function transforms a causally sliced price window into a mathematically
    pristine returns matrix. It enforces the logarithmic return convention, handles
    the shift-induced missing data deterministically, and rigorously validates the
    shape, ordering, and numerical finiteness of the resulting artifact before
    releasing it to the econometric estimation modules.

    Args:
        window_df (pd.DataFrame): The causally sliced, strictly positive price matrix of shape (L, N).
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        pd.DataFrame: The canonical daily log-returns matrix of shape (L-1, N).

    Raises:
        ValueError: If mathematical, structural, or configuration invariants are violated.
        TypeError: If data type constraints are violated.
    """
    logger.debug("Commencing daily log-return computation.")

    # Extract expected dimensions from the configuration
    expected_L = config["global_setup"]["lookback_window_L"]
    universe = config["global_setup"]["universe"]
    expected_N = len(universe)

    # Execute Step 1: Compute element-wise log returns
    raw_returns_df = _compute_raw_log_returns(window_df, config)

    # Execute Step 2: Remove the first row and assert shape/NaN-free output
    truncated_returns_df = _remove_first_row_and_validate(raw_returns_df, expected_L, expected_N)

    # Execute Step 3: Validate canonical ordering and numeric integrity
    _validate_returns_integrity_and_format(truncated_returns_df, universe)

    logger.debug(f"Successfully computed log-returns matrix of shape {truncated_returns_df.shape}.")

    # Return the canonical returns artifact for the current month
    return truncated_returns_df


In [None]:
# Task 7 — Build annualized expected return ((\mu)) estimation callable (estimate_mu_annualized)

# ==============================================================================
# Task 7: Build annualized expected return (mu) estimation callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 1: Compute the daily sample mean vector from the daily log-return matrix.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_daily_sample_mean(returns_df: pd.DataFrame, universe: List[str]) -> np.ndarray:
    """
    Computes the daily sample mean return vector from the log-returns matrix.

    This function calculates the arithmetic mean of the daily log-returns for each
    asset across the chronological index (T=179). It enforces the canonical universe
    ordering and extracts the result as a high-performance numpy array.

    Args:
        returns_df (pd.DataFrame): The canonical daily log-returns matrix.
        universe (List[str]): The canonical list of asset tickers.

    Returns:
        np.ndarray: A 1D array containing the daily expected returns for each asset.

    Raises:
        ValueError: If the input matrix contains NaNs or if column ordering fails.
    """
    # Defensive check: Ensure the input matrix is mathematically pristine
    if returns_df.isna().sum().sum() != 0:
        raise ValueError("Econometric violation: Cannot compute mean on a matrix containing NaN values.")

    try:
        # Enforce canonical ordering before computation to guarantee index alignment
        aligned_returns = returns_df[universe]
    except KeyError as e:
        raise ValueError(f"Structural violation: Returns matrix is missing canonical tickers: {e}")

    # Compute the arithmetic mean along the time axis (axis=0)
    # Equation: mu_i^(daily) = (1/T) * sum(R_{tau, i})
    daily_mean_series = aligned_returns.mean(axis=0)

    # Extract the underlying values as a float64 numpy array
    # This strips the pandas index overhead, preparing the vector for matrix algebra
    mu_daily = daily_mean_series.values.astype(np.float64)

    return mu_daily

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 2: Annualize the mean vector using the configured annualization factor.
# -------------------------------------------------------------------------------------------------------------------------------

def _annualize_mean_vector(mu_daily: np.ndarray, config: Dict[str, Any]) -> np.ndarray:
    """
    Annualizes the daily expected return vector using linear scaling.

    This function applies the pinned annualization factor (typically 252) to the
    daily mean vector. It strictly adheres to the linear scaling convention for
    logarithmic returns, avoiding geometric compounding.

    Args:
        mu_daily (np.ndarray): The 1D array of daily expected returns.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        np.ndarray: A 1D array containing the annualized expected returns.
    """
    # Extract the pinned annualization factor from the configuration
    annualization_factor = config["global_setup"]["annualization_factor"]

    # Apply linear scaling: mu_i^(ann) = A * mu_i^(daily)
    # This is a vectorized scalar-array multiplication
    mu_annualized = mu_daily * annualization_factor

    return mu_annualized

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 3: Validate and return the annualized mean vector.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_mu_vector_integrity(mu_annualized: np.ndarray, expected_N: int) -> None:
    """
    Performs a rigorous integrity validation on the annualized expected return vector.

    This function mathematically asserts that the resulting vector possesses the
    exact expected shape (N,), is composed strictly of float64 data, and contains
    only finite real numbers.

    Args:
        mu_annualized (np.ndarray): The annualized expected return vector.
        expected_N (int): The expected number of assets in the universe.

    Raises:
        ValueError: If the shape is incorrect or if infinite/NaN values are detected.
        TypeError: If the array is not of type float64.
    """
    # Assert the exact shape of the vector
    if mu_annualized.shape != (expected_N,):
        raise ValueError(f"Dimensionality violation: Expected mu vector shape ({expected_N},), "
                         f"but observed {mu_annualized.shape}.")

    # Assert the data type is strictly float64
    if mu_annualized.dtype != np.float64:
        raise TypeError(f"Type violation: mu vector must be strictly float64, got {mu_annualized.dtype}.")

    # Assert that all elements are finite real numbers
    if not np.isfinite(mu_annualized).all():
        raise ValueError("Mathematical violation: Infinite or NaN values detected in the annualized mu vector.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def estimate_mu_annualized(returns_df: pd.DataFrame, config: Dict[str, Any]) -> np.ndarray:
    """
    Orchestrates the estimation of the annualized expected return vector (mu).

    This function computes the daily sample mean from the causally sliced log-returns
    matrix, scales it to an annualized basis using the configured factor, and rigorously
    validates the structural and numerical integrity of the resulting vector. The output
    is the canonical mu vector used in both the quantum (QAOA) and classical (SA) solvers.

    Args:
        returns_df (pd.DataFrame): The canonical daily log-returns matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        np.ndarray: The validated, annualized expected return vector of shape (N,).

    Raises:
        ValueError: If mathematical, structural, or configuration invariants are violated.
        TypeError: If data type constraints are violated.
    """
    logger.debug("Commencing annualized expected return (mu) estimation.")

    # Extract the canonical universe to ensure perfect index alignment
    universe = config["global_setup"]["universe"]
    expected_N = len(universe)

    # Execute Step 1: Compute the daily sample mean vector
    mu_daily = _compute_daily_sample_mean(returns_df, universe)

    # Execute Step 2: Annualize the mean vector
    mu_annualized = _annualize_mean_vector(mu_daily, config)

    # Execute Step 3: Validate the structural and numerical integrity of the vector
    _validate_mu_vector_integrity(mu_annualized, expected_N)

    logger.debug(f"Successfully estimated annualized mu vector of shape {mu_annualized.shape}.")

    # Return the canonical mu artifact for the current month
    return mu_annualized


In [None]:
# Task 8 — Build annualized Ledoit–Wolf covariance ((\Sigma)) estimation callable (estimate_sigma_annualized)

# ==============================================================================
# Task 8: Build annualized Ledoit–Wolf covariance (Sigma) estimation callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 1: Compute the daily Ledoit–Wolf shrinkage covariance matrix.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_daily_ledoit_wolf_covariance(returns_df: pd.DataFrame, universe: List[str]) -> np.ndarray:
    """
    Computes the daily Ledoit-Wolf shrinkage covariance matrix from log-returns.

    This function utilizes PyPortfolioOpt to shrink the sample covariance matrix
    towards a structured target (constant correlation). This ensures the resulting
    matrix is well-conditioned and positive-definite, even when N is close to T.
    It explicitly operates on returns data to prevent methodological ambiguity.

    Args:
        returns_df (pd.DataFrame): The canonical daily log-returns matrix.
        universe (List[str]): The canonical list of asset tickers.

    Returns:
        np.ndarray: A 2D array representing the daily covariance matrix.

    Raises:
        ValueError: If the input matrix contains NaNs or if column ordering fails.
    """
    # Defensive check: Ensure the input matrix is mathematically pristine
    if returns_df.isna().sum().sum() != 0:
        raise ValueError("Econometric violation: Cannot compute covariance on a matrix containing NaN values.")

    try:
        # Enforce canonical ordering before computation to guarantee index alignment
        aligned_returns = returns_df[universe]
    except KeyError as e:
        raise ValueError(f"Structural violation: Returns matrix is missing canonical tickers: {e}")

    # Instantiate the CovarianceShrinkage object.
    # We explicitly set returns_data=True to pin the convention and avoid ambiguity.
    shrinkage_model = risk_models.CovarianceShrinkage(aligned_returns, returns_data=True)

    # Compute the Ledoit-Wolf shrinkage estimator
    # This returns a pandas DataFrame with the assets as both index and columns
    daily_cov_df = shrinkage_model.ledoit_wolf()

    # Extract the underlying values as a float64 numpy array
    # This strips the pandas index overhead, preparing the matrix for downstream algebra
    sigma_daily = daily_cov_df.values.astype(np.float64)

    return sigma_daily

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 2: Annualize the covariance matrix.
# -------------------------------------------------------------------------------------------------------------------------------

def _annualize_covariance_matrix(sigma_daily: np.ndarray, config: Dict[str, Any]) -> np.ndarray:
    """
    Annualizes the daily covariance matrix using linear scaling.

    This function applies the pinned annualization factor (typically 252) to the
    daily covariance matrix. It strictly adheres to the linear scaling property
    of variance and covariance with respect to time.

    Args:
        sigma_daily (np.ndarray): The 2D array representing the daily covariance matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        np.ndarray: A 2D array representing the annualized covariance matrix.
    """
    # Extract the pinned annualization factor from the configuration
    annualization_factor = config["global_setup"]["annualization_factor"]

    # Apply linear scaling: Sigma_{ij}^(ann) = A * Sigma_{ij}^(daily)
    # This is a vectorized scalar-matrix multiplication
    sigma_annualized = sigma_daily * annualization_factor

    return sigma_annualized

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 3: Validate symmetry, positive definiteness (within tolerance), finiteness, and return Sigma^(ann).
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_sigma_matrix_integrity(sigma_annualized: np.ndarray, expected_N: int) -> None:
    """
    Performs a rigorous integrity validation on the annualized covariance matrix.

    This function mathematically asserts that the resulting matrix possesses the
    exact expected shape (N, N), is strictly symmetric, is positive-definite
    (allowing for microscopic floating-point noise), and contains only finite numbers.

    Args:
        sigma_annualized (np.ndarray): The annualized covariance matrix.
        expected_N (int): The expected number of assets in the universe.

    Raises:
        ValueError: If shape, symmetry, positive-definiteness, or finiteness invariants are violated.
        TypeError: If the array is not of type float64.
    """
    # Assert the exact shape of the matrix
    if sigma_annualized.shape != (expected_N, expected_N):
        raise ValueError(f"Dimensionality violation: Expected Sigma matrix shape ({expected_N}, {expected_N}), "
                         f"but observed {sigma_annualized.shape}.")

    # Assert the data type is strictly float64
    if sigma_annualized.dtype != np.float64:
        raise TypeError(f"Type violation: Sigma matrix must be strictly float64, got {sigma_annualized.dtype}.")

    # Assert that all elements are finite real numbers
    if not np.isfinite(sigma_annualized).all():
        raise ValueError("Mathematical violation: Infinite or NaN values detected in the annualized Sigma matrix.")

    # Assert strict symmetry: ||Sigma - Sigma^T||_max < 10^-12
    max_asymmetry = np.max(np.abs(sigma_annualized - sigma_annualized.T))
    if max_asymmetry >= 1e-12:
        raise ValueError(f"Mathematical violation: Sigma matrix is not symmetric. Max asymmetry: {max_asymmetry}")

    # Assert positive definiteness by computing the eigenvalues
    # We use eigvalsh because it is highly optimized for symmetric/Hermitian matrices
    eigenvalues = np.linalg.eigvalsh(sigma_annualized)
    min_eigenvalue = np.min(eigenvalues)

    # We allow a microscopic negative tolerance (-10^-10) to account for floating-point noise
    if min_eigenvalue <= -1e-10:
        raise ValueError(f"Mathematical violation: Sigma matrix is not positive-definite. "
                         f"Minimum eigenvalue: {min_eigenvalue}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def estimate_sigma_annualized(returns_df: pd.DataFrame, config: Dict[str, Any]) -> np.ndarray:
    """
    Orchestrates the estimation of the annualized Ledoit-Wolf covariance matrix (Sigma).

    This function computes the daily shrinkage covariance from the causally sliced
    log-returns matrix, scales it to an annualized basis using the configured factor,
    and rigorously validates the structural, symmetric, and positive-definite integrity
    of the resulting matrix. The output is the canonical Sigma matrix used in both
    the quantum (QAOA) and classical (SA) solvers.

    Args:
        returns_df (pd.DataFrame): The canonical daily log-returns matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        np.ndarray: The validated, annualized covariance matrix of shape (N, N).

    Raises:
        ValueError: If mathematical, structural, or configuration invariants are violated.
        TypeError: If data type constraints are violated.
    """
    logger.debug("Commencing annualized Ledoit-Wolf covariance (Sigma) estimation.")

    # Extract the canonical universe to ensure perfect index alignment
    universe = config["global_setup"]["universe"]
    expected_N = len(universe)

    # Execute Step 1: Compute the daily Ledoit-Wolf shrinkage covariance matrix
    sigma_daily = _compute_daily_ledoit_wolf_covariance(returns_df, universe)

    # Execute Step 2: Annualize the covariance matrix
    sigma_annualized = _annualize_covariance_matrix(sigma_daily, config)

    # Execute Step 3: Validate the structural and numerical integrity of the matrix
    _validate_sigma_matrix_integrity(sigma_annualized, expected_N)

    logger.debug(f"Successfully estimated annualized Sigma matrix of shape {sigma_annualized.shape}.")

    # Return the canonical Sigma artifact for the current month
    return sigma_annualized


In [None]:
# Task 9 — Build continuity state management callable (update_continuity_state)

# ==============================================================================
# Task 9: Build continuity state management callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 1: Define initialization for first rebalance month (no prior selection).
# -------------------------------------------------------------------------------------------------------------------------------

def _initialize_zero_continuity_state(expected_N: int) -> np.ndarray:
    """
    Initializes a zero-vector continuity state for the first rebalance month.

    This function handles the edge case where no prior portfolio exists. It returns
    a mathematically pristine zero vector, ensuring that downstream equations
    (which subtract the continuity bonus) operate correctly without requiring
    complex conditional logic for the first month.

    Args:
        expected_N (int): The expected number of assets in the universe.

    Returns:
        np.ndarray: A 1D integer array of zeros of shape (N,).
    """
    # Generate a zero vector of the exact required length
    # We explicitly use np.int32 to prevent accidental floating-point comparisons downstream
    s_prev = np.zeros(expected_N, dtype=np.int32)

    logger.debug("No previous selection provided. Initialized zero continuity state.")
    return s_prev

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 2: Convert previous selection into an indicator vector aligned to canonical universe order.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_and_cast_previous_selection(previous_selection: np.ndarray, expected_N: int) -> np.ndarray:
    """
    Validates and casts the previous selection into a canonical indicator vector.

    This function enforces a strict data contract: the input must be a 1D numpy array
    of shape (N,) containing only binary values {0, 1}. It casts the array to an integer
    type to guarantee exact mathematical operations in the solver modules.

    Args:
        previous_selection (np.ndarray): The raw previous selection vector.
        expected_N (int): The expected number of assets in the universe.

    Returns:
        np.ndarray: The validated, integer-cast indicator vector.

    Raises:
        TypeError: If the input is not a numpy array.
        ValueError: If the shape is incorrect or if non-binary values are detected.
    """
    # Assert the input is strictly a numpy array
    if not isinstance(previous_selection, np.ndarray):
        raise TypeError(f"State violation: previous_selection must be a numpy.ndarray, got {type(previous_selection).__name__}.")

    # Assert the exact shape of the vector
    if previous_selection.shape != (expected_N,):
        raise ValueError(f"Dimensionality violation: Expected previous_selection shape ({expected_N},), "
                         f"but observed {previous_selection.shape}.")

    # Assert that the array contains only binary values (0 or 1)
    # We use np.unique to inspect the set of realized values
    unique_values = np.unique(previous_selection)
    if not np.isin(unique_values, [0, 1]).all():
        raise ValueError(f"Mathematical violation: previous_selection must contain only binary values {{0, 1}}. "
                         f"Observed values: {unique_values}")

    # Cast the vector to a strict integer type to prevent floating-point drift
    s_prev = previous_selection.astype(np.int32)

    return s_prev

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 3: Persist and return the indicator vector; log active continuity assets for audit.
# -------------------------------------------------------------------------------------------------------------------------------

def _log_and_return_continuity_state(s_prev: np.ndarray, universe: List[str], solver_name: str) -> np.ndarray:
    """
    Logs the active continuity assets for institutional auditability and returns the vector.

    This function extracts the indices of the previously held assets, maps them to their
    canonical ticker symbols, and logs them. This provides a transparent audit trail
    proving exactly which assets will receive the continuity bonus in the current month.

    Args:
        s_prev (np.ndarray): The validated, integer-cast indicator vector.
        universe (List[str]): The canonical list of asset tickers.
        solver_name (str): The identifier of the solver (e.g., 'SA' or 'QAOA') for logging context.

    Returns:
        np.ndarray: The canonical continuity indicator vector.
    """
    # Extract the integer indices where the indicator vector equals 1
    active_indices = np.where(s_prev == 1)[0]

    # Map the active indices to their corresponding ticker symbols
    active_tickers = [universe[i] for i in active_indices]

    if len(active_tickers) > 0:
        logger.info(f"[{solver_name}] Continuity bonus will be applied to {len(active_tickers)} assets: {active_tickers}")
    else:
        logger.info(f"[{solver_name}] No continuity bonus applied (zero active assets).")

    return s_prev

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def update_continuity_state(previous_selection: Optional[np.ndarray], config: Dict[str, Any], solver_name: str = "UNKNOWN") -> np.ndarray:
    """
    Orchestrates the management of the continuity state indicator vector.

    This function generates the canonical indicator vector s_prev used to apply the
    continuity bonus (kappa) in both the SA QUBO and QAOA cost Hamiltonian. It handles
    the initialization for the first month and rigorously validates the state carried
    over from previous months, ensuring perfect alignment with the canonical universe.

    Args:
        previous_selection (Optional[np.ndarray]): The binary selection vector from the prior month, or None.
        config (Dict[str, Any]): The validated master study configuration dictionary.
        solver_name (str): An identifier for the calling solver (used for audit logging).

    Returns:
        np.ndarray: The canonical continuity indicator vector of shape (N,) and type int32.

    Raises:
        ValueError: If structural or mathematical invariants of the previous selection are violated.
        TypeError: If the previous selection is of an incorrect data type.
    """
    logger.debug(f"[{solver_name}] Commencing continuity state update.")

    # Extract the canonical universe to ensure perfect index alignment
    universe = config["global_setup"]["universe"]
    expected_N = len(universe)

    # Branch logic based on the presence of a previous selection
    if previous_selection is None:
        # Execute Step 1: Initialize zero state for the first month
        s_prev = _initialize_zero_continuity_state(expected_N)
    else:
        # Execute Step 2: Validate and cast the existing state
        s_prev = _validate_and_cast_previous_selection(previous_selection, expected_N)

    # Execute Step 3: Log the active assets and return the canonical vector
    s_prev_final = _log_and_return_continuity_state(s_prev, universe, solver_name)

    return s_prev_final


In [None]:
# Task 10 — Build penalized QUBO matrix construction callable (build_qubo_matrix)

# ==============================================================================
# Task 10: Build penalized QUBO matrix construction callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 1: Compute penalty magnitude P deterministically and log it.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_deterministic_penalty(mu_ann: np.ndarray, sigma_ann: np.ndarray, q: float, N: int, penalty_scalar: float) -> float:
    """
    Computes the deterministic penalty magnitude P for the QUBO formulation.

    This function rigorously defines max_coeff as the maximum absolute value among
    all raw return contributions and all raw covariance contributions (including diagonals).
    It then scales this maximum by the configured scalar and the universe size N to
    ensure the penalty strictly enforces the cardinality constraint without causing
    numerical overflow in the energy landscape.

    Args:
        mu_ann (np.ndarray): The annualized expected return vector.
        sigma_ann (np.ndarray): The annualized covariance matrix.
        q (float): The risk aversion parameter.
        N (int): The number of assets in the universe.
        penalty_scalar (float): The scaling multiplier (e.g., 2.5).

    Returns:
        float: The computed penalty magnitude P.
    """
    # Compute the absolute magnitudes of the linear return contributions: |(1-q) * mu_i|
    return_magnitudes = np.abs((1.0 - q) * mu_ann)
    max_return_mag = np.max(return_magnitudes)

    # Extract the upper triangular elements of the covariance matrix (including the diagonal)
    # This ensures we evaluate all unique variances and pairwise covariances
    upper_tri_indices = np.triu_indices_from(sigma_ann)
    sigma_upper_tri = sigma_ann[upper_tri_indices]

    # Compute the absolute magnitudes of the quadratic covariance contributions: |q * Sigma_ij|
    covariance_magnitudes = np.abs(q * sigma_upper_tri)
    max_cov_mag = np.max(covariance_magnitudes)

    # Pin max_coeff as the global maximum across both sets of contributions
    max_coeff = max(max_return_mag, max_cov_mag)

    # Compute the final penalty magnitude P exactly as specified in the manuscript
    # Equation: P = scalar * max_coeff * N
    penalty_P = penalty_scalar * max_coeff * N

    # Determine the dominant contributor type for audit logging
    dominant_type = "Return" if max_return_mag > max_cov_mag else "Covariance"

    logger.debug(f"QUBO Penalty Computation: max_coeff={max_coeff:.6f} (Dominant: {dominant_type}), "
                 f"Scalar={penalty_scalar}, N={N} -> P={penalty_P:.6f}")

    return float(penalty_P)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 2: Construct QUBO diagonal and off-diagonal coefficients exactly per Eq. (7).
# -------------------------------------------------------------------------------------------------------------------------------

def _construct_base_qubo_matrix(mu_ann: np.ndarray, sigma_ann: np.ndarray, q: float, K: int, P: float, N: int) -> Dict[Tuple[int, int], float]:
    """
    Constructs the base Quadratic Unconstrained Binary Optimization (QUBO) dictionary.

    This function directly translates Equation (7) from the manuscript into a sparse
    dictionary representation. It strictly populates only the upper triangle (i <= j)
    to prevent double-counting of symmetric interactions in the dimod BQM.

    Args:
        mu_ann (np.ndarray): The annualized expected return vector.
        sigma_ann (np.ndarray): The annualized covariance matrix.
        q (float): The risk aversion parameter.
        K (int): The cardinality constraint.
        P (float): The computed penalty magnitude.
        N (int): The number of assets in the universe.

    Returns:
        Dict[Tuple[int, int], float]: The upper-triangular QUBO dictionary.
    """
    qubo: Dict[Tuple[int, int], float] = {}

    # Iterate over all possible pairs of assets to construct the upper-triangular matrix
    for i in range(N):
        for j in range(i, N):
            if i == j:
                # Diagonal terms (Linear contributions + Diagonal variance + Penalty linear expansion)
                # Equation (7a): Q_ii = q * Sigma_ii - (1-q) * mu_i + P * (1 - 2K)
                q_ii = (q * sigma_ann[i, i]) - ((1.0 - q) * mu_ann[i]) + (P * (1.0 - 2.0 * K))
                qubo[(i, i)] = float(q_ii)
            else:
                # Off-diagonal terms (Pairwise covariance + Penalty quadratic expansion)
                # Equation (7b): Q_ij = 2q * Sigma_ij + 2P  (for i < j)
                q_ij = (2.0 * q * sigma_ann[i, j]) + (2.0 * P)
                qubo[(i, j)] = float(q_ij)

    return qubo

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 3: Apply continuity bonus on the diagonal exactly once, using the pinned arithmetic rule, and return.
# -------------------------------------------------------------------------------------------------------------------------------

def _apply_continuity_bonus(qubo: Dict[Tuple[int, int], float], s_prev: np.ndarray, kappa: float, N: int, universe: List[str]) -> Dict[Tuple[int, int], float]:
    """
    Applies the temporal regularization (continuity bonus) to the QUBO diagonal.

    This function enforces the pinned arithmetic rule 'subtract_kappa_from_linear_term'.
    It reduces the energy cost of previously held assets, discouraging unnecessary
    turnover. It logs the exact modifications for institutional auditability.

    Args:
        qubo (Dict[Tuple[int, int], float]): The base QUBO dictionary.
        s_prev (np.ndarray): The binary continuity indicator vector.
        kappa (float): The continuity bonus strength.
        N (int): The number of assets in the universe.
        universe (List[str]): The canonical list of asset tickers (for logging).

    Returns:
        Dict[Tuple[int, int], float]: The regularized QUBO dictionary.
    """
    modifications = []

    for i in range(N):
        # Check if the asset was held in the previous month
        if s_prev[i] == 1:
            original_val = qubo[(i, i)]
            # Apply the pinned rule: Q_ii <- Q_ii - kappa
            new_val = original_val - kappa
            qubo[(i, i)] = new_val

            # Record the modification for the audit trail
            modifications.append(f"{universe[i]}: {original_val:.4f} -> {new_val:.4f}")

    if modifications:
        logger.debug(f"Applied continuity bonus (kappa={kappa}) to QUBO diagonals: " + " | ".join(modifications))

    return qubo

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def build_qubo_matrix(mu_ann: np.ndarray, sigma_ann: np.ndarray, config: Dict[str, Any], s_prev: np.ndarray) -> Dict[Tuple[int, int], float]:
    """
    Orchestrates the construction of the penalized, regularized QUBO matrix.

    This function translates the constrained portfolio optimization problem into a
    Quadratic Unconstrained Binary Optimization (QUBO) format suitable for Simulated
    Annealing. It computes a deterministic penalty to enforce the K-of-N constraint,
    constructs the base energy landscape per Equation (7), and applies a continuity
    bonus to penalize turnover.

    Args:
        mu_ann (np.ndarray): The annualized expected return vector.
        sigma_ann (np.ndarray): The annualized covariance matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.
        s_prev (np.ndarray): The binary continuity indicator vector from the prior month.

    Returns:
        Dict[Tuple[int, int], float]: The finalized, upper-triangular QUBO dictionary.

    Raises:
        ValueError: If the continuity application rule is unrecognized.
    """
    logger.debug("Commencing penalized QUBO matrix construction.")

    # Extract required parameters from the configuration
    q = config["objective_function"]["risk_aversion_q"]
    K = config["objective_function"]["cardinality_K"]
    penalty_scalar = config["sa_solver"]["sa_penalty_scalar"]
    kappa = config["objective_function"]["continuity_bonus_kappa"]
    rule = config["objective_function"]["continuity_bonus_application_rule"]
    universe = config["global_setup"]["universe"]
    N = len(universe)

    # Assert the continuity rule matches the pinned convention
    if rule != "subtract_kappa_from_linear_term":
        raise ValueError(f"Unrecognized continuity rule: '{rule}'. Expected 'subtract_kappa_from_linear_term'.")

    # Execute Step 1: Compute the deterministic penalty magnitude P
    P = _compute_deterministic_penalty(mu_ann, sigma_ann, q, N, penalty_scalar)

    # Execute Step 2: Construct the base QUBO dictionary per Equation (7)
    base_qubo = _construct_base_qubo_matrix(mu_ann, sigma_ann, q, K, P, N)

    # Execute Step 3: Apply the continuity bonus to the diagonal terms
    final_qubo = _apply_continuity_bonus(base_qubo, s_prev, kappa, N, universe)

    logger.debug(f"Successfully constructed QUBO matrix with {len(final_qubo)} terms.")

    # Return the canonical classical objective function artifact
    return final_qubo


In [None]:
# Task 11 — Build SA sampling callable (run_sa_sampling)

# ==============================================================================
# Task 11: Build SA sampling callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 1: Convert QUBO dict into a dimod.BinaryQuadraticModel (BQM) with pinned variable labeling.
# -------------------------------------------------------------------------------------------------------------------------------

def _convert_qubo_to_bqm(qubo: Dict[Tuple[int, int], float], expected_N: int) -> dimod.BinaryQuadraticModel:
    """
    Converts the sparse QUBO dictionary into a dimod BinaryQuadraticModel (BQM).

    This function rigorously enforces the upper-triangular invariant of the input
    dictionary to prevent double-counting of off-diagonal penalties. It instantiates
    the BQM and mathematically asserts that the variable labels are strictly integers
    from 0 to N-1, guaranteeing deterministic mapping back to the canonical asset universe.

    Args:
        qubo (Dict[Tuple[int, int], float]): The upper-triangular QUBO dictionary.
        expected_N (int): The expected number of assets in the universe.

    Returns:
        dimod.BinaryQuadraticModel: The instantiated BQM object.

    Raises:
        ValueError: If the QUBO is not upper-triangular or if variable labels are corrupted.
    """
    # Assert the upper-triangular invariant: i <= j for all keys
    for (i, j) in qubo.keys():
        if i > j:
            raise ValueError(f"QUBO invariant violation: Found lower-triangular key ({i}, {j}). "
                             f"QUBO must be strictly upper-triangular to prevent double-counting.")

    # Instantiate the BQM from the QUBO dictionary
    # We explicitly specify the Vartype as BINARY (0 or 1)
    bqm = dimod.BinaryQuadraticModel.from_qubo(qubo)

    # Extract the variables present in the BQM
    bqm_variables = list(bqm.variables)

    # Assert that exactly N variables exist in the model
    if len(bqm_variables) != expected_N:
        raise ValueError(f"BQM dimensionality violation: Expected {expected_N} variables, "
                         f"but BQM contains {len(bqm_variables)}.")

    # Assert that the variables are strictly integers from 0 to N-1
    expected_variables = list(range(expected_N))
    if sorted(bqm_variables) != expected_variables:
        raise ValueError(f"BQM labeling violation: Expected integer variables {expected_variables}, "
                         f"but observed {sorted(bqm_variables)}.")

    return bqm

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 2: Instantiate neal.SimulatedAnnealingSampler and set seed behavior explicitly.
# -------------------------------------------------------------------------------------------------------------------------------

def _prepare_sampler_and_hyperparameters(config: Dict[str, Any]) -> Tuple[neal.SimulatedAnnealingSampler, int, int, int]:
    """
    Instantiates the Simulated Annealing sampler and extracts pinned hyperparameters.

    This function enforces the determinism contract by explicitly extracting the
    random seed, along with the required number of reads and sweeps, ensuring the
    stochastic heuristic is executed as reproducibly as the underlying library permits.

    Args:
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Tuple[neal.SimulatedAnnealingSampler, int, int, int]:
            The sampler instance, num_reads, num_sweeps, and the random seed.
    """
    # Instantiate the highly optimized C++ backed Simulated Annealing sampler
    sampler = neal.SimulatedAnnealingSampler()

    # Extract the pinned hyperparameters from the configuration
    num_reads = config["sa_solver"]["sa_num_reads"]
    num_sweeps = config["sa_solver"]["sa_num_sweeps"]
    sa_seed = config["randomness"]["sa_seed"]

    logger.debug(f"Prepared SA Sampler. Hyperparameters: reads={num_reads}, sweeps={num_sweeps}, seed={sa_seed}")

    return sampler, num_reads, num_sweeps, sa_seed

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 3: Execute sampling and return a dimod.SampleSet without filtering.
# -------------------------------------------------------------------------------------------------------------------------------

def _execute_sa_sampling(bqm: dimod.BinaryQuadraticModel, sampler: neal.SimulatedAnnealingSampler,
                         num_reads: int, num_sweeps: int, sa_seed: int) -> dimod.SampleSet:
    """
    Executes the Simulated Annealing heuristic on the BQM to generate raw samples.

    This function calls the sampler with the pinned hyperparameters. It explicitly
    returns the raw, unfiltered SampleSet, delegating the domain-specific feasibility
    filtering (sum x_i = K) to the subsequent task to maintain strict separation of concerns.

    Args:
        bqm (dimod.BinaryQuadraticModel): The validated BQM representing the energy landscape.
        sampler (neal.SimulatedAnnealingSampler): The instantiated SA sampler.
        num_reads (int): The number of independent heuristic runs.
        num_sweeps (int): The number of sweeps per run.
        sa_seed (int): The random seed for the stochastic process.

    Returns:
        dimod.SampleSet: The raw collection of binary samples and their associated energies.

    Raises:
        RuntimeError: If the sampler fails to return the requested number of reads.
    """
    # Execute the sampling process
    # We pass the seed explicitly to enforce the determinism contract
    sample_set = sampler.sample(bqm, num_reads=num_reads, num_sweeps=num_sweeps, seed=sa_seed)

    # Calculate the total number of samples returned (should equal num_reads)
    total_samples_returned = sum(sample_set.record.num_occurrences)

    # Assert that the sampler honored the num_reads request
    if total_samples_returned != num_reads:
        raise RuntimeError(f"Sampling execution failure: Requested {num_reads} reads, "
                           f"but sampler returned {total_samples_returned}.")

    # Extract minimum and maximum energies for diagnostic logging
    min_energy = sample_set.first.energy
    max_energy = sample_set.record.energy.max()

    logger.debug(f"SA Sampling completed. Returned {total_samples_returned} samples. "
                 f"Energy range: [{min_energy:.4f}, {max_energy:.4f}]")

    return sample_set

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_sa_sampling(qubo: Dict[Tuple[int, int], float], expected_N: int, config: Dict[str, Any]) -> dimod.SampleSet:
    """
    Orchestrates the execution of the Simulated Annealing baseline solver.

    This function translates the sparse QUBO dictionary into a rigorous BinaryQuadraticModel,
    instantiates the neal sampler, and executes the stochastic heuristic using the pinned
    hyperparameters and determinism contract. It returns the raw, unfiltered SampleSet
    for downstream feasibility analysis.

    Args:
        qubo (Dict[Tuple[int, int], float]): The upper-triangular QUBO dictionary.
        expected_N (int): The expected number of assets in the universe.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        dimod.SampleSet: The raw collection of binary samples and their associated energies.

    Raises:
        ValueError: If the QUBO structure or BQM labeling is invalid.
        RuntimeError: If the sampling execution fails.
    """
    logger.info("Commencing Simulated Annealing (SA) sampling execution.")

    # Execute Step 1: Convert the QUBO dictionary to a validated BQM
    bqm = _convert_qubo_to_bqm(qubo, expected_N)

    # Execute Step 2: Prepare the sampler and extract hyperparameters
    sampler, num_reads, num_sweeps, sa_seed = _prepare_sampler_and_hyperparameters(config)

    # Execute Step 3: Execute the sampling process and retrieve the raw SampleSet
    sample_set = _execute_sa_sampling(bqm, sampler, num_reads, num_sweeps, sa_seed)

    logger.info("Simulated Annealing sampling execution completed successfully.")

    # Return the raw SampleSet artifact
    return sample_set


In [None]:
# Task 12 — Build SA feasibility filtering and selection callable (filter_sa_samples)

# ==============================================================================
# Task 12: Build SA feasibility filtering and selection callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 1: Iterate through SampleSet and filter to feasibility (sum x_i = K) exactly.
# -------------------------------------------------------------------------------------------------------------------------------

def _filter_feasible_samples(sample_set: Any, K: int, expected_N: int) -> List[Tuple[np.ndarray, float]]:
    """
    Filters the raw SampleSet to retain only strictly feasible bitstrings.

    This function iterates through the unique aggregated records of the stochastic samples,
    mathematically asserting the hard cardinality constraint (sum x_i = K). By iterating
    over .record instead of .data(), it eliminates redundant processing of identical samples,
    significantly improving computational efficiency.

    Args:
        sample_set (dimod.SampleSet): The raw output from the SA sampler.
        K (int): The exact cardinality constraint.
        expected_N (int): The expected number of assets in the universe.

    Returns:
        List[Tuple[np.ndarray, float]]: A list of tuples containing feasible binary vectors and their energies.
    """
    feasible_candidates: List[Tuple[np.ndarray, float]] = []

    # Iterate through the unique aggregated records to avoid redundant processing
    for record in sample_set.record:
        # Extract the sample array directly. Because we pinned the BQM variables to 0..N-1
        # in Task 11, this array is already perfectly aligned with the canonical universe.
        x_vector = record.sample.astype(np.int32)

        # Compute the Hamming weight (sum of the binary vector)
        hamming_weight = np.sum(x_vector)

        # Enforce the strict feasibility constraint
        if hamming_weight == K:
            # record.energy is a scalar float
            feasible_candidates.append((x_vector, float(record.energy)))

    return feasible_candidates

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 2: Select the minimum-energy feasible sample with deterministic tie-breaking.
# -------------------------------------------------------------------------------------------------------------------------------

def _select_optimal_deterministic_candidate(feasible_candidates: List[Tuple[np.ndarray, float]]) -> Tuple[np.ndarray, float]:
    """
    Selects the minimum-energy candidate with a strict lexicographic tie-breaking rule.

    This function sorts the feasible candidates primarily by energy (ascending). To
    guarantee absolute determinism across different environments, it resolves energy
    ties (within a 1e-12 tolerance) by selecting the lexicographically smallest bitstring.

    Args:
        feasible_candidates (List[Tuple[np.ndarray, float]]): The list of feasible candidates.

    Returns:
        Tuple[np.ndarray, float]: The optimal binary vector and its energy.
    """
    # Define a custom sort key that returns a tuple: (energy, lexicographic_tuple)
    # Python sorts tuples element by element, perfectly implementing our tie-break logic
    def sort_key(candidate: Tuple[np.ndarray, float]) -> Tuple[float, Tuple[int, ...]]:
        x_vector, energy = candidate
        # Round energy to 12 decimal places to group floating-point near-ties
        rounded_energy = round(energy, 12)
        # Convert the numpy array to a standard Python tuple for lexicographic comparison
        lexicographic_tuple = tuple(x_vector.tolist())
        return (rounded_energy, lexicographic_tuple)

    # Sort the candidates using the deterministic key
    feasible_candidates.sort(key=sort_key)

    # The optimal candidate is the first element in the sorted list
    optimal_candidate = feasible_candidates[0]

    return optimal_candidate

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 3: Return the selected bitstring as a canonical numpy vector; log feasibility rate and selected assets.
# -------------------------------------------------------------------------------------------------------------------------------

def _log_diagnostics_and_format_output(optimal_x: np.ndarray, optimal_energy: float,
                                       total_reads: int, feasible_count: int, universe: List[str]) -> np.ndarray:
    """
    Logs comprehensive diagnostic telemetry and returns the canonical selection vector.

    This function maps the selected binary vector back to the canonical ticker symbols
    for institutional auditability. It also logs the feasibility rate, which is critical
    for diagnosing the efficacy of the QUBO penalty scalar.

    Args:
        optimal_x (np.ndarray): The selected optimal binary vector.
        optimal_energy (float): The energy of the selected vector.
        total_reads (int): The total number of samples generated by the SA solver.
        feasible_count (int): The number of samples that satisfied the cardinality constraint.
        universe (List[str]): The canonical list of asset tickers.

    Returns:
        np.ndarray: The canonical selection vector x_SA.
    """
    # Calculate the feasibility rate
    feasible_fraction = feasible_count / total_reads

    # Extract the indices of the selected assets
    selected_indices = np.where(optimal_x == 1)[0]

    # Map indices to ticker symbols
    selected_tickers = [universe[i] for i in selected_indices]

    logger.info(f"SA Selection Complete. Optimal Energy: {optimal_energy:.6f}")
    logger.info(f"SA Feasibility Rate: {feasible_fraction:.2%} ({feasible_count}/{total_reads} reads)")
    logger.info(f"SA Selected Assets: {selected_tickers}")

    return optimal_x

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def filter_sa_samples(sample_set: dimod.SampleSet, config: Dict[str, Any]) -> Optional[np.ndarray]:
    """
    Orchestrates the feasibility filtering and optimal selection of Simulated Annealing samples.

    This function processes the raw stochastic output from the SA solver. It rigorously
    enforces the K-of-N cardinality constraint, deterministically selects the minimum-energy
    solution using lexicographic tie-breaking, and logs critical diagnostic telemetry.
    If no feasible samples are found, it gracefully returns None to trigger the HRP fallback.

    Args:
        sample_set (dimod.SampleSet): The raw output from the SA sampler.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Optional[np.ndarray]: The canonical binary selection vector x_SA, or None if zero feasible samples exist.

    Raises:
        ValueError: If the sample dictionaries are malformed.
    """
    logger.debug("Commencing SA feasibility filtering and optimal selection.")

    # Extract required parameters from the configuration
    K = config["objective_function"]["cardinality_K"]
    universe = config["global_setup"]["universe"]
    expected_N = len(universe)
    total_reads = config["sa_solver"]["sa_num_reads"]

    # Execute Step 1: Filter the raw SampleSet to strictly feasible candidates
    feasible_candidates = _filter_feasible_samples(sample_set, K, expected_N)
    feasible_count = len(feasible_candidates)

    # Handle the critical edge case: Zero feasible samples
    if feasible_count == 0:
        logger.warning(f"CRITICAL: SA solver produced ZERO feasible samples (K={K}, N={expected_N}). "
                       f"Penalty scalar may be insufficient. Triggering HRP fallback.")
        return None

    # Execute Step 2: Select the optimal candidate deterministically
    optimal_x, optimal_energy = _select_optimal_deterministic_candidate(feasible_candidates)

    # Execute Step 3: Log diagnostics and format the final output
    final_x_sa = _log_diagnostics_and_format_output(optimal_x, optimal_energy, total_reads, feasible_count, universe)

    return final_x_sa


In [None]:
# Task 13 — Build Dicke state preparation callable (prepare_dicke_state_vector)

# ==============================================================================
# Task 13: Build Dicke state preparation callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 1: Enumerate all computational basis indices and identify those with Hamming weight K.
# -------------------------------------------------------------------------------------------------------------------------------

def _identify_valid_basis_indices(N: int, K: int) -> List[int]:
    """
    Enumerates the 2^N Hilbert space and identifies indices with Hamming weight K.

    This function enforces a strict big-endian bit-ordering convention:
    k = sum(x_i * 2^(N-1-i)). This guarantees that wire 0 corresponds to the most
    significant bit (asset 0), and wire N-1 corresponds to the least significant bit.

    Args:
        N (int): The number of qubits (assets in the universe).
        K (int): The required Hamming weight (cardinality constraint).

    Returns:
        List[int]: A list of integer indices corresponding to the valid basis states.

    Raises:
        ValueError: If the number of valid indices does not match the binomial coefficient.
    """
    valid_indices: List[int] = []
    total_states = 2 ** N

    # Iterate through all possible computational basis states
    for k in range(total_states):
        # Use Python's highly optimized native bit counting method
        if k.bit_count() == K:
            valid_indices.append(k)

    # Mathematically assert that the number of valid states equals N choose K
    expected_count = math.comb(N, K)
    if len(valid_indices) != expected_count:
        raise ValueError(f"Combinatorial violation: Expected {expected_count} valid states "
                         f"for N={N}, K={K}, but found {len(valid_indices)}.")

    return valid_indices

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 2: Construct the amplitude vector of length 2^N with equal amplitudes on the valid indices.
# -------------------------------------------------------------------------------------------------------------------------------

def _construct_amplitude_vector(valid_indices: List[int], N: int, K: int) -> np.ndarray:
    """
    Constructs the dense statevector array for the Dicke state |D^K_N>.

    This function initializes a zero vector of length 2^N and assigns the uniform
    real amplitude to all valid indices. It explicitly casts the Python list of indices
    to a numpy array before applying the mask, ensuring optimal memory access patterns
    and preventing implicit type coercion overhead.

    Args:
        valid_indices (List[int]): The list of integer indices with Hamming weight K.
        N (int): The number of qubits.
        K (int): The required Hamming weight.

    Returns:
        np.ndarray: The dense 1D statevector array.
    """
    total_states = 2 ** N

    # Initialize a strictly real, double-precision zero vector
    statevector = np.zeros(total_states, dtype=np.float64)

    # Compute the exact uniform amplitude
    combinatorial_count = math.comb(N, K)
    uniform_amplitude = 1.0 / math.sqrt(combinatorial_count)

    # Explicitly cast the Python list to a numpy array of integers
    # This is a critical type-hygiene step for optimal advanced indexing
    indices_array = np.array(valid_indices, dtype=np.int32)

    # Apply the amplitude to all valid indices simultaneously
    statevector[indices_array] = uniform_amplitude

    return statevector

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 3: Validate normalization and sparsity exactly; return the amplitude vector.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_statevector_integrity(statevector: np.ndarray, N: int, K: int) -> None:
    """
    Performs rigorous quantum mechanical validation on the constructed statevector.

    This function mathematically asserts that the statevector is perfectly normalized
    (L2 norm squared equals 1.0) and strictly obeys the sparsity constraint (exactly
    N choose K non-zero elements).

    Args:
        statevector (np.ndarray): The constructed statevector array.
        N (int): The number of qubits.
        K (int): The required Hamming weight.

    Raises:
        ValueError: If normalization or sparsity invariants are violated.
    """
    # Assert perfect normalization: sum(|a_k|^2) == 1.0
    # We use a strict tolerance of 1e-12 to account for floating-point arithmetic
    l2_norm_squared = np.sum(np.square(statevector))
    if not math.isclose(l2_norm_squared, 1.0, abs_tol=1e-12):
        raise ValueError(f"Quantum mechanics violation: Statevector is not normalized. "
                         f"L2 norm squared = {l2_norm_squared:.15f}")

    # Assert strict sparsity: exactly N choose K non-zero elements
    expected_non_zeros = math.comb(N, K)
    actual_non_zeros = np.count_nonzero(statevector)
    if actual_non_zeros != expected_non_zeros:
        raise ValueError(f"Sparsity violation: Expected exactly {expected_non_zeros} non-zero amplitudes, "
                         f"but observed {actual_non_zeros}.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def prepare_dicke_state_vector(N: int, K: int) -> np.ndarray:
    """
    Orchestrates the preparation of the canonical Dicke state |D^K_N> statevector.

    This function generates the exact initial quantum state required for the constraint-
    preserving QAOA ansatz. It enumerates the Hilbert space under a pinned big-endian
    convention, constructs the uniform superposition over the feasible K-hot subspace,
    and rigorously validates the quantum mechanical properties of the resulting vector.

    Args:
        N (int): The number of qubits (assets in the universe).
        K (int): The cardinality constraint (number of assets to select).

    Returns:
        np.ndarray: The validated, dense statevector of length 2^N and type float64.

    Raises:
        ValueError: If combinatorial, normalization, or sparsity invariants are violated.
    """
    logger.debug(f"Commencing Dicke state preparation for N={N}, K={K}.")

    # Execute Step 1: Identify all valid basis indices in the 2^N space
    valid_indices = _identify_valid_basis_indices(N, K)

    # Execute Step 2: Construct the dense amplitude vector
    statevector = _construct_amplitude_vector(valid_indices, N, K)

    # Execute Step 3: Validate normalization and sparsity
    _validate_statevector_integrity(statevector, N, K)

    logger.debug(f"Successfully prepared and validated Dicke statevector of length {len(statevector)}.")

    # Return the canonical initial state artifact
    return statevector


In [None]:
# Task 14 — Build XY mixer Hamiltonian callable (build_xy_mixer_hamiltonian)

# ==============================================================================
# Task 14: Build XY mixer Hamiltonian callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 1: Generate the interaction edge set E deterministically from the configured topology.
# -------------------------------------------------------------------------------------------------------------------------------

def _generate_interaction_edges(N: int, graph_type: str) -> List[Tuple[int, int]]:
    """
    Generates the deterministic edge set for the XY mixer interaction graph.

    This function enforces the pinned 'complete' graph topology specified in the
    manuscript. It generates all unique pairwise interactions (i < j) in a strict
    lexicographic order to guarantee absolute reproducibility of the Hamiltonian structure.

    Args:
        N (int): The number of qubits (assets in the universe).
        graph_type (str): The configured graph topology (must be 'complete').

    Returns:
        List[Tuple[int, int]]: A lexicographically sorted list of edge tuples.

    Raises:
        ValueError: If the graph topology is unrecognized or if the edge count is incorrect.
    """
    # Assert strict adherence to the pinned complete graph topology
    if graph_type != "complete":
        raise ValueError(f"Topology violation: Expected 'complete' graph type, got '{graph_type}'.")

    edges: List[Tuple[int, int]] = []

    # Generate edges lexicographically: 0 <= i < j <= N-1
    for i in range(N):
        for j in range(i + 1, N):
            edges.append((i, j))

    # Mathematically assert the correct number of edges for a complete graph
    expected_edges = math.comb(N, 2)
    if len(edges) != expected_edges:
        raise ValueError(f"Combinatorial violation: Expected {expected_edges} edges for a complete graph "
                         f"of size {N}, but generated {len(edges)}.")

    return edges

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 2: Construct H_XY term-by-term as a PennyLane Hamiltonian with exactly 2|E| Pauli terms.
# -------------------------------------------------------------------------------------------------------------------------------

def _construct_xy_hamiltonian(edges: List[Tuple[int, int]]) -> qml.Hamiltonian:
    """
    Constructs the PennyLane Hamiltonian object for the XY mixer.

    This function directly translates Equation (3) into quantum observables. For each
    edge (i, j), it generates the partial SWAP generators (X_i @ X_j) and (Y_i @ Y_j)
    with a strict coefficient of 1.0.

    NOTE: Utilizing qml.Hamiltonian for exact fidelity with the manuscript's era.
    In modern PennyLane (v0.36+), this should be migrated to qml.ops.LinearCombination
    or constructed via qml.dot(coeffs, observables) for optimal performance and
    future-proofing.

    Args:
        edges (List[Tuple[int, int]]): The deterministic list of interaction edges.

    Returns:
        qml.Hamiltonian: The instantiated XY mixer Hamiltonian.
    """
    coeffs: List[float] = []
    observables: List[Any] = []

    # Iterate through the deterministic edge list
    for i, j in edges:
        # Term 1: X_i (x) X_j
        coeffs.append(1.0)
        observables.append(qml.PauliX(i) @ qml.PauliX(j))

        # Term 2: Y_i (x) Y_j
        coeffs.append(1.0)
        observables.append(qml.PauliY(i) @ qml.PauliY(j))

    # Instantiate the PennyLane Hamiltonian
    # This object encapsulates the entire mixing evolution operator
    h_xy = qml.Hamiltonian(coeffs, observables)

    return h_xy

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 3: Validate expected term count and rely on the commutation property as the feasibility guarantee.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_hamiltonian_structure(h_xy: qml.Hamiltonian, expected_edges: int) -> None:
    """
    Performs rigorous structural validation on the constructed XY Hamiltonian.

    This function mathematically asserts that the Hamiltonian contains exactly the
    expected number of terms (2 * |E|). This structural guarantee is the operational
    condition under which the physical commutation property [H_XY, sum(Z_i)] = 0 holds,
    ensuring the QAOA evolution remains strictly within the feasible K-hot subspace.

    Args:
        h_xy (qml.Hamiltonian): The constructed XY mixer Hamiltonian.
        expected_edges (int): The number of edges in the interaction graph.

    Raises:
        ValueError: If the number of terms in the Hamiltonian is incorrect.
    """
    # Calculate the expected number of Pauli terms
    expected_terms = 2 * expected_edges

    # Extract the actual number of coefficients in the Hamiltonian
    actual_terms = len(h_xy.coeffs)

    # Assert strict equality
    if actual_terms != expected_terms:
        raise ValueError(f"Structural violation: Expected exactly {expected_terms} terms in the XY Hamiltonian, "
                         f"but observed {actual_terms}.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def build_xy_mixer_hamiltonian(N: int, config: Dict[str, Any]) -> qml.Hamiltonian:
    """
    Orchestrates the construction of the constraint-preserving XY mixer Hamiltonian.

    This function generates the canonical mixing operator required for the QAOA ansatz.
    It enforces the complete graph topology, constructs the exact Pauli tensor products
    required for the particle-preserving partial SWAP operations, and rigorously validates
    the structural integrity of the resulting quantum object. This Hamiltonian is built
    once and reused across all rebalance months.

    Args:
        N (int): The number of qubits (assets in the universe).
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        qml.Hamiltonian: The validated, canonical XY mixer Hamiltonian.

    Raises:
        ValueError: If topological or structural invariants are violated.
    """
    logger.debug(f"Commencing XY mixer Hamiltonian construction for N={N}.")

    # Extract the pinned graph topology from the configuration
    graph_type = config["qaoa_architecture"]["xy_mixer_graph_type"]

    # Execute Step 1: Generate the deterministic interaction edge set
    edges = _generate_interaction_edges(N, graph_type)

    # Execute Step 2: Construct the Hamiltonian term-by-term
    h_xy = _construct_xy_hamiltonian(edges)

    # Execute Step 3: Validate the structural integrity of the Hamiltonian
    _validate_hamiltonian_structure(h_xy, len(edges))

    logger.debug(f"Successfully constructed XY mixer Hamiltonian with {len(h_xy.coeffs)} terms.")

    # Return the canonical mixer artifact
    return h_xy


In [None]:
# Task 15 — Build cost Hamiltonian construction callable (build_cost_hamiltonian)

# ==============================================================================
# Task 15: Build cost Hamiltonian construction callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 1: Compute linear coefficients alpha_i exactly as defined in Eq. (5), apply continuity bonus, and log.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_linear_coefficients(mu_ann: np.ndarray, q: float, s_prev: np.ndarray, kappa: float, universe: List[str]) -> np.ndarray:
    """
    Computes the linear coefficients (alpha_i) for the cost Hamiltonian.

    This function strictly implements the definition from Equation (5):
    alpha_i = -(1-q) * mu_i. It then applies the pinned continuity rule by
    subtracting kappa for previously held assets, lowering their energy cost.

    Args:
        mu_ann (np.ndarray): The annualized expected return vector.
        q (float): The risk aversion parameter.
        s_prev (np.ndarray): The binary continuity indicator vector.
        kappa (float): The continuity bonus strength.
        universe (List[str]): The canonical list of asset tickers (for logging).

    Returns:
        np.ndarray: The computed alpha vector of shape (N,).
    """
    # Compute the base linear coefficients exactly per Eq. (5)
    alpha_base = -(1.0 - q) * mu_ann

    # Create a copy to store the regularized coefficients
    alpha_final = np.copy(alpha_base)

    modifications = []
    N = len(universe)

    # Apply the continuity bonus (discount) to previously held assets
    for i in range(N):
        if s_prev[i] == 1:
            original_val = alpha_base[i]
            # Apply the pinned rule: alpha_i <- alpha_i - kappa
            new_val = original_val - kappa
            alpha_final[i] = new_val

            # Record the modification for the audit trail
            modifications.append(f"{universe[i]}: {original_val:.4f} -> {new_val:.4f}")

    if modifications:
        logger.debug(f"Applied continuity discount (kappa={kappa}) to Cost Hamiltonian alphas: " + " | ".join(modifications))

    return alpha_final

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 2: Compute pairwise coefficients beta_ij exactly and only for i < j.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_pairwise_coefficients(sigma_ann: np.ndarray, q: float, N: int) -> List[Tuple[Tuple[int, int], float]]:
    """
    Computes the pairwise quadratic coefficients (beta_ij) for the cost Hamiltonian.

    This function strictly implements the definition from Equation (5):
    beta_ij = 2q * Sigma_ij. It enforces the strict inequality i < j to extract
    exactly the unique off-diagonal interactions, preventing the inclusion of
    identity-shifting diagonal terms.

    Args:
        sigma_ann (np.ndarray): The annualized covariance matrix.
        q (float): The risk aversion parameter.
        N (int): The number of assets in the universe.

    Returns:
        List[Tuple[Tuple[int, int], float]]: A list of tuples containing the edge (i, j) and its beta coefficient.
    """
    beta_coefficients: List[Tuple[Tuple[int, int], float]] = []

    # Iterate over all unique pairs (i < j)
    for i in range(N):
        for j in range(i + 1, N):
            # Compute the pairwise coefficient exactly per Eq. (5)
            beta_ij = 2.0 * q * sigma_ann[i, j]
            beta_coefficients.append(((i, j), float(beta_ij)))

    # Mathematically assert the correct number of pairwise coefficients
    expected_count = math.comb(N, 2)
    if len(beta_coefficients) != expected_count:
        raise ValueError(f"Combinatorial violation: Expected {expected_count} beta coefficients, "
                         f"but computed {len(beta_coefficients)}.")

    return beta_coefficients

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 3: Assemble the PennyLane Hamiltonian with exactly 55 terms and return it as H_C.
# -------------------------------------------------------------------------------------------------------------------------------

def _assemble_cost_hamiltonian(alpha: np.ndarray, beta_list: List[Tuple[Tuple[int, int], float]], N: int) -> qml.Hamiltonian:
    """
    Assembles the PennyLane Hamiltonian object for the cost function.

    This function constructs the Ising Hamiltonian H_C = sum(alpha_i * Z_i) + sum(beta_ij * Z_i Z_j).
    It rigorously validates that the resulting quantum object contains exactly the
    expected number of terms (N + N choose 2).

    Note: This Hamiltonian represents the proxy energy landscape for QAOA training.
    Final portfolio selection relies on classical rescoring via Eq. (1).

    Args:
        alpha (np.ndarray): The linear coefficient vector.
        beta_list (List[Tuple[Tuple[int, int], float]]): The list of pairwise coefficients.
        N (int): The number of assets in the universe.

    Returns:
        qml.Hamiltonian: The instantiated cost Hamiltonian.

    Raises:
        ValueError: If the number of terms in the Hamiltonian is incorrect.
    """
    coeffs: List[float] = []
    observables: List[Any] = []

    # Add the linear terms: alpha_i * Z_i
    for i in range(N):
        coeffs.append(float(alpha[i]))
        observables.append(qml.PauliZ(i))

    # Add the quadratic terms: beta_ij * (Z_i @ Z_j)
    for (i, j), beta_ij in beta_list:
        coeffs.append(beta_ij)
        observables.append(qml.PauliZ(i) @ qml.PauliZ(j))

    # Instantiate the PennyLane Hamiltonian
    h_c = qml.Hamiltonian(coeffs, observables)

    # Calculate the expected number of terms: N linear + (N choose 2) quadratic
    expected_terms = N + math.comb(N, 2)

    # Extract the actual number of coefficients in the Hamiltonian
    actual_terms = len(h_c.coeffs)

    # Assert strict equality
    if actual_terms != expected_terms:
        raise ValueError(f"Structural violation: Expected exactly {expected_terms} terms in the Cost Hamiltonian, "
                         f"but observed {actual_terms}.")

    return h_c

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def build_cost_hamiltonian(mu_ann: np.ndarray, sigma_ann: np.ndarray, config: Dict[str, Any], s_prev: np.ndarray) -> qml.Hamiltonian:
    """
    Orchestrates the construction of the regularized QAOA cost Hamiltonian.

    This function translates the econometric parameters into the Ising model coefficients
    defined in Equation (5). It applies the temporal regularization (continuity bonus)
    to the linear terms, constructs the exact Pauli-Z tensor products, and rigorously
    validates the structural integrity of the resulting quantum object.

    Args:
        mu_ann (np.ndarray): The annualized expected return vector.
        sigma_ann (np.ndarray): The annualized covariance matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.
        s_prev (np.ndarray): The binary continuity indicator vector from the prior month.

    Returns:
        qml.Hamiltonian: The validated, canonical cost Hamiltonian for the current month.

    Raises:
        ValueError: If the continuity application rule is unrecognized or structural invariants fail.
    """
    logger.debug("Commencing cost Hamiltonian construction.")

    # Extract required parameters from the configuration
    q = config["objective_function"]["risk_aversion_q"]
    kappa = config["objective_function"]["continuity_bonus_kappa"]
    rule = config["objective_function"]["continuity_bonus_application_rule"]
    universe = config["global_setup"]["universe"]
    N = len(universe)

    # Assert the continuity rule matches the pinned convention
    if rule != "subtract_kappa_from_linear_term":
        raise ValueError(f"Unrecognized continuity rule: '{rule}'. Expected 'subtract_kappa_from_linear_term'.")

    # Execute Step 1: Compute linear coefficients (alpha) and apply continuity bonus
    alpha = _compute_linear_coefficients(mu_ann, q, s_prev, kappa, universe)

    # Execute Step 2: Compute pairwise coefficients (beta)
    beta_list = _compute_pairwise_coefficients(sigma_ann, q, N)

    # Execute Step 3: Assemble and validate the PennyLane Hamiltonian
    h_c = _assemble_cost_hamiltonian(alpha, beta_list, N)

    logger.debug(f"Successfully constructed Cost Hamiltonian with {len(h_c.coeffs)} terms.")

    # Return the canonical cost Hamiltonian artifact
    return h_c


In [None]:
# Task 16 — Build Trotterized initialization and single-depth QAOA training callable (train_qaoa_single_depth)

# ==================================================================================
# Task 16: Build Trotterized initialization and single-depth QAOA training callable
# ==================================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 1: Generate initial parameters via Trotterized ramp.
# -------------------------------------------------------------------------------------------------------------------------------

def _generate_trotterized_parameters(p: int, config: Dict[str, Any]) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generates the initial QAOA parameters using a deterministic Trotterized ramp.

    This function strictly implements the discretized adiabatic schedule derived from
    Equation (6). It computes a linear ramp between the configured bounds, ensuring
    gamma monotonically increases and beta monotonically decreases. It explicitly
    handles the p=1 degeneracy by returning the maximum bounds.

    Args:
        p (int): The circuit depth.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Tuple[np.ndarray, np.ndarray]: The initial gamma and beta parameter arrays of length p.

    Raises:
        ValueError: If the initialization rule is unrecognized.
    """
    # Extract the pinned initialization rule and bounds
    rule = config["qaoa_architecture"]["trotter_initialization_rule"]
    if rule != "linear_ramp_between_bounds_per_depth":
        raise ValueError(f"Unrecognized trotter rule: '{rule}'.")

    gamma_min, gamma_max = config["qaoa_architecture"]["qaoa_gamma_bounds"]
    beta_max, beta_min = config["qaoa_architecture"]["qaoa_beta_bounds"]

    gamma_init = np.zeros(p, dtype=np.float64)
    beta_init = np.zeros(p, dtype=np.float64)

    # Compute the linear ramp: l ranges from 1 to p
    for index in range(p):
        l = index + 1
        # Equation: gamma_l = gamma_min + (l/p) * (gamma_max - gamma_min)
        gamma_init[index] = gamma_min + (l / p) * (gamma_max - gamma_min)
        # Equation: beta_l = beta_max - (l/p) * (beta_max - beta_min)
        beta_init[index] = beta_max - (l / p) * (beta_max - beta_min)

    # Log the initialization vectors for auditability
    logger.debug(f"Trotter Init (p={p}): gamma={np.round(gamma_init, 4).tolist()}, beta={np.round(beta_init, 4).tolist()}")

    return gamma_init, beta_init

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 2: Construct the PennyLane QNode exactly.
# -------------------------------------------------------------------------------------------------------------------------------

def _construct_qaoa_qnode(dicke_vector: np.ndarray, h_c: qml.Hamiltonian, h_xy: qml.Hamiltonian, p: int, N: int) -> qml.QNode:
    """
    Constructs the exact PennyLane QNode for the constraint-preserving QAOA ansatz.

    This function defines the quantum execution graph. It initializes the system in
    the canonical Dicke state, applies p alternating layers of the Cost and Mixer
    Hamiltonians using exactly 1 Trotter step, and returns the expectation value
    of the Cost Hamiltonian.

    Args:
        dicke_vector (np.ndarray): The dense amplitude vector for |D^K_N>.
        h_c (qml.Hamiltonian): The proxy cost Hamiltonian.
        h_xy (qml.Hamiltonian): The constraint-preserving XY mixer Hamiltonian.
        p (int): The circuit depth.
        N (int): The number of qubits.

    Returns:
        qml.QNode: The callable quantum node.
    """
    # Instantiate the statevector simulator device
    dev = qml.device("default.qubit", wires=N)

    @qml.qnode(dev)
    def qaoa_circuit(params: List[np.ndarray]) -> float:
        gamma, beta = params[0], params[1]

        # Step 1: Prepare the initial Dicke state
        qml.QubitStateVector(dicke_vector, wires=range(N))

        # Step 2: Apply p alternating layers of Cost and Mixer evolutions
        for l in range(p):
            # Apply Cost evolution: exp(-i * gamma_l * H_C)
            qml.ApproxTimeEvolution(h_c, gamma[l], 1)
            # Apply Mixer evolution: exp(-i * beta_l * H_XY)
            qml.ApproxTimeEvolution(h_xy, beta[l], 1)

        # Step 3: Measure the expectation value of the Cost Hamiltonian
        return qml.expval(h_c)

    return qaoa_circuit

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 3: Run Adam optimization with deterministic early stopping; log trajectory.
# -------------------------------------------------------------------------------------------------------------------------------

def _execute_adam_optimization(qnode: qml.QNode, initial_params: List[np.ndarray], config: Dict[str, Any]) -> Tuple[List[np.ndarray], Dict[str, Any]]:
    """
    Executes the Adam optimization loop with rigorous early stopping and optimized telemetry.

    This function trains the QAOA parameters to minimize the expected cost. It utilizes
    qml.math.value_and_grad to extract both the cost and the gradient in a single quantum
    circuit evaluation, eliminating the severe computational overhead of redundant passes.
    It manually applies the gradient update and logs the exact L2 gradient norm.

    Args:
        qnode (qml.QNode): The callable QAOA circuit.
        initial_params (List[np.ndarray]): The initial [gamma, beta] arrays.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Tuple[List[np.ndarray], Dict[str, Any]]: The optimized parameters and the comprehensive training log.
    """
    # Extract optimization hyperparameters
    opt_cfg = config["qaoa_optimization"]
    stepsize = opt_cfg["qaoa_stepsize"]
    epsilon = opt_cfg["qaoa_epsilon"]
    max_iterations = opt_cfg["qaoa_max_iterations"]

    # Extract early stopping parameters
    es_cfg = opt_cfg["qaoa_early_stopping"]
    patience = es_cfg["patience"]
    min_delta = es_cfg["min_delta"]

    # Instantiate the PennyLane Adam optimizer
    opt = qml.AdamOptimizer(stepsize=stepsize, eps=epsilon)

    # Initialize state variables
    params = initial_params
    best_cost = float('inf')
    patience_counter = 0

    # Initialize telemetry storage
    training_log: Dict[str, Any] = {
        "cost_trajectory": [],
        "gradient_norms": [],
        "iterations_to_convergence": max_iterations,
        "stopping_reason": "max_iterations_reached",
        "final_expected_cost": 0.0
    }

    # Define the single-pass value and gradient function
    # This is the critical optimization that halves the quantum simulation overhead
    val_and_grad_fn = qml.math.value_and_grad(qnode)

    for i in range(max_iterations):
        # Execute the single forward/backward pass
        # cost is a scalar float, gradients is a tuple of arrays matching params
        cost, gradients = val_and_grad_fn(params)

        # Compute the L2 norm of the flattened gradient vector
        # Equation: ||nabla||_2 = sqrt( sum(dC/dgamma)^2 + sum(dC/dbeta)^2 )
        flat_grad = np.concatenate([np.ravel(g) for g in gradients])
        grad_norm = float(np.linalg.norm(flat_grad))

        # Store telemetry
        training_log["cost_trajectory"].append(float(cost))
        training_log["gradient_norms"].append(grad_norm)

        # Manually apply the gradient update using the optimizer's step function
        # This replaces the redundant opt.step_and_cost call
        params = opt.apply_grad(gradients, params)

        # Evaluate deterministic early stopping logic
        if cost < best_cost - min_delta:
            best_cost = cost
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= patience:
            training_log["iterations_to_convergence"] = i + 1
            training_log["stopping_reason"] = f"early_stopping_triggered_at_patience_{patience}"
            logger.debug(f"Early stopping triggered at iteration {i+1}. Best cost: {best_cost:.6f}")
            break

    training_log["final_expected_cost"] = float(best_cost)

    return params, training_log

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def train_qaoa_single_depth(p: int, dicke_vector: np.ndarray, h_c: qml.Hamiltonian, h_xy: qml.Hamiltonian, config: Dict[str, Any]) -> Tuple[List[np.ndarray], Dict[str, Any]]:
    """
    Orchestrates the Trotterized initialization and training of a single-depth QAOA circuit.

    This function executes the Variational Quantum Algorithm (VQA) loop for a specific
    circuit depth p. It generates deterministic initial parameters to mitigate Barren
    Plateaus, constructs the exact constraint-preserving quantum execution graph, and
    optimizes the parameters using Adam with rigorous early stopping. It returns the
    optimized parameters and the comprehensive telemetry required for Table II diagnostics.

    Args:
        p (int): The circuit depth to train.
        dicke_vector (np.ndarray): The canonical initial statevector.
        h_c (qml.Hamiltonian): The proxy cost Hamiltonian.
        h_xy (qml.Hamiltonian): The constraint-preserving XY mixer Hamiltonian.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Tuple[List[np.ndarray], Dict[str, Any]]: The optimized [gamma, beta] parameters and the training log.
    """
    logger.debug(f"Commencing QAOA training for depth p={p}.")

    # Extract the universe size to define the number of qubits
    N = len(config["global_setup"]["universe"])

    # Execute Step 1: Generate initial parameters via Trotterized ramp
    gamma_init, beta_init = _generate_trotterized_parameters(p, config)
    initial_params = [gamma_init, beta_init]

    # Execute Step 2: Construct the PennyLane QNode
    qnode = _construct_qaoa_qnode(dicke_vector, h_c, h_xy, p, N)

    # Execute Step 3: Run Adam optimization and collect telemetry
    optimized_params, training_log = _execute_adam_optimization(qnode, initial_params, config)

    logger.debug(f"QAOA training for p={p} completed in {training_log['iterations_to_convergence']} iterations. "
                 f"Final Cost: {training_log['final_expected_cost']:.6f}, "
                 f"Final Grad Norm: {training_log['gradient_norms'][-1]:.6e}")

    # Return the trained parameters and the diagnostic artifact
    return optimized_params, training_log


In [None]:
# Task 17 — Build QAOA readout, filtering, and classical rescoring callable (qaoa_readout_and_rescore)

# ==============================================================================
# Task 17: Build QAOA readout, filtering, and classical rescoring callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 1: Compute full probability vector Pr(x) from the trained circuit (statevector mode, no shots).
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_full_probability_vector(trained_params: List[np.ndarray], dicke_vector: np.ndarray,
                                     h_c: qml.Hamiltonian, h_xy: qml.Hamiltonian, p: int, N: int) -> np.ndarray:
    """
    Computes the exact probability distribution of the trained QAOA circuit.

    This function constructs a dedicated readout QNode that executes the trained
    ansatz and returns the full statevector probabilities. It strictly enforces the
    pinned 'statevector_full_probs' measurement mode, ensuring no shot noise
    corrupts the distribution.

    Args:
        trained_params (List[np.ndarray]): The optimized [gamma, beta] parameters.
        dicke_vector (np.ndarray): The canonical initial statevector.
        h_c (qml.Hamiltonian): The proxy cost Hamiltonian.
        h_xy (qml.Hamiltonian): The constraint-preserving XY mixer Hamiltonian.
        p (int): The circuit depth.
        N (int): The number of qubits.

    Returns:
        np.ndarray: The exact probability vector of length 2^N.

    Raises:
        ValueError: If the resulting probability vector is not normalized.
    """
    # Instantiate the statevector simulator device explicitly without shots
    dev = qml.device("default.qubit", wires=N)

    @qml.qnode(dev)
    def readout_circuit(params: List[np.ndarray]) -> np.ndarray:
        gamma, beta = params[0], params[1]

        # Prepare the initial Dicke state
        qml.QubitStateVector(dicke_vector, wires=range(N))

        # Apply p alternating layers of Cost and Mixer evolutions
        for l in range(p):
            qml.ApproxTimeEvolution(h_c, gamma[l], 1)
            qml.ApproxTimeEvolution(h_xy, beta[l], 1)

        # Return the exact probability of each computational basis state
        return qml.probs(wires=range(N))

    # Execute the circuit with the trained parameters
    prob_vector = readout_circuit(trained_params)

    # Mathematically assert that the probability vector sums to 1.0
    prob_sum = np.sum(prob_vector)
    if not math.isclose(prob_sum, 1.0, abs_tol=1e-9):
        raise ValueError(f"Quantum mechanics violation: Probability vector sum is {prob_sum:.9f}, expected 1.0.")

    return prob_vector

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 2: Apply the two-stage readout filter: feasibility |x|=K and probability Pr(x) >= 1%.
# -------------------------------------------------------------------------------------------------------------------------------

def _apply_readout_filters(prob_vector: np.ndarray, K: int, N: int, prob_threshold: float) -> List[Tuple[np.ndarray, float, int]]:
    """
    Applies the strict feasibility and probability filters using optimized bitwise operations.

    This function iterates through the 2^N Hilbert space. It enforces the hard cardinality
    constraint and the probability threshold. It replaces suboptimal string formatting with
    high-performance bitwise shifts to construct the canonical binary numpy arrays directly
    from the integer basis indices.

    Args:
        prob_vector (np.ndarray): The exact probability vector.
        K (int): The cardinality constraint.
        N (int): The number of qubits.
        prob_threshold (float): The minimum probability required for retention.

    Returns:
        List[Tuple[np.ndarray, float, int]]: A list of surviving candidates: (binary_vector, probability, basis_index).
    """
    surviving_candidates: List[Tuple[np.ndarray, float, int]] = []
    feasible_mass = 0.0
    max_feasible_prob = 0.0

    total_states = 2 ** N

    # Pre-compute the array of bit shifts required for big-endian extraction
    # e.g., for N=10: [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
    shifts = np.arange(N - 1, -1, -1, dtype=np.int32)

    for k in range(total_states):
        # Filter 1: Feasibility (Hamming weight == K)
        if k.bit_count() == K:
            prob = float(prob_vector[k])
            feasible_mass += prob

            if prob > max_feasible_prob:
                max_feasible_prob = prob

            # Filter 2: Probability threshold (>= 1%)
            if prob >= prob_threshold:
                # Construct the canonical binary vector using optimized bitwise operations
                # We right-shift the integer k by the pre-computed shifts and apply bitwise AND 1
                x_vector = (k >> shifts) & 1
                x_vector = x_vector.astype(np.int32)

                surviving_candidates.append((x_vector, prob, k))

    # Log the constraint-preservation diagnostics
    logger.debug(f"QAOA Readout: Feasible subspace mass M_K = {feasible_mass:.6f} (Expected ~1.0). "
                 f"Max feasible prob = {max_feasible_prob:.6f}.")

    return surviving_candidates

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 3: Classically rescore candidates using Eq. (1) and select minimum-cost string with deterministic tie-break.
# -------------------------------------------------------------------------------------------------------------------------------

def _classically_rescore_and_select(candidates: List[Tuple[np.ndarray, float, int]],
                                    mu_ann: np.ndarray, sigma_ann: np.ndarray, q: float) -> Tuple[np.ndarray, float, List[Dict[str, Any]]]:
    """
    Classically rescores the surviving candidates using the true Markowitz objective (Eq. 1).

    This function evaluates the full quadratic form (including diagonal variance) for
    each candidate. It selects the candidate that minimizes this true classical cost,
    employing a strict lexicographic tie-breaking rule to guarantee absolute determinism.

    Args:
        candidates (List[Tuple[np.ndarray, float, int]]): The filtered list of candidates.
        mu_ann (np.ndarray): The annualized expected return vector.
        sigma_ann (np.ndarray): The annualized covariance matrix.
        q (float): The risk aversion parameter.

    Returns:
        Tuple[np.ndarray, float, List[Dict[str, Any]]]: The optimal binary vector, its classical cost, and the full diagnostic table.
    """
    scored_candidates = []
    diagnostic_table = []

    for x_vector, prob, k in candidates:
        # Evaluate the true classical objective (Eq. 1)
        # f(x) = q * (x^T * Sigma * x) - (1-q) * (mu^T * x)
        # This explicitly includes the diagonal elements of Sigma
        variance_term = np.dot(x_vector, np.dot(sigma_ann, x_vector))
        return_term = np.dot(mu_ann, x_vector)

        classical_cost = (q * variance_term) - ((1.0 - q) * return_term)

        scored_candidates.append((x_vector, float(classical_cost), prob, k))

        # Append to the diagnostic table for institutional auditability
        diagnostic_table.append({
            "basis_index": k,
            "probability": prob,
            "classical_cost": float(classical_cost),
            "bitstring": x_vector.tolist()
        })

    # Define a deterministic sort key: (classical_cost, lexicographic_tuple)
    def sort_key(item: Tuple[np.ndarray, float, float, int]) -> Tuple[float, Tuple[int, ...]]:
        x_vec, cost, _, _ = item
        rounded_cost = round(cost, 12)
        lexicographic_tuple = tuple(x_vec.tolist())
        return (rounded_cost, lexicographic_tuple)

    # Sort candidates to find the minimum classical cost
    scored_candidates.sort(key=sort_key)

    # Extract the optimal candidate
    optimal_x, optimal_cost, optimal_prob, _ = scored_candidates[0]

    logger.debug(f"QAOA Rescoring Complete. Selected candidate prob: {optimal_prob:.4f}, Classical Cost: {optimal_cost:.6f}")

    return optimal_x, optimal_cost, diagnostic_table

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def qaoa_readout_and_rescore(trained_params: List[np.ndarray], dicke_vector: np.ndarray,
                             h_c: qml.Hamiltonian, h_xy: qml.Hamiltonian, p: int,
                             mu_ann: np.ndarray, sigma_ann: np.ndarray, config: Dict[str, Any]) -> Tuple[Optional[np.ndarray], Optional[float], List[Dict[str, Any]]]:
    """
    Orchestrates the QAOA readout, feasibility filtering, and classical rescoring phase.

    This function bridges the quantum simulation with the classical financial objective.
    It computes the exact statevector probabilities, rigorously filters out infeasible
    or low-probability states, and evaluates the surviving candidates against the true
    Markowitz risk-return trade-off (Eq. 1). It returns the optimal deterministic selection.

    Args:
        trained_params (List[np.ndarray]): The optimized QAOA parameters.
        dicke_vector (np.ndarray): The canonical initial statevector.
        h_c (qml.Hamiltonian): The proxy cost Hamiltonian.
        h_xy (qml.Hamiltonian): The constraint-preserving XY mixer Hamiltonian.
        p (int): The circuit depth.
        mu_ann (np.ndarray): The annualized expected return vector.
        sigma_ann (np.ndarray): The annualized covariance matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Tuple[Optional[np.ndarray], Optional[float], List[Dict[str, Any]]]:
            The optimal binary vector (or None), its classical cost (or None), and the diagnostic table.

    Raises:
        ValueError: If the measurement mode is not 'statevector_full_probs'.
    """
    logger.debug(f"Commencing QAOA readout and rescoring for depth p={p}.")

    # Extract required parameters from the configuration
    measurement_mode = config["qaoa_architecture"]["qaoa_measurement_mode"]
    if measurement_mode != "statevector_full_probs":
        raise ValueError(f"Unsupported measurement mode: '{measurement_mode}'. Expected 'statevector_full_probs'.")

    K = config["objective_function"]["cardinality_K"]
    q = config["objective_function"]["risk_aversion_q"]
    prob_threshold = config["qaoa_optimization"]["qaoa_readout_filter_prob"]
    N = len(config["global_setup"]["universe"])

    # Execute Step 1: Compute the exact full probability vector
    prob_vector = _compute_full_probability_vector(trained_params, dicke_vector, h_c, h_xy, p, N)

    # Execute Step 2: Apply the feasibility and probability filters
    surviving_candidates = _apply_readout_filters(prob_vector, K, N, prob_threshold)

    # Handle the edge case: Zero candidates survived the filters
    if not surviving_candidates:
        logger.warning(f"QAOA Readout (p={p}): ZERO candidates survived the {prob_threshold*100}% probability threshold.")
        return None, None, []

    # Execute Step 3: Classically rescore and select the optimal candidate
    optimal_x, optimal_cost, diagnostic_table = _classically_rescore_and_select(surviving_candidates, mu_ann, sigma_ann, q)

    return optimal_x, optimal_cost, diagnostic_table


In [None]:
# Task 18 — Build QAOA cross-depth best-selection callable (qaoa_select_best_across_depths)

# ==============================================================================
# Task 18: Build QAOA cross-depth best-selection callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 1: Iterate over depths p in {1..6} and run training + readout-rescoring, storing full diagnostics.
# -------------------------------------------------------------------------------------------------------------------------------

def _execute_depth_scan(qaoa_depths: List[int], dicke_vector: np.ndarray, h_c: qml.Hamiltonian, h_xy: qml.Hamiltonian,
                        mu_ann: np.ndarray, sigma_ann: np.ndarray, config: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Executes the QAOA training and readout pipeline across a configured list of circuit depths.

    This function iterates through each depth p, invoking the Trotterized training loop
    (Task 16) and the classical rescoring readout (Task 17). It aggregates the optimized
    parameters, classical costs, and comprehensive telemetry into a structured diagnostic
    table required for reproducing the manuscript's Table II.

    Args:
        qaoa_depths (List[int]): The list of circuit depths to evaluate.
        dicke_vector (np.ndarray): The canonical initial statevector.
        h_c (qml.Hamiltonian): The proxy cost Hamiltonian.
        h_xy (qml.Hamiltonian): The constraint-preserving XY mixer Hamiltonian.
        mu_ann (np.ndarray): The annualized expected return vector.
        sigma_ann (np.ndarray): The annualized covariance matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        List[Dict[str, Any]]: A list of diagnostic dictionaries, one for each evaluated depth.
    """
    depth_diagnostics: List[Dict[str, Any]] = []

    for p in qaoa_depths:
        logger.debug(f"--- Initiating QAOA pipeline for depth p={p} ---")

        # Execute Task 16: Train the circuit to obtain optimized parameters and telemetry
        trained_params, training_log = train_qaoa_single_depth(p, dicke_vector, h_c, h_xy, config)

        # Execute Task 17: Measure probabilities, filter, and classically rescore
        optimal_x, optimal_cost, readout_table = qaoa_readout_and_rescore(
            trained_params, dicke_vector, h_c, h_xy, p, mu_ann, sigma_ann, config
        )

        # Extract specific metrics required for Table II
        final_expected_cost = training_log["final_expected_cost"]
        iterations = training_log["iterations_to_convergence"]
        # The gradient norm is the last element in the trajectory
        final_grad_norm = training_log["gradient_norms"][-1] if training_log["gradient_norms"] else float('nan')

        # Construct the comprehensive record for this depth
        depth_record = {
            "p": p,
            "trained_gamma": trained_params[0].tolist(),
            "trained_beta": trained_params[1].tolist(),
            "final_expected_cost": final_expected_cost,
            "iterations_to_convergence": iterations,
            "final_gradient_norm": final_grad_norm,
            "best_candidate_x": optimal_x,  # May be None if filters eliminated all states
            "best_classical_cost": optimal_cost, # May be None
            "surviving_candidate_count": len(readout_table),
            "full_training_log": training_log,
            "full_readout_table": readout_table
        }

        depth_diagnostics.append(depth_record)

    return depth_diagnostics

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 2: Select the best candidate across depths by minimum classical cost f(x); break ties by smallest depth.
# -------------------------------------------------------------------------------------------------------------------------------

def _select_optimal_depth_candidate(depth_diagnostics: List[Dict[str, Any]]) -> Tuple[Optional[int], Optional[np.ndarray], Optional[float]]:
    """
    Selects the globally optimal candidate across all evaluated circuit depths.

    This function filters out depths that failed to produce a valid candidate. It then
    sorts the successful depths primarily by their true classical cost (Eq. 1) in ascending
    order. To guarantee absolute determinism, it resolves cost ties (within a 1e-12 tolerance)
    by selecting the shallower circuit (smaller p).

    Args:
        depth_diagnostics (List[Dict[str, Any]]): The complete list of per-depth diagnostic records.

    Returns:
        Tuple[Optional[int], Optional[np.ndarray], Optional[float]]:
            The winning depth p*, the optimal binary vector x*, and its classical cost.
            Returns (None, None, None) if all depths failed.
    """
    # Filter to include only depths that produced a valid candidate
    valid_records = [record for record in depth_diagnostics if record["best_candidate_x"] is not None]

    # Handle the critical edge case: All depths failed
    if not valid_records:
        logger.warning("CRITICAL: QAOA solver produced ZERO valid candidates across all depths. "
                       "Probability threshold may be too high. Triggering HRP fallback.")
        return None, None, None

    # Define a deterministic sort key: (classical_cost, depth_p)
    # Python sorts tuples element by element, perfectly implementing our tie-break logic
    def sort_key(record: Dict[str, Any]) -> Tuple[float, int]:
        cost = record["best_classical_cost"]
        p = record["p"]
        # Round cost to 12 decimal places to group floating-point near-ties
        rounded_cost = round(cost, 12)
        return (rounded_cost, p)

    # Sort the valid records using the deterministic key
    valid_records.sort(key=sort_key)

    # The optimal record is the first element in the sorted list
    optimal_record = valid_records[0]

    p_star = optimal_record["p"]
    x_star = optimal_record["best_candidate_x"]
    cost_star = optimal_record["best_classical_cost"]

    return p_star, x_star, cost_star

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 3: Package and return x_t^{QAOA} and the complete depth-scaling diagnostics table.
# -------------------------------------------------------------------------------------------------------------------------------

def _log_diagnostics_and_format_output(p_star: Optional[int], x_star: Optional[np.ndarray], cost_star: Optional[float],
                                       universe: List[str]) -> Optional[np.ndarray]:
    """
    Logs comprehensive diagnostic telemetry and returns the canonical selection vector.

    This function maps the selected binary vector back to the canonical ticker symbols
    for institutional auditability. It logs the winning depth and its classical cost.

    Args:
        p_star (Optional[int]): The winning circuit depth.
        x_star (Optional[np.ndarray]): The selected optimal binary vector.
        cost_star (Optional[float]): The classical cost of the selected vector.
        universe (List[str]): The canonical list of asset tickers.

    Returns:
        Optional[np.ndarray]: The canonical selection vector x_QAOA.
    """
    if x_star is not None:
        # Extract the indices of the selected assets
        selected_indices = np.where(x_star == 1)[0]

        # Map indices to ticker symbols
        selected_tickers = [universe[i] for i in selected_indices]

        logger.info(f"QAOA Cross-Depth Selection Complete. Winning Depth: p*={p_star}")
        logger.info(f"QAOA Optimal Classical Cost: {cost_star:.6f}")
        logger.info(f"QAOA Selected Assets: {selected_tickers}")
    else:
        logger.info("QAOA Cross-Depth Selection failed to produce a valid candidate.")

    return x_star

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def qaoa_select_best_across_depths(dicke_vector: np.ndarray, h_c: qml.Hamiltonian, h_xy: qml.Hamiltonian,
                                   mu_ann: np.ndarray, sigma_ann: np.ndarray, config: Dict[str, Any]) -> Tuple[Optional[np.ndarray], List[Dict[str, Any]]]:
    """
    Orchestrates the QAOA depth scan and optimal candidate selection.

    This function executes the full quantum-classical hybrid loop for a configured list
    of circuit depths. It aggregates the comprehensive telemetry required for Table II
    diagnostics, deterministically selects the globally optimal candidate based on the
    true Markowitz objective, and returns the canonical selection vector alongside the
    full diagnostic table.

    Args:
        dicke_vector (np.ndarray): The canonical initial statevector.
        h_c (qml.Hamiltonian): The proxy cost Hamiltonian.
        h_xy (qml.Hamiltonian): The constraint-preserving XY mixer Hamiltonian.
        mu_ann (np.ndarray): The annualized expected return vector.
        sigma_ann (np.ndarray): The annualized covariance matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Tuple[Optional[np.ndarray], List[Dict[str, Any]]]:
            The canonical binary selection vector x_QAOA (or None), and the complete depth-scaling diagnostics table.
    """
    logger.info("Commencing QAOA cross-depth scan and optimal selection.")

    # Extract required parameters from the configuration
    qaoa_depths = config["qaoa_architecture"]["qaoa_depths_p"]
    universe = config["global_setup"]["universe"]

    # Execute Step 1: Run training and readout for all configured depths
    depth_diagnostics = _execute_depth_scan(qaoa_depths, dicke_vector, h_c, h_xy, mu_ann, sigma_ann, config)

    # Execute Step 2: Select the optimal candidate across all depths
    p_star, x_star, cost_star = _select_optimal_depth_candidate(depth_diagnostics)

    # Execute Step 3: Log diagnostics and format the final output
    final_x_qaoa = _log_diagnostics_and_format_output(p_star, x_star, cost_star, universe)

    # Return the canonical selection vector and the full Table II-grade diagnostic artifact
    return final_x_qaoa, depth_diagnostics


In [None]:
# Task 19 — Build SLSQP Sharpe-max allocation callable (allocate_sharpe_max)

# ==============================================================================
# Task 19: Build SLSQP Sharpe-max allocation callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 1: Define the constrained Sharpe-max optimization problem on the selected subset S.
# -------------------------------------------------------------------------------------------------------------------------------

def _prepare_subset_optimization_problem(x_t: np.ndarray, mu_ann: np.ndarray, sigma_ann: np.ndarray,
                                         config: Dict[str, Any]) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]:
    """
    Extracts the subset parameters and defines the objective function constants.

    This function isolates the K active assets identified by the selection vector.
    By reducing the dimensionality from N to K, it significantly improves the stability
    and speed of the continuous optimizer.

    Args:
        x_t (np.ndarray): The binary selection vector of shape (N,).
        mu_ann (np.ndarray): The full annualized expected return vector.
        sigma_ann (np.ndarray): The full annualized covariance matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray, float]:
            The subset indices S, subset returns mu_S, subset covariance Sigma_S, and the risk-free rate.
    """
    # Extract the integer indices of the selected assets (where x_i == 1)
    S_indices = np.where(x_t == 1)[0]

    # Slice the expected return vector to include only the selected assets
    mu_S = mu_ann[S_indices]

    # Slice the covariance matrix to include only the interactions between selected assets
    # np.ix_ creates a meshgrid for advanced indexing of the 2D array
    sigma_S = sigma_ann[np.ix_(S_indices, S_indices)]

    # Extract the pinned risk-free rate from the configuration
    r_f = config["allocation"]["risk_free_rate_annual"]

    return S_indices, mu_S, sigma_S, r_f

def _negative_sharpe_objective(w_S: np.ndarray, mu_S: np.ndarray, sigma_S: np.ndarray, r_f: float) -> float:
    """
    Computes the negative Sharpe ratio for the subset portfolio.

    This is the objective function to be minimized by the SLSQP solver. It includes
    a defensive guardrail against non-positive variance, which can occur if the solver
    explores extreme or degenerate weight combinations.

    Args:
        w_S (np.ndarray): The subset weight vector.
        mu_S (np.ndarray): The subset expected return vector.
        sigma_S (np.ndarray): The subset covariance matrix.
        r_f (float): The annualized risk-free rate.

    Returns:
        float: The negative Sharpe ratio, or a massive penalty if variance is invalid.
    """
    # Compute portfolio expected return
    port_return = np.dot(w_S, mu_S)

    # Compute portfolio variance
    port_variance = np.dot(w_S, np.dot(sigma_S, w_S))

    # Defensive check: Ensure variance is strictly positive to prevent ZeroDivisionError or NaN
    if port_variance <= 1e-12:
        return 1e9  # Return a massive penalty to force the optimizer away from this region

    port_volatility = np.sqrt(port_variance)

    # Compute the Sharpe ratio
    sharpe_ratio = (port_return - r_f) / port_volatility

    # Return the negative Sharpe ratio for minimization
    return -float(sharpe_ratio)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 2: Define constraints/bounds/initialization and invoke SLSQP deterministically.
# -------------------------------------------------------------------------------------------------------------------------------

def _execute_slsqp_optimization(mu_S: np.ndarray, sigma_S: np.ndarray, r_f: float,
                                K: int, bounds_cfg: Tuple[float, float]) -> OptimizeResult:
    """
    Configures and executes the Sequential Least Squares Programming (SLSQP) solver.

    This function enforces the pinned equality constraint (sum w_i = 1) and the box
    constraints (w_min <= w_i <= w_max). It initializes the solver at the equally
    weighted portfolio to guarantee a feasible starting point.

    Args:
        mu_S (np.ndarray): The subset expected return vector.
        sigma_S (np.ndarray): The subset covariance matrix.
        r_f (float): The annualized risk-free rate.
        K (int): The cardinality constraint (number of active assets).
        bounds_cfg (Tuple[float, float]): The (min, max) weight bounds.

    Returns:
        OptimizeResult: The raw result object from the SciPy optimizer.
    """
    # Define the equality constraint: sum(w) - 1 = 0
    constraints = [
        {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}
    ]

    # Define the box bounds for each active asset
    w_min, w_max = bounds_cfg
    bounds = tuple((w_min, w_max) for _ in range(K))

    # Initialize with the equally weighted portfolio (feasible by definition)
    w_initial = np.full(K, 1.0 / K)

    # Execute the SLSQP optimization
    result = minimize(
        fun=_negative_sharpe_objective,
        x0=w_initial,
        args=(mu_S, sigma_S, r_f),
        method='SLSQP',
        bounds=bounds,
        constraints=constraints,
        options={'ftol': 1e-9, 'disp': False}
    )

    return result

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 3: Validate the optimizer output, expand to full-universe weight vector, and return or fail.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_and_expand_weights(result: OptimizeResult, S_indices: np.ndarray, N: int,
                                 bounds_cfg: Tuple[float, float]) -> Optional[np.ndarray]:
    """
    Rigorously validates the solver output and expands it to the full universe dimension.

    This function does not blindly trust the solver's success flag. It mathematically
    re-verifies the sum-to-one constraint and the box bounds within a strict tolerance.
    If valid, it scatters the subset weights back into a zero-initialized N-vector.

    Args:
        result (OptimizeResult): The raw result object from the SciPy optimizer.
        S_indices (np.ndarray): The integer indices of the selected assets.
        N (int): The total number of assets in the universe.
        bounds_cfg (Tuple[float, float]): The (min, max) weight bounds.

    Returns:
        Optional[np.ndarray]: The expanded weight vector of shape (N,), or None if validation fails.
    """
    # Check the solver's internal success flag
    if not result.success:
        logger.warning(f"SLSQP Allocation Failed: Solver non-convergence. Message: {result.message}")
        return None

    w_S = result.x

    # Check for NaN values resulting from objective function singularities
    if np.isnan(w_S).any():
        logger.warning("SLSQP Allocation Failed: Solver returned NaN weights.")
        return None

    # Re-verify the equality constraint: |sum(w) - 1| < 1e-8
    if not np.isclose(np.sum(w_S), 1.0, atol=1e-8):
        logger.warning(f"SLSQP Allocation Failed: Sum constraint violated. Sum = {np.sum(w_S):.8f}")
        return None

    # Re-verify the box bounds: w_min - tol <= w_i <= w_max + tol
    w_min, w_max = bounds_cfg
    if np.any(w_S < w_min - 1e-8) or np.any(w_S > w_max + 1e-8):
        logger.warning("SLSQP Allocation Failed: Box bounds violated.")
        return None

    # Expand the valid subset weights to the full N-dimensional universe
    w_full = np.zeros(N, dtype=np.float64)
    w_full[S_indices] = w_S

    return w_full

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def allocate_sharpe_max(x_t: Optional[np.ndarray], mu_ann: np.ndarray, sigma_ann: np.ndarray, config: Dict[str, Any]) -> Optional[np.ndarray]:
    """
    Orchestrates the continuous Sharpe-maximizing weight allocation for a selected subset.

    This function translates the discrete selection vector into a constrained continuous
    optimization problem. It isolates the active assets, executes the SLSQP solver to
    find the optimal risk-adjusted weights, rigorously validates the constraints, and
    expands the result back to the canonical universe dimension. If any step fails,
    it gracefully returns None to trigger the HRP fallback.

    Args:
        x_t (Optional[np.ndarray]): The binary selection vector, or None if selection failed.
        mu_ann (np.ndarray): The full annualized expected return vector.
        sigma_ann (np.ndarray): The full annualized covariance matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Optional[np.ndarray]: The canonical weight vector w_t of shape (N,), or None on failure.
    """
    # Handle the edge case where the discrete selector failed to provide a valid subset
    if x_t is None:
        logger.debug("Allocation skipped: Input selection vector is None.")
        return None

    logger.debug("Commencing SLSQP Sharpe-max allocation.")

    # Extract required parameters from the configuration
    K = config["objective_function"]["cardinality_K"]
    N = len(config["global_setup"]["universe"])
    bounds_cfg = config["capital_and_bounds"]["allocation_bounds"]

    # Defensive check: Ensure the selection vector has exactly K active assets
    if np.sum(x_t) != K:
        logger.warning(f"Allocation Failed: Selection vector does not satisfy cardinality K={K}.")
        return None

    try:
        # Execute Step 1: Extract subset parameters and define constants
        S_indices, mu_S, sigma_S, r_f = _prepare_subset_optimization_problem(x_t, mu_ann, sigma_ann, config)

        # Execute Step 2: Execute the SLSQP optimization
        result = _execute_slsqp_optimization(mu_S, sigma_S, r_f, K, bounds_cfg)

        # Execute Step 3: Validate the output and expand to the full universe
        w_full = _validate_and_expand_weights(result, S_indices, N, bounds_cfg)

        if w_full is not None:
            logger.debug("SLSQP Sharpe-max allocation completed successfully.")

        return w_full

    except Exception as e:
        # Catch any unexpected solver exceptions (e.g., LinAlgError inside SciPy)
        logger.error(f"SLSQP Allocation encountered a fatal exception: {e}")
        return None


In [None]:
# Task 20 — Build HRP baseline computation callable (compute_hrp_weights)

# ==============================================================================
# Task 20: Build HRP baseline computation callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 1: Instantiate the HRP optimizer on the monthly lookback return matrix.
# -------------------------------------------------------------------------------------------------------------------------------

def _instantiate_hrp_optimizer(returns_df: pd.DataFrame, universe: List[str]) -> HRPOpt:
    """
    Instantiates the Hierarchical Risk Parity (HRP) optimizer using the lookback returns.

    This function rigorously validates the input returns matrix to ensure it aligns
    with the canonical universe and contains no missing or infinite values. It then
    initializes the PyPortfolioOpt HRPOpt object, which will internally compute the
    distance matrix and linkage dendrogram required for clustering.

    Args:
        returns_df (pd.DataFrame): The canonical daily log-returns matrix of shape (L-1, N).
        universe (List[str]): The canonical list of asset tickers.

    Returns:
        HRPOpt: The instantiated Hierarchical Risk Parity optimizer object.

    Raises:
        ValueError: If the returns matrix is misaligned, contains NaNs, or has infinite values.
    """
    # Assert that the columns of the returns matrix exactly match the canonical universe
    if returns_df.columns.tolist() != universe:
        raise ValueError("HRP Instantiation Failed: Returns matrix columns do not match the canonical universe.")

    # Assert that the returns matrix contains exactly zero NaN values
    if returns_df.isna().sum().sum() != 0:
        raise ValueError("HRP Instantiation Failed: Returns matrix contains NaN values.")

    # Assert that all values in the returns matrix are finite real numbers
    if not np.isfinite(returns_df.values).all():
        raise ValueError("HRP Instantiation Failed: Returns matrix contains infinite values.")

    # Instantiate the HRPOpt object explicitly using the returns data
    # This ensures the internal correlation and distance matrices are computed correctly
    hrp_optimizer = HRPOpt(returns=returns_df)

    return hrp_optimizer

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 2: Compute HRP weights and convert to a canonical N-vector in universe order.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_and_align_hrp_weights(hrp_optimizer: HRPOpt, universe: List[str], expected_N: int) -> np.ndarray:
    """
    Executes the HRP optimization and aligns the resulting weights to the canonical universe.

    This function triggers the quasi-diagonalization and recursive bisection algorithms.
    It extracts the resulting ticker-to-weight dictionary and maps it into a dense,
    strictly ordered numpy array, guaranteeing perfect alignment for downstream operations.

    Args:
        hrp_optimizer (HRPOpt): The instantiated HRP optimizer object.
        universe (List[str]): The canonical list of asset tickers.
        expected_N (int): The expected number of assets in the universe.

    Returns:
        np.ndarray: The canonical HRP weight vector of shape (N,).

    Raises:
        ValueError: If the optimizer drops an asset, resulting in a missing key.
    """
    # Execute the HRP optimization algorithm to generate the weight dictionary
    raw_weights_dict = hrp_optimizer.optimize()

    # Initialize an empty list to hold the strictly ordered weights
    aligned_weights_list: List[float] = []

    # Iterate through the canonical universe to guarantee exact ordering
    for ticker in universe:
        # Attempt to retrieve the weight for the specific ticker
        if ticker not in raw_weights_dict:
            # Raise a fatal error if the optimizer silently dropped an asset
            raise ValueError(f"HRP Alignment Failed: Ticker '{ticker}' is missing from the optimizer output.")

        # Append the extracted weight to the aligned list
        aligned_weights_list.append(raw_weights_dict[ticker])

    # Convert the aligned list into a dense, double-precision numpy array
    hrp_weights_vector = np.array(aligned_weights_list, dtype=np.float64)

    # Assert the exact shape of the resulting vector
    if hrp_weights_vector.shape != (expected_N,):
        raise ValueError(f"HRP Alignment Failed: Expected vector shape ({expected_N},), "
                         f"but observed {hrp_weights_vector.shape}.")

    return hrp_weights_vector

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 3: Validate HRP weights and return as baseline and fallback artifact.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_hrp_weights(hrp_weights: np.ndarray) -> None:
    """
    Performs rigorous mathematical validation on the canonical HRP weight vector.

    This function asserts the fundamental properties of a fully invested, long-only
    portfolio: strict non-negativity, sum-to-one (within floating-point tolerance),
    and absolute finiteness.

    Args:
        hrp_weights (np.ndarray): The canonical HRP weight vector.

    Raises:
        ValueError: If non-negativity, sum-to-one, or finiteness invariants are violated.
    """
    # Assert that all elements are finite real numbers
    if not np.isfinite(hrp_weights).all():
        raise ValueError("HRP Validation Failed: Weight vector contains infinite or NaN values.")

    # Assert strict non-negativity (allowing a microscopic tolerance for numerical noise)
    if np.any(hrp_weights < -1e-10):
        raise ValueError("HRP Validation Failed: Weight vector contains negative values.")

    # Calculate the sum of the weights
    weights_sum = np.sum(hrp_weights)

    # Assert the sum-to-one constraint using a strict absolute tolerance
    if not np.isclose(weights_sum, 1.0, atol=1e-8):
        raise ValueError(f"HRP Validation Failed: Weights do not sum to 1.0. Observed sum: {weights_sum:.8f}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_hrp_weights(returns_df: pd.DataFrame, config: Dict[str, Any]) -> np.ndarray:
    """
    Orchestrates the computation of the Hierarchical Risk Parity (HRP) baseline weights.

    This function serves as the entry point for generating the HRP allocation. It
    instantiates the clustering optimizer using the causal lookback returns, executes
    the recursive bisection algorithm, aligns the output to the canonical universe
    ordering, and rigorously validates the mathematical properties of the final vector.

    Args:
        returns_df (pd.DataFrame): The canonical daily log-returns matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        np.ndarray: The validated, canonical HRP weight vector of shape (N,).

    Raises:
        RuntimeError: If the PyPortfolioOpt library encounters an internal failure.
        ValueError: If structural, alignment, or mathematical invariants are violated.
    """
    logger.debug("Commencing Hierarchical Risk Parity (HRP) weight computation.")

    # Extract the canonical universe and its expected length
    universe = config["global_setup"]["universe"]
    expected_N = len(universe)

    try:
        # Execute Step 1: Instantiate the HRP optimizer on the lookback returns
        hrp_optimizer = _instantiate_hrp_optimizer(returns_df, universe)

        # Execute Step 2: Compute the weights and align them to the canonical N-vector
        hrp_weights = _compute_and_align_hrp_weights(hrp_optimizer, universe, expected_N)

        # Execute Step 3: Validate the mathematical properties of the weight vector
        _validate_hrp_weights(hrp_weights)

        logger.debug("HRP weight computation and validation completed successfully.")

        # Return the canonical HRP weight artifact
        return hrp_weights

    except Exception as e:
        # Catch any unexpected exceptions from the underlying library and raise a descriptive error
        logger.error(f"Fatal error during HRP computation: {e}")
        raise RuntimeError(f"Failed to compute HRP baseline weights: {e}") from e


In [None]:
# Task 21 — Build allocation dispatcher with fallback callable (dispatch_allocation)

# ==============================================================================
# Task 21: Build allocation dispatcher with fallback callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 1: Attempt primary Sharpe-max allocation on the selector's chosen subset.
# -------------------------------------------------------------------------------------------------------------------------------

def _attempt_primary_allocation(x_t: Optional[np.ndarray], mu_ann: np.ndarray, sigma_ann: np.ndarray,
                                config: Dict[str, Any]) -> Tuple[Optional[np.ndarray], Optional[str]]:
    """
    Attempts to compute the primary Sharpe-maximizing continuous allocation.

    This function evaluates the discrete selection vector. If valid, it invokes the
    SLSQP optimizer (Task 19) to find the optimal weights within the [0.05, 0.50] bounds.
    If the selection is missing or the optimizer fails, it returns None alongside a
    deterministic failure reason code.

    Args:
        x_t (Optional[np.ndarray]): The binary selection vector, or None if selection failed.
        mu_ann (np.ndarray): The annualized expected return vector.
        sigma_ann (np.ndarray): The annualized covariance matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Tuple[Optional[np.ndarray], Optional[str]]:
            The intermediate weight vector (or None), and the failure reason code (or None).

    Raises:
        ValueError: If the input arrays suffer from dimensional misalignment.
    """
    # Extract the canonical universe size
    expected_N = len(config["global_setup"]["universe"])

    # Handle the case where the discrete solver failed to produce a valid subset
    if x_t is None:
        logger.debug("Primary allocation bypassed: Input selection vector is None.")
        return None, "selection_none"

    # Assert strict dimensional alignment to prevent silent dot-product corruption
    if x_t.shape != (expected_N,) or mu_ann.shape != (expected_N,) or sigma_ann.shape != (expected_N, expected_N):
        raise ValueError(f"Dimensionality violation in allocation dispatcher. Expected N={expected_N}. "
                         f"Got x_t:{x_t.shape}, mu:{mu_ann.shape}, Sigma:{sigma_ann.shape}.")

    # Invoke the primary SLSQP Sharpe-max allocator (Task 19)
    w_t = allocate_sharpe_max(x_t, mu_ann, sigma_ann, config)

    # Evaluate the result of the primary allocation
    if w_t is None:
        # The allocator failed (e.g., non-convergence, invalid variance, bound violation)
        logger.debug("Primary allocation failed during SLSQP optimization.")
        return None, "allocation_failed"

    # Allocation succeeded
    return w_t, None

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 2: Handle failure via HRP fallback.
# -------------------------------------------------------------------------------------------------------------------------------

def _handle_fallback_allocation(w_t: Optional[np.ndarray], failure_reason: Optional[str],
                                returns_df: pd.DataFrame, config: Dict[str, Any],
                                selector_name: str, rebalance_date: str) -> Tuple[np.ndarray, bool]:
    """
    Evaluates the allocation state and triggers the Hierarchical Risk Parity fallback if necessary.

    This function guarantees that the pipeline always produces a valid portfolio. If the
    primary allocation yielded None, it invokes the HRP baseline (Task 20) using the
    causal lookback returns. It logs the exact failure context for institutional auditability.

    Args:
        w_t (Optional[np.ndarray]): The intermediate weight vector from the primary allocator.
        failure_reason (Optional[str]): The reason code if the primary allocation failed.
        returns_df (pd.DataFrame): The canonical daily log-returns matrix.
        config (Dict[str, Any]): The validated master study configuration dictionary.
        selector_name (str): The identifier of the strategy (e.g., 'QAOA_XY', 'SA').
        rebalance_date (str): The ISO-formatted date of the current rebalance.

    Returns:
        Tuple[np.ndarray, bool]: The resolved weight vector and a boolean indicating if fallback was used.
    """
    if w_t is not None:
        # Primary allocation was successful; no fallback required
        return w_t, False

    # Primary allocation failed; trigger the fallback mechanism
    logger.warning(f"Fallback Triggered | Date: {rebalance_date} | Strategy: {selector_name} | "
                   f"Reason: {failure_reason}. Computing HRP baseline weights.")

    # Invoke the HRP baseline computation (Task 20)
    # This function is guaranteed to return a valid N-vector or raise a fatal RuntimeError
    hrp_weights = compute_hrp_weights(returns_df, config)

    return hrp_weights, True

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 3: Return final weights and assert validity.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_final_weights(w_t: np.ndarray, expected_N: int) -> np.ndarray:
    """
    Performs universal, rigorous mathematical validation on the final portfolio weights.

    This function asserts the fundamental invariants of the portfolio: full investment
    (sum w_i = 1) and long-only constraints (w_i >= 0). It applies microscopic clipping
    to eliminate floating-point noise below zero, ensuring pristine data for downstream
    performance accounting.

    Args:
        w_t (np.ndarray): The resolved weight vector.
        expected_N (int): The expected number of assets in the universe.

    Returns:
        np.ndarray: The validated and microscopically cleaned weight vector.

    Raises:
        ValueError: If the weights violate the sum constraint, non-negativity, or finiteness.
    """
    # Assert the exact shape of the vector
    if w_t.shape != (expected_N,):
        raise ValueError(f"Final Weight Validation Failed: Expected shape ({expected_N},), got {w_t.shape}.")

    # Assert that all elements are finite real numbers
    if not np.isfinite(w_t).all():
        raise ValueError("Final Weight Validation Failed: Weight vector contains infinite or NaN values.")

    # Assert strict non-negativity (allowing a microscopic tolerance for numerical noise)
    if np.any(w_t < -1e-10):
        raise ValueError("Final Weight Validation Failed: Weight vector contains negative values exceeding tolerance.")

    # Clip microscopic negative noise exactly to 0.0 to prevent downstream accounting bugs
    w_t_clean = np.clip(w_t, a_min=0.0, a_max=None)

    # Calculate the sum of the cleaned weights
    # Equation: sum_{i=1}^N w_i = 1
    weights_sum = np.sum(w_t_clean)

    # Assert the sum-to-one constraint using a strict absolute tolerance
    if not np.isclose(weights_sum, 1.0, atol=1e-8):
        raise ValueError(f"Final Weight Validation Failed: Weights do not sum to 1.0. Observed sum: {weights_sum:.8f}")

    # Normalize by the sum to absorb any remaining floating-point drift, ensuring exact unity
    w_t_final = w_t_clean / weights_sum

    return w_t_final

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def dispatch_allocation(x_t: Optional[np.ndarray], mu_ann: np.ndarray, sigma_ann: np.ndarray,
                        returns_df: pd.DataFrame, config: Dict[str, Any],
                        selector_name: str, rebalance_date: pd.Timestamp) -> Tuple[np.ndarray, bool]:
    """
    Orchestrates the portfolio allocation process with a robust HRP fallback mechanism.

    This function acts as the central routing hub for continuous weight allocation. It
    first attempts to solve the constrained Sharpe-maximization problem for the discrete
    subset selected by the quantum or classical solver. If the selection is invalid or
    the convex optimizer fails, it transparently degrades to the Hierarchical Risk Parity
    baseline. It rigorously validates the final portfolio invariants before returning.

    Args:
        x_t (Optional[np.ndarray]): The binary selection vector from the discrete solver.
        mu_ann (np.ndarray): The annualized expected return vector.
        sigma_ann (np.ndarray): The annualized covariance matrix.
        returns_df (pd.DataFrame): The canonical daily log-returns matrix (for HRP).
        config (Dict[str, Any]): The validated master study configuration dictionary.
        selector_name (str): The identifier of the strategy (e.g., 'QAOA_XY', 'SA').
        rebalance_date (pd.Timestamp): The timestamp of the current rebalance.

    Returns:
        Tuple[np.ndarray, bool]:
            The validated, canonical weight vector w_t of shape (N,), and a boolean
            flag indicating whether the HRP fallback was triggered.

    Raises:
        RuntimeError: If both the primary allocation and the fallback mechanism fail.
    """
    date_str = rebalance_date.date().isoformat()
    logger.debug(f"Commencing allocation dispatch for {selector_name} on {date_str}.")

    expected_N = len(config["global_setup"]["universe"])

    try:
        # Execute Step 1: Attempt the primary Sharpe-max allocation
        w_t_primary, failure_reason = _attempt_primary_allocation(x_t, mu_ann, sigma_ann, config)

        # Execute Step 2: Evaluate state and trigger HRP fallback if necessary
        w_t_resolved, fallback_used = _handle_fallback_allocation(
            w_t_primary, failure_reason, returns_df, config, selector_name, date_str
        )

        # Execute Step 3: Rigorously validate the final portfolio invariants
        w_t_final = _validate_final_weights(w_t_resolved, expected_N)

        logger.debug(f"Allocation dispatch completed for {selector_name}. Fallback used: {fallback_used}.")

        return w_t_final, fallback_used

    except Exception as e:
        # Catch any unexpected exceptions and raise a fatal error, as an unallocated month breaks the backtest
        logger.error(f"Fatal error during allocation dispatch for {selector_name} on {date_str}: {e}")
        raise RuntimeError(f"Allocation dispatch failed completely for {selector_name}: {e}") from e


In [None]:
# Task 22 — Build holding-period return and turnover computation callable (compute_holding_returns_and_turnover)

# ==============================================================================
# Task 22: Build holding-period return and turnover computation callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 1: Compute per-asset holding-period returns using the pinned "price_relative" convention.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_asset_holding_returns(cleaned_price_df: pd.DataFrame, t: pd.Timestamp, t_plus: Optional[pd.Timestamp],
                                   config: Dict[str, Any]) -> np.ndarray:
    """
    Computes the exact holding-period return for each asset between t and t^+.

    This function enforces the pinned 'price_relative' convention. It handles the
    critical edge case of the final backtest month (December) deterministically.
    It utilizes the dedicated helper function to ensure clean, DRY timezone alignment.

    Args:
        cleaned_price_df (pd.DataFrame): The canonical, cleansed price matrix.
        t (pd.Timestamp): The current rebalance timestamp (entry).
        t_plus (Optional[pd.Timestamp]): The next rebalance timestamp (exit), or None for the last month.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        np.ndarray: The vector of asset returns r_{t -> t^+} of shape (N,).

    Raises:
        ValueError: If the required timestamps are missing from the price index.
    """
    universe = config["global_setup"]["universe"]
    target_tz = cleaned_price_df.index.tz

    # Align entry timestamp using the DRY helper function
    t_aligned = _align_timestamp_timezone(t, target_tz)

    # Resolve t_plus for the final month (December edge case)
    if t_plus is None:
        backtest_year = config["global_setup"]["backtest_year"]
        next_year_start = pd.Timestamp(year=backtest_year + 1, month=1, day=1)
        next_year_start = _align_timestamp_timezone(next_year_start, target_tz)

        valid_exit_dates = cleaned_price_df.index[cleaned_price_df.index >= next_year_start]
        if len(valid_exit_dates) == 0:
            raise ValueError(f"Data coverage error: Cannot compute final holding return. "
                             f"No trading days found on or after {next_year_start.date()}.")
        t_plus_aligned = valid_exit_dates[0]
        logger.debug(f"Resolved final month exit date (t^+) to {t_plus_aligned.date()}.")
    else:
        # Align exit timestamp using the DRY helper function
        t_plus_aligned = _align_timestamp_timezone(t_plus, target_tz)

    try:
        prices_t = cleaned_price_df.loc[t_aligned, universe].values.astype(np.float64)
        prices_t_plus = cleaned_price_df.loc[t_plus_aligned, universe].values.astype(np.float64)
    except KeyError as e:
        raise ValueError(f"Timestamp missing from price index during return computation: {e}")

    # Compute the price relative return
    asset_returns = (prices_t_plus / prices_t) - 1.0

    if not np.isfinite(asset_returns).all():
        raise ValueError("Mathematical violation: Infinite or NaN values detected in asset holding returns.")

    return asset_returns

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 2: Compute gross portfolio return as the dot product of weights and asset holding returns.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_gross_portfolio_return(w_t: np.ndarray, asset_returns: np.ndarray, expected_N: int) -> float:
    """
    Computes the gross portfolio return via the linear combination of weights and asset returns.

    This function mathematically asserts dimensional alignment before executing the
    highly optimized numpy dot product, ensuring exact operational accounting.

    Args:
        w_t (np.ndarray): The validated portfolio weight vector.
        asset_returns (np.ndarray): The vector of asset holding-period returns.
        expected_N (int): The expected number of assets in the universe.

    Returns:
        float: The scalar gross portfolio return.

    Raises:
        ValueError: If dimensional alignment invariants are violated.
    """
    # Assert strict dimensional alignment
    if w_t.shape != (expected_N,) or asset_returns.shape != (expected_N,):
        raise ValueError(f"Dimensionality violation: Expected shape ({expected_N},). "
                         f"Got w_t:{w_t.shape}, returns:{asset_returns.shape}.")

    # Compute the gross return: r^{gross}_t = sum(w_{i,t} * r_{i,t->t^+})
    gross_return = np.dot(w_t, asset_returns)

    return float(gross_return)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 3: Compute turnover as L1 norm of monthly weight change; handle the first month deterministically.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_portfolio_turnover(w_t: np.ndarray, w_t_minus_1: np.ndarray, expected_N: int) -> float:
    """
    Computes the portfolio turnover using the L1 norm of the weight vector differences.

    This function strictly implements the manuscript's turnover definition. It handles
    the initialization phase deterministically: if the previous weight vector is all zeros,
    the turnover will naturally evaluate to 1.0 (assuming w_t sums to 1), accurately
    reflecting the cost of establishing the initial positions.

    Args:
        w_t (np.ndarray): The current portfolio weight vector.
        w_t_minus_1 (np.ndarray): The previous month's portfolio weight vector.
        expected_N (int): The expected number of assets in the universe.

    Returns:
        float: The scalar portfolio turnover.

    Raises:
        ValueError: If dimensional alignment invariants are violated.
    """
    # Assert strict dimensional alignment
    if w_t.shape != (expected_N,) or w_t_minus_1.shape != (expected_N,):
        raise ValueError(f"Dimensionality violation: Expected shape ({expected_N},). "
                         f"Got w_t:{w_t.shape}, w_t_minus_1:{w_t_minus_1.shape}.")

    # Compute the L1 norm: Turnover_t = sum(|w_{i,t} - w_{i,t-1}|)
    weight_differences = w_t - w_t_minus_1
    turnover = np.sum(np.abs(weight_differences))

    return float(turnover)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_holding_returns_and_turnover(cleaned_price_df: pd.DataFrame, t: pd.Timestamp, t_plus: Optional[pd.Timestamp],
                                         w_t: np.ndarray, w_t_minus_1: np.ndarray, config: Dict[str, Any]) -> Tuple[float, np.ndarray, float]:
    """
    Orchestrates the computation of the gross holding-period return and portfolio turnover.

    This function acts as the operational accounting engine for a single rebalance period.
    It extracts the exact entry and exit prices, computes the relative asset returns,
    aggregates them into a gross portfolio return using the current weights, and calculates
    the L1 norm turnover against the previous weights. It returns the complete set of
    metrics required for net return calculation and diagnostic logging.

    Args:
        cleaned_price_df (pd.DataFrame): The canonical, cleansed price matrix.
        t (pd.Timestamp): The current rebalance timestamp (entry).
        t_plus (Optional[pd.Timestamp]): The next rebalance timestamp (exit), or None for the last month.
        w_t (np.ndarray): The current portfolio weight vector.
        w_t_minus_1 (np.ndarray): The previous month's portfolio weight vector.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Tuple[float, np.ndarray, float]:
            The gross portfolio return, the vector of individual asset returns, and the portfolio turnover.

    Raises:
        RuntimeError: If the accounting computations fail due to missing data or dimensional errors.
    """
    logger.debug(f"Commencing holding-period return and turnover computation for period starting {t.date()}.")

    expected_N = len(config["global_setup"]["universe"])

    try:
        # Execute Step 1: Compute per-asset holding-period returns
        asset_returns = _compute_asset_holding_returns(cleaned_price_df, t, t_plus, config)

        # Execute Step 2: Compute gross portfolio return
        gross_return = _compute_gross_portfolio_return(w_t, asset_returns, expected_N)

        # Execute Step 3: Compute portfolio turnover
        turnover = _compute_portfolio_turnover(w_t, w_t_minus_1, expected_N)

        logger.debug(f"Accounting complete. Gross Return: {gross_return:.4%}, Turnover: {turnover:.4f}")

        return gross_return, asset_returns, turnover

    except Exception as e:
        # Catch any unexpected exceptions and raise a fatal error, as accounting failure invalidates the backtest
        logger.error(f"Fatal error during performance accounting for period {t.date()}: {e}")
        raise RuntimeError(f"Holding-period return computation failed: {e}") from e


In [None]:
# Task 23 — Build portfolio value path and net-return callable (`compute_net_return_and_value`)

# ==============================================================================
# Task 23: Build portfolio value path and net-return callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 1: Compute net return by applying transaction costs proportional to turnover.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_net_return(gross_return: float, turnover: float, config: Dict[str, Any]) -> float:
    """
    Computes the net portfolio return by penalizing the gross return with transaction costs.

    This function strictly implements the manuscript's friction model:
    r^{net}_t = r^{gross}_t - (tau * Turnover_t). It rigorously validates the transaction
    cost parameter to ensure it is correctly specified as a decimal rate, preventing
    catastrophic scaling errors.

    Args:
        gross_return (float): The scalar gross portfolio return.
        turnover (float): The scalar portfolio turnover (L1 norm of weight changes).
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        float: The scalar net portfolio return.

    Raises:
        ValueError: If the transaction cost parameter is invalid or mis-scaled.
    """
    # Extract the transaction cost rate from the configuration
    tau = config["evaluation_metrics"]["transaction_cost_bps"]

    # Assert that tau is a valid decimal rate (e.g., 5 bps = 0.0005)
    # If tau >= 0.01 (100 bps or 1%), it is highly likely misconfigured as an integer
    if not (0.0 <= tau < 0.01):
        raise ValueError(f"Configuration violation: transaction_cost_bps ({tau}) must be a decimal rate "
                         f"in the interval [0.0, 0.01). Check configuration scaling.")

    # Compute the transaction cost penalty
    friction_penalty = tau * turnover

    # Compute the net return
    net_return = gross_return - friction_penalty

    return float(net_return)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 2: Update portfolio value via multiplicative compounding.
# -------------------------------------------------------------------------------------------------------------------------------

def _update_portfolio_value(v_t_minus_1: float, net_return: float) -> float:
    """
    Updates the portfolio's Net Asset Value (NAV) via geometric compounding.

    This function strictly implements the wealth evolution equation:
    V_t = V_{t-1} * (1 + r^{net}_t). It preserves full floating-point precision
    without premature rounding.

    Args:
        v_t_minus_1 (float): The portfolio value at the end of the previous period.
        net_return (float): The realized net return for the current period.

    Returns:
        float: The updated portfolio value V_t.
    """
    # Compute the new portfolio value
    v_t = v_t_minus_1 * (1.0 + net_return)

    return float(v_t)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 3: Validate V_t > 0, return (r_t^{net}, V_t), and append to strategy state upstream.
# -------------------------------------------------------------------------------------------------------------------------------

def _validate_and_log_wealth_state(v_t_minus_1: float, gross_return: float, turnover: float,
                                   net_return: float, v_t: float, strategy_name: str) -> Tuple[float, float]:
    """
    Performs rigorous mathematical validation on the updated wealth state and logs telemetry.

    This function asserts the survival of the portfolio (V_t > 0) and the finiteness
    of the computed values. It logs the complete state transition for institutional
    auditability.

    Args:
        v_t_minus_1 (float): The previous portfolio value.
        gross_return (float): The gross return.
        turnover (float): The portfolio turnover.
        net_return (float): The net return.
        v_t (float): The updated portfolio value.
        strategy_name (str): The identifier of the strategy for logging context.

    Returns:
        Tuple[float, float]: The validated net return and updated portfolio value.

    Raises:
        ValueError: If the portfolio value drops to zero or becomes infinite.
    """
    # Assert that the portfolio value is a finite real number
    if not math.isfinite(v_t):
        raise ValueError(f"Wealth accounting violation: Portfolio value became infinite or NaN for {strategy_name}.")

    # Assert the survival of the portfolio
    # A value <= 0 indicates total loss of capital, invalidating further backtesting
    if v_t <= 0.0:
        raise ValueError(f"Wealth accounting violation: Portfolio wiped out (V_t <= 0) for {strategy_name}. "
                         f"V_t = {v_t:.2f}")

    # Log the complete state transition
    logger.debug(f"[{strategy_name}] Wealth Update | Prev V: ${v_t_minus_1:,.2f} | "
                 f"Gross: {gross_return:.4%} | TO: {turnover:.4f} | "
                 f"Net: {net_return:.4%} | New V: ${v_t:,.2f}")

    return net_return, v_t

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_net_return_and_value(gross_return: float, turnover: float, v_t_minus_1: float,
                                 config: Dict[str, Any], strategy_name: str = "UNKNOWN") -> Tuple[float, float]:
    """
    Orchestrates the computation of the net portfolio return and the updated wealth path.

    This function acts as the final accounting step for a single rebalance period. It
    applies the configured transaction cost model to the gross return based on realized
    turnover, geometrically compounds the previous portfolio value, and rigorously
    validates the survival and mathematical integrity of the resulting wealth state.

    Args:
        gross_return (float): The scalar gross portfolio return.
        turnover (float): The scalar portfolio turnover.
        v_t_minus_1 (float): The portfolio value at the end of the previous period.
        config (Dict[str, Any]): The validated master study configuration dictionary.
        strategy_name (str): The identifier of the strategy (e.g., 'QAOA_XY', 'SA').

    Returns:
        Tuple[float, float]:
            The validated net return r^{net}_t and the updated portfolio value V_t.

    Raises:
        RuntimeError: If the wealth accounting computations fail due to invalid inputs or state violations.
    """
    try:
        # Execute Step 1: Compute net return by applying transaction costs
        net_return = _compute_net_return(gross_return, turnover, config)

        # Execute Step 2: Update portfolio value via multiplicative compounding
        v_t = _update_portfolio_value(v_t_minus_1, net_return)

        # Execute Step 3: Validate the new wealth state and log telemetry
        validated_net_return, validated_v_t = _validate_and_log_wealth_state(
            v_t_minus_1, gross_return, turnover, net_return, v_t, strategy_name
        )

        return validated_net_return, validated_v_t

    except Exception as e:
        # Catch any unexpected exceptions and raise a fatal error, as accounting failure invalidates the backtest
        logger.error(f"Fatal error during wealth accounting for {strategy_name}: {e}")
        raise RuntimeError(f"Net return and value computation failed: {e}") from e


In [None]:
# Task 24 — Build evaluation metrics and depth-scaling diagnostics callable (compute_all_metrics)

# ==============================================================================
# Task 24: Build evaluation metrics and depth-scaling diagnostics callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 1: Compute Total Return and Annualized Volatility from monthly net return series and value path.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_return_and_volatility(net_returns: np.ndarray, value_path: np.ndarray, periods_per_year: int) -> Tuple[float, float]:
    """
    Computes the Total Return and Annualized Volatility for a given strategy.

    This function strictly implements the manuscript's performance equations. It
    enforces the use of the sample standard deviation (ddof=1) to provide an unbiased
    estimator of volatility, preventing silent drift caused by default library behaviors.

    Args:
        net_returns (np.ndarray): The 1D array of monthly net returns (length 12).
        value_path (np.ndarray): The 1D array of portfolio values (length 13, including V_0).
        periods_per_year (int): The annualization scalar (e.g., 12 for monthly).

    Returns:
        Tuple[float, float]: The Total Return and Annualized Volatility.

    Raises:
        ValueError: If the input arrays have incorrect lengths or if V_0 is zero.
    """
    # Assert the expected lengths for a 1-year monthly backtest
    if len(net_returns) != 12:
        raise ValueError(f"Metrics computation failed: Expected exactly 12 net returns, got {len(net_returns)}.")
    if len(value_path) != 13:
        raise ValueError(f"Metrics computation failed: Expected exactly 13 value path entries, got {len(value_path)}.")

    v_initial = value_path[0]
    v_final = value_path[-1]

    # Defensive check against divide-by-zero
    if v_initial <= 0.0:
        raise ValueError(f"Metrics computation failed: Initial portfolio value must be strictly positive, got {v_initial}.")

    # Compute Total Return: R_tot = (V_final / V_initial) - 1
    total_return = (v_final / v_initial) - 1.0

    # Compute Annualized Volatility: sigma_ann = std(returns) * sqrt(12)
    # We explicitly pin ddof=1 to use the unbiased sample standard deviation
    monthly_volatility = np.std(net_returns, ddof=1)
    annualized_volatility = monthly_volatility * np.sqrt(periods_per_year)

    return float(total_return), float(annualized_volatility)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 2: Compute Sharpe Ratio (manuscript policy) and Maximum Drawdown from the value path.
# -------------------------------------------------------------------------------------------------------------------------------

def _compute_sharpe_and_drawdown(net_returns: np.ndarray, value_path: np.ndarray,
                                 annualized_volatility: float, periods_per_year: int) -> Tuple[float, float]:
    """
    Computes the Sharpe Ratio and Maximum Drawdown for a given strategy.

    This function strictly implements the pinned Sharpe Ratio formula from the manuscript
    and calculates the exact peak-to-trough Maximum Drawdown using an optimized running
    maximum algorithm.

    Args:
        net_returns (np.ndarray): The 1D array of monthly net returns.
        value_path (np.ndarray): The 1D array of portfolio values.
        annualized_volatility (float): The previously computed annualized volatility.
        periods_per_year (int): The annualization scalar.

    Returns:
        Tuple[float, float]: The Sharpe Ratio and Maximum Drawdown.
    """
    # Compute Sharpe Ratio: SR = (mean(returns) * 12) / sigma_ann
    # Handle the edge case where volatility is zero (e.g., cash portfolio)
    if annualized_volatility <= 1e-12:
        logger.warning("Annualized volatility is near zero. Sharpe Ratio is undefined; returning NaN.")
        sharpe_ratio = float('nan')
    else:
        mean_monthly_return = np.mean(net_returns)
        sharpe_ratio = (mean_monthly_return * periods_per_year) / annualized_volatility

    # Compute Maximum Drawdown: MDD = min( (V_t / max_{s<=t} V_s) - 1 )
    # np.maximum.accumulate efficiently computes the running maximum of the value path
    running_max = np.maximum.accumulate(value_path)

    # Compute the pointwise drawdown at each step
    # We suppress divide-by-zero warnings here as V_t > 0 is guaranteed by Task 23
    with np.errstate(divide='ignore', invalid='ignore'):
        drawdowns = (value_path / running_max) - 1.0

    # The Maximum Drawdown is the most negative value in the drawdown array
    max_drawdown = np.min(drawdowns)

    return float(sharpe_ratio), float(max_drawdown)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 3: Assemble depth-scaling diagnostics table using a pinned aggregation rule across months.
# -------------------------------------------------------------------------------------------------------------------------------

def _aggregate_depth_diagnostics(raw_diagnostics: List[List[Dict[str, Any]]], qaoa_depths: List[int]) -> List[Dict[str, Any]]:
    """
    Aggregates the monthly QAOA depth diagnostics into a single Table II-grade summary.

    This function implements the pinned aggregation rule: for each depth p, it computes
    the arithmetic mean of the final expected cost, iterations, and gradient norm across
    all months where that depth successfully produced a candidate. It explicitly records
    failure counts to prevent survivorship bias in the reporting.

    Args:
        raw_diagnostics (List[List[Dict[str, Any]]]): The nested list of monthly diagnostics [month][depth].
        qaoa_depths (List[int]): The configured list of circuit depths to aggregate.

    Returns:
        List[Dict[str, Any]]: The aggregated depth-scaling table (one dict per depth).
    """
    aggregated_table: List[Dict[str, Any]] = []

    for p in qaoa_depths:
        costs = []
        iterations = []
        grad_norms = []
        failure_count = 0

        # Iterate through all 12 months
        for month_idx, month_records in enumerate(raw_diagnostics):
            # Find the record corresponding to depth p for this month
            record_p = next((r for r in month_records if r["p"] == p), None)

            if record_p is None or record_p.get("best_candidate_x") is None:
                failure_count += 1
                continue

            costs.append(record_p["final_expected_cost"])
            iterations.append(record_p["iterations_to_convergence"])

            # Handle potential NaN gradient norms gracefully
            gn = record_p["final_gradient_norm"]
            if not math.isnan(gn):
                grad_norms.append(gn)

        # Compute the arithmetic means, handling the case where all months failed
        if len(costs) > 0:
            mean_cost = float(np.mean(costs))
            mean_iters = float(np.mean(iterations))
            mean_grad = float(np.mean(grad_norms)) if grad_norms else float('nan')
        else:
            mean_cost = float('nan')
            mean_iters = float('nan')
            mean_grad = float('nan')

        aggregated_table.append({
            "Depth (p)": p,
            "Final Cost (Mean)": mean_cost,
            "Iter. to Conv. (Mean)": mean_iters,
            "Gradient Norm (Mean)": mean_grad,
            "Months Failed": failure_count
        })

    return aggregated_table

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_all_metrics(strategy_time_series: Dict[str, Dict[str, np.ndarray]],
                        raw_qaoa_diagnostics: List[List[Dict[str, Any]]],
                        config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Orchestrates the computation of all financial evaluation metrics and quantum diagnostics.

    This function processes the accumulated time-series data for all strategies (QAOA_XY, SA, HRP)
    at the conclusion of the walk-forward backtest. It computes the canonical performance
    metrics (Total Return, Volatility, Sharpe, MDD, Average Turnover) and aggregates the
    QAOA depth-scaling telemetry into structured tables ready for persistence and visualization.

    Args:
        strategy_time_series (Dict[str, Dict[str, np.ndarray]]): A dictionary mapping strategy names
            to their respective time-series arrays ('net_returns', 'value_path', 'turnovers').
        raw_qaoa_diagnostics (List[List[Dict[str, Any]]]): The nested list of monthly QAOA diagnostics.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Dict[str, Any]: A comprehensive results dictionary containing the metrics table and depth table.
    """
    logger.info("Commencing final evaluation metrics and diagnostics computation.")

    periods_per_year = config["evaluation_metrics"]["periods_per_year"]
    qaoa_depths = config["qaoa_architecture"]["qaoa_depths_p"]

    final_results: Dict[str, Any] = {
        "financial_metrics": {},
        "depth_scaling_table": []
    }

    # Process financial metrics for each strategy
    for strategy_name, series in strategy_time_series.items():
        net_returns = series["net_returns"]
        value_path = series["value_path"]
        turnovers = series["turnovers"]

        # Execute Step 1: Compute Return and Volatility
        tot_ret, ann_vol = _compute_return_and_volatility(net_returns, value_path, periods_per_year)

        # Execute Step 2: Compute Sharpe and Drawdown
        sharpe, mdd = _compute_sharpe_and_drawdown(net_returns, value_path, ann_vol, periods_per_year)

        # Compute Average Monthly Turnover
        avg_turnover = float(np.mean(turnovers))

        # Store the metrics in the final results dictionary
        final_results["financial_metrics"][strategy_name] = {
            "Total Return": tot_ret,
            "Annualized Volatility": ann_vol,
            "Sharpe Ratio": sharpe,
            "Max Drawdown": mdd,
            "Average Monthly Turnover": avg_turnover
        }

        logger.info(f"Metrics for {strategy_name}: SR={sharpe:.2f}, Ret={tot_ret:.2%}, MDD={mdd:.2%}, TO={avg_turnover:.2%}")

    # Execute Step 3: Assemble the depth-scaling diagnostics table
    if raw_qaoa_diagnostics:
        depth_table = _aggregate_depth_diagnostics(raw_qaoa_diagnostics, qaoa_depths)
        final_results["depth_scaling_table"] = depth_table
        logger.info("Depth-scaling diagnostics table aggregated successfully.")
    else:
        logger.warning("No raw QAOA diagnostics provided; skipping depth-scaling table aggregation.")

    return final_results


In [None]:
# Task 25 — Build artifact persistence callable (persist_artifacts)

# ==============================================================================
# Task 25: Build artifact persistence callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 25, Step 1: Persist raw and derived data artifacts with drift-resistant formats and deterministic naming.
# -------------------------------------------------------------------------------------------------------------------------------

def _persist_data_artifacts(base_dir: Path, cleaned_price_df: pd.DataFrame, rebalance_dates: Tuple[pd.Timestamp, ...],
                            mu_history: List[np.ndarray], sigma_history: List[np.ndarray]) -> List[str]:
    """
    Serializes the foundational data artifacts to disk using exact-precision formats.

    This function writes the cleansed price matrix to Parquet (preserving float64 and DatetimeIndex),
    the rebalance schedule to JSON, and the econometric estimators to binary .npy files.
    This guarantees that the exact inputs to the solvers can be reloaded without numerical drift.

    Args:
        base_dir (Path): The root directory for the artifacts.
        cleaned_price_df (pd.DataFrame): The canonical price matrix.
        rebalance_dates (Tuple[pd.Timestamp, ...]): The frozen rebalance schedule.
        mu_history (List[np.ndarray]): The chronological list of annualized expected return vectors.
        sigma_history (List[np.ndarray]): The chronological list of annualized covariance matrices.

    Returns:
        List[str]: A list of absolute file paths written to disk.
    """
    written_files: List[str] = []
    data_dir = base_dir / "data"
    data_dir.mkdir(parents=True, exist_ok=True)

    # 1. Persist the cleansed price matrix as Parquet
    price_path = data_dir / "cleaned_prices.parquet"
    cleaned_price_df.to_parquet(price_path, engine="pyarrow")
    written_files.append(str(price_path))

    # 2. Persist the rebalance dates as ISO 8601 strings in JSON
    dates_path = data_dir / "rebalance_dates.json"
    iso_dates = [d.isoformat() for d in rebalance_dates]
    with open(dates_path, "w") as f:
        json.dump(iso_dates, f, indent=4)
    written_files.append(str(dates_path))

    # 3. Persist the econometric estimators as binary numpy arrays
    est_dir = data_dir / "estimators"
    est_dir.mkdir(parents=True, exist_ok=True)

    for i, date in enumerate(rebalance_dates):
        date_str = date.date().isoformat()

        mu_path = est_dir / f"mu_{date_str}.npy"
        np.save(mu_path, mu_history[i])
        written_files.append(str(mu_path))

        sigma_path = est_dir / f"sigma_{date_str}.npy"
        np.save(sigma_path, sigma_history[i])
        written_files.append(str(sigma_path))

    logger.debug(f"Persisted {len(written_files)} data artifacts to {data_dir}.")
    return written_files

# -------------------------------------------------------------------------------------------------------------------------------
# Task 25, Step 2: Persist per-strategy decision artifacts: selections, QAOA depth scans, weights, and fallback log.
# -------------------------------------------------------------------------------------------------------------------------------

def _persist_decision_artifacts(base_dir: Path, rebalance_dates: Tuple[pd.Timestamp, ...], universe: List[str],
                                selections: Dict[str, List[np.ndarray]], weights: Dict[str, List[np.ndarray]],
                                qaoa_diagnostics: List[List[Dict[str, Any]]], fallback_log: List[Dict[str, Any]]) -> List[str]:
    """
    Serializes the algorithmic decision outputs and quantum telemetry to disk.

    This function writes the discrete selections as binary arrays, the continuous weights
    as labeled Parquet DataFrames, and the highly nested QAOA depth-scan diagnostics as JSON.
    This provides the complete audit trail required to reconstruct the portfolio evolution.

    Args:
        base_dir (Path): The root directory for the artifacts.
        rebalance_dates (Tuple[pd.Timestamp, ...]): The frozen rebalance schedule.
        universe (List[str]): The canonical list of asset tickers.
        selections (Dict[str, List[np.ndarray]]): The binary selection vectors per strategy.
        weights (Dict[str, List[np.ndarray]]): The continuous weight vectors per strategy.
        qaoa_diagnostics (List[List[Dict[str, Any]]]): The nested QAOA telemetry.
        fallback_log (List[Dict[str, Any]]): The record of HRP fallback events.

    Returns:
        List[str]: A list of absolute file paths written to disk.
    """
    written_files: List[str] = []
    decisions_dir = base_dir / "decisions"
    decisions_dir.mkdir(parents=True, exist_ok=True)

    date_index = pd.DatetimeIndex(rebalance_dates)

    # 1. Persist Selections and Weights per strategy
    for strategy in ["QAOA_XY", "SA", "HRP"]:
        strat_dir = decisions_dir / strategy
        strat_dir.mkdir(parents=True, exist_ok=True)

        # HRP does not have discrete selections
        if strategy in selections and selections[strategy]:
            sel_matrix = np.vstack(selections[strategy])
            sel_df = pd.DataFrame(sel_matrix, index=date_index, columns=universe)
            sel_path = strat_dir / "selections.parquet"
            sel_df.to_parquet(sel_path, engine="pyarrow")
            written_files.append(str(sel_path))

        if strategy in weights and weights[strategy]:
            w_matrix = np.vstack(weights[strategy])
            w_df = pd.DataFrame(w_matrix, index=date_index, columns=universe)
            w_path = strat_dir / "weights.parquet"
            w_df.to_parquet(w_path, engine="pyarrow")
            written_files.append(str(w_path))

    # 2. Persist QAOA Depth Diagnostics
    if qaoa_diagnostics:
        qaoa_dir = decisions_dir / "QAOA_XY" / "depth_scans"
        qaoa_dir.mkdir(parents=True, exist_ok=True)

        for i, date in enumerate(rebalance_dates):
            date_str = date.date().isoformat()
            diag_path = qaoa_dir / f"scan_{date_str}.json"
            with open(diag_path, "w") as f:
                # The arrays inside the dicts were converted to lists in Task 18
                json.dump(qaoa_diagnostics[i], f, indent=4)
            written_files.append(str(diag_path))

    # 3. Persist Fallback Log
    fallback_path = decisions_dir / "fallback_log.json"
    with open(fallback_path, "w") as f:
        json.dump(fallback_log, f, indent=4)
    written_files.append(str(fallback_path))

    logger.debug(f"Persisted decision artifacts to {decisions_dir}.")
    return written_files

# -------------------------------------------------------------------------------------------------------------------------------
# Task 25, Step 3: Persist evaluation outputs and software version manifest.
# -------------------------------------------------------------------------------------------------------------------------------

def _persist_evaluation_and_manifest(base_dir: Path, final_results: Dict[str, Any],
                                     strategy_time_series: Dict[str, Dict[str, np.ndarray]],
                                     rebalance_dates: Tuple[pd.Timestamp, ...], exit_date: pd.Timestamp) -> List[str]:
    """
    Serializes the final performance metrics, time-series, and the software environment manifest.

    This function writes the human-readable CSV tables for the manuscript (Table II, Table III),
    the exact time-series DataFrames for plotting, and a JSON manifest capturing the exact
    versions of the critical libraries used, fulfilling the determinism contract.

    Args:
        base_dir (Path): The root directory for the artifacts.
        final_results (Dict[str, Any]): The aggregated metrics and depth tables.
        strategy_time_series (Dict[str, Dict[str, np.ndarray]]): The raw performance arrays.
        rebalance_dates (Tuple[pd.Timestamp, ...]): The frozen rebalance schedule.
        exit_date (pd.Timestamp): The final date for the value path.

    Returns:
        List[str]: A list of absolute file paths written to disk.
    """
    written_files: List[str] = []
    eval_dir = base_dir / "evaluation"
    eval_dir.mkdir(parents=True, exist_ok=True)

    # 1. Persist Financial Metrics Table (Table III)
    metrics_df = pd.DataFrame.from_dict(final_results["financial_metrics"], orient="index")
    metrics_path = eval_dir / "financial_metrics.csv"
    metrics_df.to_csv(metrics_path)
    written_files.append(str(metrics_path))

    # 2. Persist Depth Scaling Table (Table II)
    if final_results.get("depth_scaling_table"):
        depth_df = pd.DataFrame(final_results["depth_scaling_table"])
        depth_path = eval_dir / "depth_scaling_diagnostics.csv"
        depth_df.to_csv(depth_path, index=False)
        written_files.append(str(depth_path))

    # 3. Persist Time-Series Data
    ts_dir = eval_dir / "time_series"
    ts_dir.mkdir(parents=True, exist_ok=True)

    # The value path has 13 entries (including V_0), returns/turnovers have 12
    # We align them by padding the first month of returns/turnovers with NaN
    value_index = pd.DatetimeIndex(list(rebalance_dates) + [exit_date])

    for strategy, series in strategy_time_series.items():
        ts_df = pd.DataFrame({
            "Value_Path": series["value_path"],
            "Net_Return": np.insert(series["net_returns"], 0, np.nan),
            "Turnover": np.insert(series["turnovers"], 0, np.nan)
        }, index=value_index)

        ts_path = ts_dir / f"{strategy}_timeseries.parquet"
        ts_df.to_parquet(ts_path, engine="pyarrow")
        written_files.append(str(ts_path))

    # 4. Generate and Persist Software Manifest
    manifest = {
        "python_version": platform.python_version(),
        "os": platform.system(),
        "libraries": {}
    }

    # Create list of critical libraries
    critical_libs = ["numpy", "pandas", "scipy", "pennylane", "dimod", "dwave-neal", "PyPortfolioOpt"]
    for lib in critical_libs:
        try:
            manifest["libraries"][lib] = importlib.metadata.version(lib)
        except importlib.metadata.PackageNotFoundError:
            manifest["libraries"][lib] = "NOT_FOUND"

    manifest_path = base_dir / "software_manifest.json"
    with open(manifest_path, "w") as f:
        json.dump(manifest, f, indent=4)
    written_files.append(str(manifest_path))

    logger.debug(f"Persisted evaluation artifacts and manifest to {eval_dir}.")
    return written_files

# -------------------------------------------------------------------------------------------------------------------------------
# Task 25, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def persist_artifacts(base_dir_str: str, cleaned_price_df: pd.DataFrame, rebalance_dates: Tuple[pd.Timestamp, ...],
                      exit_date: pd.Timestamp, mu_history: List[np.ndarray], sigma_history: List[np.ndarray],
                      selections: Dict[str, List[np.ndarray]], weights: Dict[str, List[np.ndarray]],
                      qaoa_diagnostics: List[List[Dict[str, Any]]], fallback_log: List[Dict[str, Any]],
                      strategy_time_series: Dict[str, Dict[str, np.ndarray]], final_results: Dict[str, Any],
                      config: Dict[str, Any]) -> List[str]:
    """
    Orchestrates the comprehensive serialization of all study artifacts to disk.

    This function freezes the entire state of the experiment. It writes the raw data,
    the intermediate econometric parameters, the algorithmic decisions (selections and weights),
    the quantum telemetry, the final performance metrics, and the software environment
    manifest into a deterministic, highly structured directory tree. This guarantees
    institutional-grade reproducibility.

    Args:
        base_dir_str (str): The root directory path for the artifacts.
        cleaned_price_df (pd.DataFrame): The canonical price matrix.
        rebalance_dates (Tuple[pd.Timestamp, ...]): The frozen rebalance schedule.
        exit_date (pd.Timestamp): The final date for the value path.
        mu_history (List[np.ndarray]): The chronological list of expected return vectors.
        sigma_history (List[np.ndarray]): The chronological list of covariance matrices.
        selections (Dict[str, List[np.ndarray]]): The binary selection vectors per strategy.
        weights (Dict[str, List[np.ndarray]]): The continuous weight vectors per strategy.
        qaoa_diagnostics (List[List[Dict[str, Any]]]): The nested QAOA telemetry.
        fallback_log (List[Dict[str, Any]]): The record of HRP fallback events.
        strategy_time_series (Dict[str, Dict[str, np.ndarray]]): The raw performance arrays.
        final_results (Dict[str, Any]): The aggregated metrics and depth tables.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        List[str]: A comprehensive list of all absolute file paths written to disk.
    """
    logger.info(f"Commencing artifact persistence to root directory: {base_dir_str}")
    base_dir = Path(base_dir_str)
    base_dir.mkdir(parents=True, exist_ok=True)

    all_written_files: List[str] = []

    try:
        # Persist the master configuration itself
        config_path = base_dir / "master_configuration.json"
        with open(config_path, "w") as f:
            json.dump(config, f, indent=4)
        all_written_files.append(str(config_path))

        # Execute Step 1: Persist Data Artifacts
        data_files = _persist_data_artifacts(base_dir, cleaned_price_df, rebalance_dates, mu_history, sigma_history)
        all_written_files.extend(data_files)

        # Execute Step 2: Persist Decision Artifacts
        universe = config["global_setup"]["universe"]
        decision_files = _persist_decision_artifacts(base_dir, rebalance_dates, universe, selections, weights, qaoa_diagnostics, fallback_log)
        all_written_files.extend(decision_files)

        # Execute Step 3: Persist Evaluation Outputs and Manifest
        eval_files = _persist_evaluation_and_manifest(base_dir, final_results, strategy_time_series, rebalance_dates, exit_date)
        all_written_files.extend(eval_files)

        logger.info(f"Artifact persistence completed successfully. Wrote {len(all_written_files)} files.")
        return all_written_files

    except Exception as e:
        logger.error(f"Fatal error during artifact persistence: {e}")
        raise RuntimeError(f"Failed to persist artifacts: {e}") from e


In [None]:
# Task 26 — Build end-to-end orchestrator callable (run_full_backtest)

# ==============================================================================
# Task 26: Build end-to-end orchestrator callable
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Step 1: Initialize state, seeds, and reusable quantum artifacts exactly once.
# -------------------------------------------------------------------------------------------------------------------------------

def _initialize_backtest_state(config: Dict[str, Any]) -> Tuple[Dict[str, Any], np.ndarray, Any]:
    """
    Initializes the random seeds, quantum invariants, and tracking state containers.

    This function enforces the determinism contract by setting global seeds. It computes
    the Dicke state vector and the XY mixer Hamiltonian exactly once, as they are invariant
    across the walk-forward loop. It initializes the isolated state tracking dictionaries
    for each strategy to prevent cross-contamination.

    Args:
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        Tuple[Dict[str, Any], np.ndarray, Any]:
            The state tracking dictionary, the Dicke statevector, and the XY mixer Hamiltonian.
    """
    # Enforce the determinism contract
    global_seed = config["randomness"]["global_seed"]
    random.seed(global_seed)
    np.random.seed(global_seed)
    logger.info(f"Global random seeds set to {global_seed}.")

    N = len(config["global_setup"]["universe"])
    K = config["objective_function"]["cardinality_K"]
    initial_capital = config["capital_and_bounds"]["initial_capital"]

    # Construct the quantum invariants (built once, reused across all months)
    dicke_vector = prepare_dicke_state_vector(N, K)
    h_xy = build_xy_mixer_hamiltonian(N, config)
    logger.info("Quantum invariants (Dicke state, XY mixer) initialized.")

    # Initialize isolated state containers for each strategy
    strategies = ["QAOA_XY", "SA", "HRP"]
    state: Dict[str, Any] = {
        "weights": {s: [] for s in strategies},
        "selections": {s: [] for s in ["QAOA_XY", "SA"]}, # HRP does not have discrete selections
        "net_returns": {s: [] for s in strategies},
        "turnovers": {s: [] for s in strategies},
        "value_paths": {s: [initial_capital] for s in strategies},

        # Current state variables (updated iteratively)
        "current_weights": {s: np.zeros(N, dtype=np.float64) for s in strategies},
        "current_selections": {s: None for s in ["QAOA_XY", "SA"]},

        # Diagnostic tracking
        "qaoa_diagnostics": [],
        "fallback_log": [],
        "mu_history": [],
        "sigma_history": []
    }

    return state, dicke_vector, h_xy

# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Step 2: Execute the monthly loop exactly as Algorithm 1.
# -------------------------------------------------------------------------------------------------------------------------------

def _execute_walk_forward_loop(cleaned_price_df: pd.DataFrame, rebalance_dates: Tuple[pd.Timestamp, ...],
                               state: Dict[str, Any], dicke_vector: np.ndarray, h_xy: Any,
                               config: Dict[str, Any]) -> pd.Timestamp:
    """
    Executes the core monthly walk-forward backtest loop defined in Algorithm 1.

    This function orchestrates the sequential execution of data extraction, econometric
    estimation, discrete selection, continuous allocation, and performance accounting.
    It strictly enforces causality and updates the isolated state containers iteratively.

    Args:
        cleaned_price_df (pd.DataFrame): The canonical price matrix.
        rebalance_dates (Tuple[pd.Timestamp, ...]): The frozen rebalance schedule.
        state (Dict[str, Any]): The initialized state tracking dictionary.
        dicke_vector (np.ndarray): The canonical initial statevector.
        h_xy (Any): The constraint-preserving XY mixer Hamiltonian.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Returns:
        pd.Timestamp: The final exit date (t^+) used for the last holding period.
    """
    L = config["global_setup"]["lookback_window_L"]
    universe = config["global_setup"]["universe"]
    N = len(universe)

    final_exit_date = None

    for m, t in enumerate(rebalance_dates):
        logger.info(f"=== Executing Rebalance Month {m+1}/12: {t.date()} ===")

        t_plus = rebalance_dates[m+1] if m < len(rebalance_dates) - 1 else None

        # 1. Data Extraction & Econometrics
        window_df = extract_lookback_window(cleaned_price_df, t, L, universe)
        returns_df = compute_daily_log_returns(window_df, config)
        mu_ann = estimate_mu_annualized(returns_df, config)
        sigma_ann = estimate_sigma_annualized(returns_df, config)

        state["mu_history"].append(mu_ann)
        state["sigma_history"].append(sigma_ann)

        # =========================================================================================
        # NOTE ON CONTINUITY STATE AND FALLBACKS:
        # The continuity indicator (s_prev) is derived from the prior month's discrete selection.
        # If a solver (e.g., QAOA) failed in month t-1 and fell back to HRP, its selection
        # state is None. Consequently, s_prev for month t will correctly initialize to a
        # zero vector, meaning no continuity bonus is applied. This strictly adheres to the
        # manuscript's logic: you cannot receive a bonus for holding an asset if the discrete
        # solver failed to select a valid subset in the prior period.
        # =========================================================================================

        # 2. Continuity State Updates
        s_prev_sa = update_continuity_state(state["current_selections"]["SA"], config, "SA")
        s_prev_qaoa = update_continuity_state(state["current_selections"]["QAOA_XY"], config, "QAOA_XY")

        # 3. Simulated Annealing (SA) Selection
        qubo = build_qubo_matrix(mu_ann, sigma_ann, config, s_prev_sa)
        sa_samples = run_sa_sampling(qubo, N, config)
        x_sa = filter_sa_samples(sa_samples, config)
        state["current_selections"]["SA"] = x_sa
        if x_sa is not None:
            state["selections"]["SA"].append(x_sa)

        # 4. QAOA-XY Selection
        h_c = build_cost_hamiltonian(mu_ann, sigma_ann, config, s_prev_qaoa)
        x_qaoa, qaoa_diags = qaoa_select_best_across_depths(dicke_vector, h_c, h_xy, mu_ann, sigma_ann, config)
        state["current_selections"]["QAOA_XY"] = x_qaoa
        state["qaoa_diagnostics"].append(qaoa_diags)
        if x_qaoa is not None:
            state["selections"]["QAOA_XY"].append(x_qaoa)

        # 5. Allocation & Accounting for each strategy
        for strategy in ["QAOA_XY", "SA", "HRP"]:
            if strategy == "HRP":
                w_t = compute_hrp_weights(returns_df, config)
                fallback_used = False
            else:
                x_t = state["current_selections"][strategy]
                w_t, fallback_used = dispatch_allocation(x_t, mu_ann, sigma_ann, returns_df, config, strategy, t)
                if fallback_used:
                    state["fallback_log"].append({"month": t.date().isoformat(), "strategy": strategy})

            state["weights"][strategy].append(w_t)

            w_t_minus_1 = state["current_weights"][strategy]
            gross_ret, asset_rets, turnover = compute_holding_returns_and_turnover(cleaned_price_df, t, t_plus, w_t, w_t_minus_1, config)

            v_t_minus_1 = state["value_paths"][strategy][-1]
            net_ret, v_t = compute_net_return_and_value(gross_return=gross_ret, turnover=turnover, v_t_minus_1=v_t_minus_1, config=config, strategy_name=strategy)

            state["current_weights"][strategy] = w_t
            state["net_returns"][strategy].append(net_ret)
            state["turnovers"][strategy].append(turnover)
            state["value_paths"][strategy].append(v_t)

        if t_plus is None:
            target_tz = cleaned_price_df.index.tz
            next_year_start = pd.Timestamp(year=config["global_setup"]["backtest_year"] + 1, month=1, day=1)
            next_year_start = _align_timestamp_timezone(next_year_start, target_tz)
            valid_exit_dates = cleaned_price_df.index[cleaned_price_df.index >= next_year_start]
            final_exit_date = valid_exit_dates[0]

    return final_exit_date

# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Step 3: Finalize: compute metrics, persist artifacts, and return full results bundle.
# -------------------------------------------------------------------------------------------------------------------------------

def _finalize_and_persist(state: Dict[str, Any], cleaned_price_df: pd.DataFrame, rebalance_dates: Tuple[pd.Timestamp, ...],
                          final_exit_date: pd.Timestamp, config: Dict[str, Any], output_dir: str) -> Dict[str, Any]:
    """
    Finalizes the backtest by computing aggregate metrics and persisting all artifacts.

    This function packages the raw chronological lists into numpy arrays, invokes the
    metrics calculator (Task 24) to generate the final performance tables, and calls
    the persistence layer (Task 25) to freeze the experiment's state to disk.

    Args:
        state (Dict[str, Any]): The populated state tracking dictionary.
        cleaned_price_df (pd.DataFrame): The canonical price matrix.
        rebalance_dates (Tuple[pd.Timestamp, ...]): The frozen rebalance schedule.
        final_exit_date (pd.Timestamp): The final date for the value path.
        config (Dict[str, Any]): The validated master study configuration dictionary.
        output_dir (str): The target directory for artifact persistence.

    Returns:
        Dict[str, Any]: The comprehensive results bundle.
    """
    # Package time-series data for the metrics calculator
    strategy_time_series = {}
    for strategy in ["QAOA_XY", "SA", "HRP"]:
        strategy_time_series[strategy] = {
            "net_returns": np.array(state["net_returns"][strategy]),
            "value_path": np.array(state["value_paths"][strategy]),
            "turnovers": np.array(state["turnovers"][strategy])
        }

    # Compute final metrics and depth-scaling tables
    final_results = compute_all_metrics(strategy_time_series, state["qaoa_diagnostics"], config)

    # Persist all artifacts to disk
    try:
        written_files = persist_artifacts(
            base_dir_str=output_dir,
            cleaned_price_df=cleaned_price_df,
            rebalance_dates=rebalance_dates,
            exit_date=final_exit_date,
            mu_history=state["mu_history"],
            sigma_history=state["sigma_history"],
            selections=state["selections"],
            weights=state["weights"],
            qaoa_diagnostics=state["qaoa_diagnostics"],
            fallback_log=state["fallback_log"],
            strategy_time_series=strategy_time_series,
            final_results=final_results,
            config=config
        )
        final_results["persisted_files"] = written_files
    except Exception as e:
        logger.error(f"Artifact persistence failed, but returning in-memory results. Error: {e}")
        final_results["persisted_files"] = []

    # Attach raw state for downstream programmatic access
    final_results["raw_state"] = state

    return final_results

# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_full_backtest(cleaned_price_df: pd.DataFrame, rebalance_dates: Tuple[pd.Timestamp, ...],
                      config: Dict[str, Any], output_dir: str = "./backtest_results") -> Dict[str, Any]:
    """
    Orchestrates the end-to-end execution of the hybrid quantum-classical backtest.

    This is the master execution function. It initializes the simulation environment,
    executes the rigorous monthly walk-forward loop defined in Algorithm 1, computes
    the final institutional-grade performance metrics, and serializes the entire
    experimental state to disk for absolute reproducibility.

    Args:
        cleaned_price_df (pd.DataFrame): The canonical, cleansed price matrix.
        rebalance_dates (Tuple[pd.Timestamp, ...]): The frozen, causally validated rebalance schedule.
        config (Dict[str, Any]): The validated master study configuration dictionary.
        output_dir (str): The target directory for artifact persistence.

    Returns:
        Dict[str, Any]: The comprehensive results bundle containing metrics, diagnostics, and raw state.

    Raises:
        RuntimeError: If any critical failure occurs during the walk-forward loop.
    """
    logger.info("=================================================================")
    logger.info("COMMENCING END-TO-END HYBRID QUANTUM-CLASSICAL BACKTEST")
    logger.info("=================================================================")

    try:
        # Execute Step 1: Initialize state, seeds, and quantum invariants
        state, dicke_vector, h_xy = _initialize_backtest_state(config)

        # Execute Step 2: Run the monthly walk-forward loop
        final_exit_date = _execute_walk_forward_loop(
            cleaned_price_df, rebalance_dates, state, dicke_vector, h_xy, config
        )

        # Execute Step 3: Finalize metrics and persist artifacts
        results_bundle = _finalize_and_persist(
            state, cleaned_price_df, rebalance_dates, final_exit_date, config, output_dir
        )

        logger.info("=================================================================")
        logger.info("BACKTEST COMPLETED SUCCESSFULLY")
        logger.info("=================================================================")

        return results_bundle

    except Exception as e:
        logger.error(f"FATAL ERROR: Backtest aborted due to unrecoverable exception: {e}")
        raise RuntimeError(f"End-to-end backtest failed: {e}") from e


In [None]:
# Task 27 — Execute final fidelity assertions (run_fidelity_assertions)

# ==============================================================================
# Task 27: Execute final fidelity assertions
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 27, Step 1: Feasibility assertions: selections are exactly K-hot; allocations satisfy constraints.
# -------------------------------------------------------------------------------------------------------------------------------

def _assert_feasibility_invariants(raw_state: Dict[str, Any], config: Dict[str, Any]) -> None:
    """
    Mathematically asserts the feasibility invariants for all selections and allocations.

    This function iterates through the entire chronological state of the backtest. It
    verifies that the discrete solvers strictly obeyed the K-hot cardinality constraint,
    and that the continuous allocators strictly obeyed the sum-to-one and box constraints
    (within a microscopic floating-point tolerance).

    Args:
        raw_state (Dict[str, Any]): The raw state tracking dictionary from the orchestrator.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Raises:
        AssertionError: If any selection or weight vector violates the mathematical constraints.
    """
    K = config["objective_function"]["cardinality_K"]
    w_min, w_max = config["capital_and_bounds"]["allocation_bounds"]
    tol = 1e-8

    # 1. Assert QAOA and SA Selections are exactly K-hot
    for strategy in ["QAOA_XY", "SA"]:
        selections = raw_state["selections"][strategy]
        for m, x_t in enumerate(selections):
            if x_t is None:
                continue # Handled by fallback logic, but if present, must be valid

            actual_k = np.sum(x_t)
            if actual_k != K:
                raise AssertionError(f"Feasibility Violation: {strategy} selection at month {m+1} "
                                     f"has Hamming weight {actual_k}, expected {K}.")

    # 2. Assert Allocation Weights satisfy constraints
    for strategy in ["QAOA_XY", "SA", "HRP"]:
        weights = raw_state["weights"][strategy]
        for m, w_t in enumerate(weights):
            # Assert Sum-to-One
            weight_sum = np.sum(w_t)
            if not np.isclose(weight_sum, 1.0, atol=tol):
                raise AssertionError(f"Feasibility Violation: {strategy} weights at month {m+1} "
                                     f"sum to {weight_sum:.8f}, expected 1.0.")

            # Assert Non-negativity
            if np.any(w_t < -tol):
                raise AssertionError(f"Feasibility Violation: {strategy} weights at month {m+1} "
                                     f"contain negative values.")

            # Strategy-specific constraints (QAOA and SA only)
            if strategy in ["QAOA_XY", "SA"]:
                # Assert exactly K non-zero weights
                non_zeros = np.count_nonzero(w_t > tol)
                if non_zeros != K:
                    raise AssertionError(f"Feasibility Violation: {strategy} weights at month {m+1} "
                                         f"have {non_zeros} active assets, expected {K}.")

                # Assert Box Constraints for active assets
                active_weights = w_t[w_t > tol]
                if np.any(active_weights < w_min - tol) or np.any(active_weights > w_max + tol):
                    raise AssertionError(f"Feasibility Violation: {strategy} weights at month {m+1} "
                                         f"violate box bounds [{w_min}, {w_max}].")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 27, Step 2: Causality assertions: estimation windows end strictly before rebalance timestamps.
# -------------------------------------------------------------------------------------------------------------------------------

def _assert_causality_invariants(cleaned_price_df: pd.DataFrame, rebalance_dates: Tuple[pd.Timestamp, ...],
                                 config: Dict[str, Any]) -> None:
    """
    Mathematically asserts the strict causality of the walk-forward methodology.

    This function verifies that for every rebalance date, the required historical
    lookback window exists entirely and strictly prior to the rebalance timestamp,
    guaranteeing zero look-ahead bias in the econometric estimation phase.

    Args:
        cleaned_price_df (pd.DataFrame): The canonical price matrix.
        rebalance_dates (Tuple[pd.Timestamp, ...]): The frozen rebalance schedule.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Raises:
        AssertionError: If causality is violated or the schedule is not monotonic.
    """
    L = config["global_setup"]["lookback_window_L"]
    price_index = cleaned_price_df.index
    target_tz = price_index.tz

    # Assert monotonicity of the rebalance schedule
    for i in range(1, len(rebalance_dates)):
        if rebalance_dates[i] <= rebalance_dates[i-1]:
            raise AssertionError("Causality Violation: Rebalance dates are not strictly monotonically increasing.")

    # Assert strict causal boundary for each window
    for m, t in enumerate(rebalance_dates):
        t_aligned = t.tz_convert(target_tz) if t.tz is not None and target_tz is not None else (t.tz_localize(target_tz) if target_tz is not None else t.tz_localize(None))

        # The mask used in Task 5: tau < t
        causal_mask = price_index < t_aligned
        causal_history = price_index[causal_mask]

        if len(causal_history) < L:
            raise AssertionError(f"Causality Violation: Month {m+1} ({t.date()}) has only {len(causal_history)} "
                                 f"prior trading days, expected at least {L}.")

        # The last timestamp in the causal history MUST be strictly less than t
        window_end = causal_history[-1]
        if window_end >= t_aligned:
            raise AssertionError(f"FATAL Causality Violation: Window end {window_end} is >= rebalance date {t_aligned}.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 27, Step 3: Determinism and completeness assertions: exactly 12 records, correct initial capital, complete metrics.
# -------------------------------------------------------------------------------------------------------------------------------

def _assert_completeness_invariants(results_bundle: Dict[str, Any], config: Dict[str, Any]) -> None:
    """
    Asserts the structural completeness and determinism of the final results bundle.

    This function verifies that the backtest produced exactly the expected number of
    monthly records, that the value paths originated from the correct initial capital,
    and that the final metrics and depth-scaling tables conform exactly to the schemas
    required for manuscript reproduction.

    Args:
        results_bundle (Dict[str, Any]): The comprehensive results dictionary from Task 26.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Raises:
        AssertionError: If any structural or completeness invariant is violated.
    """
    raw_state = results_bundle["raw_state"]
    metrics = results_bundle["financial_metrics"]
    depth_table = results_bundle.get("depth_scaling_table", [])

    initial_capital = config["capital_and_bounds"]["initial_capital"]
    expected_months = 12

    # 1. Assert Time-Series Completeness and Initial Capital
    for strategy in ["QAOA_XY", "SA", "HRP"]:
        net_returns = raw_state["net_returns"][strategy]
        value_path = raw_state["value_paths"][strategy]

        if len(net_returns) != expected_months:
            raise AssertionError(f"Completeness Violation: {strategy} has {len(net_returns)} return records, expected {expected_months}.")

        if len(value_path) != expected_months + 1:
            raise AssertionError(f"Completeness Violation: {strategy} has {len(value_path)} value path records, expected {expected_months + 1}.")

        if not math.isclose(value_path[0], initial_capital, abs_tol=1e-4):
            raise AssertionError(f"Completeness Violation: {strategy} initial capital is {value_path[0]}, expected {initial_capital}.")

    # 2. Assert Metrics Table Schema
    expected_metrics = ["Total Return", "Annualized Volatility", "Sharpe Ratio", "Max Drawdown", "Average Monthly Turnover"]
    for strategy in ["QAOA_XY", "SA", "HRP"]:
        if strategy not in metrics:
            raise AssertionError(f"Completeness Violation: Metrics missing for strategy {strategy}.")
        for metric in expected_metrics:
            if metric not in metrics[strategy]:
                raise AssertionError(f"Completeness Violation: Metric '{metric}' missing for strategy {strategy}.")

    # 3. Assert Depth-Scaling Table Schema
    if len(depth_table) != 6:
        raise AssertionError(f"Completeness Violation: Depth scaling table has {len(depth_table)} rows, expected exactly 6.")

    expected_depth_cols = ["Depth (p)", "Final Cost (Mean)", "Iter. to Conv. (Mean)", "Gradient Norm (Mean)", "Months Failed"]
    for row in depth_table:
        for col in expected_depth_cols:
            if col not in row:
                raise AssertionError(f"Completeness Violation: Depth table missing column '{col}'.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 27, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_fidelity_assertions(results_bundle: Dict[str, Any], cleaned_price_df: pd.DataFrame,
                            rebalance_dates: Tuple[pd.Timestamp, ...], config: Dict[str, Any]) -> None:
    """
    Orchestrates the final, rigorous fidelity assertions on the completed backtest.

    This function acts as the ultimate gatekeeper for institutional research validity.
    It mathematically proves that the entire walk-forward execution strictly obeyed
    the K-hot feasibility constraints, maintained absolute causality (zero look-ahead bias),
    and produced a structurally complete set of artifacts ready for publication.

    Args:
        results_bundle (Dict[str, Any]): The comprehensive results dictionary from Task 26.
        cleaned_price_df (pd.DataFrame): The canonical price matrix.
        rebalance_dates (Tuple[pd.Timestamp, ...]): The frozen rebalance schedule.
        config (Dict[str, Any]): The validated master study configuration dictionary.

    Raises:
        RuntimeError: If any fidelity assertion fails, invalidating the study.
    """
    logger.info("Commencing final institutional fidelity assertions.")

    try:
        # Execute Step 1: Assert Feasibility Invariants (K-hot, Bounds, Sum-to-One)
        _assert_feasibility_invariants(results_bundle["raw_state"], config)
        logger.info("Feasibility assertions PASSED.")

        # Execute Step 2: Assert Causality Invariants (Zero Look-Ahead Bias)
        _assert_causality_invariants(cleaned_price_df, rebalance_dates, config)
        logger.info("Causality assertions PASSED.")

        # Execute Step 3: Assert Completeness and Determinism Invariants
        _assert_completeness_invariants(results_bundle, config)
        logger.info("Completeness assertions PASSED.")

        logger.info("=================================================================")
        logger.info("ALL FIDELITY VERIFICATIONS PASSED. RESULTS ARE PUBLICATION-GRADE.")
        logger.info("=================================================================")

    except AssertionError as e:
        logger.error(f"FIDELITY VERIFICATION FAILED: {e}")
        raise RuntimeError(f"The backtest results are invalid due to a fidelity violation: {e}") from e


In [None]:
# Top-Level Orchestrator Callables

# ==============================================================================
# Top-Level Orchestrator: Execute Quantum-Hybrid Portfolio Optimization
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Top-Level Step 1: Execute Phase 1 (Data Engineering & Validation - Tasks 1 to 4)
# -------------------------------------------------------------------------------------------------------------------------------

def _execute_data_engineering_phase(raw_price_df: pd.DataFrame, raw_config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Executes the rigorous data engineering and validation phase (Tasks 1-4).

    This function acts as the first major checkpoint in the pipeline. It strictly
    validates the configuration schema, verifies the integrity of the raw price data,
    applies the deterministic missing-data policy, and constructs the causal rebalance
    calendar. It guarantees that downstream solvers only operate on pristine data.

    Args:
        raw_price_df (pd.DataFrame): The raw, unvalidated time-series price matrix.
        raw_config (Dict[str, Any]): The raw, unvalidated master study configuration.

    Returns:
        Dict[str, Any]: A dictionary containing the validated configuration, the data
                        validation report, the cleansed price matrix, and the frozen schedule.

    Raises:
        ValueError: If any configuration, structural, or numerical invariant is violated.
        TypeError: If data types do not match the required schema.
    """
    logger.info("--- PHASE 1: DATA ENGINEERING & VALIDATION ---")

    # Task 1: Validate the master configuration dictionary
    validated_config = validate_master_config(raw_config)
    logger.info("Task 1 Complete: Master configuration validated.")

    # Task 2: Validate the raw price DataFrame schema and integrity
    data_validation_report = validate_raw_price_df(raw_price_df, validated_config)
    logger.info("Task 2 Complete: Raw price data integrity verified.")

    # Task 3: Cleanse and normalize the price data
    cleaned_price_df = cleanse_price_data(raw_price_df, validated_config)
    logger.info("Task 3 Complete: Price data cleansed and normalized.")

    # Task 4: Construct the deterministic, causal rebalance calendar
    rebalance_dates = build_rebalance_calendar(cleaned_price_df, validated_config)
    logger.info("Task 4 Complete: Causal rebalance calendar frozen.")

    return {
        "validated_config": validated_config,
        "data_validation_report": data_validation_report,
        "cleaned_price_df": cleaned_price_df,
        "rebalance_dates": rebalance_dates
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Top-Level Step 2: Execute Phase 2 (Backtest Execution & Fidelity Verification - Tasks 26 & 27)
# -------------------------------------------------------------------------------------------------------------------------------

def _execute_optimization_and_verification_phase(cleaned_price_df: pd.DataFrame,
                                                 rebalance_dates: tuple,
                                                 validated_config: Dict[str, Any],
                                                 output_dir: str) -> Dict[str, Any]:
    """
    Executes the hybrid quantum-classical backtest and mathematically verifies the results (Tasks 26-27).

    This function drives the core computational engine. It invokes the end-to-end walk-forward
    loop (Algorithm 1), which handles the QAOA-XY and SA optimizations, continuous weight
    allocations, and performance accounting. Immediately upon completion, it subjects the
    results to strict institutional fidelity assertions to prove constraint adherence and causality.

    Args:
        cleaned_price_df (pd.DataFrame): The canonical, cleansed price matrix.
        rebalance_dates (tuple): The frozen, causally validated rebalance schedule.
        validated_config (Dict[str, Any]): The validated master study configuration.
        output_dir (str): The target directory for artifact persistence.

    Returns:
        Dict[str, Any]: A dictionary containing the comprehensive results bundle and fidelity status.

    Raises:
        RuntimeError: If the backtest fails or if any fidelity assertion is violated.
    """
    logger.info("--- PHASE 2: HYBRID OPTIMIZATION & FIDELITY VERIFICATION ---")

    # Task 26: Execute the end-to-end walk-forward backtest and persist artifacts
    results_bundle = run_full_backtest(
        cleaned_price_df=cleaned_price_df,
        rebalance_dates=rebalance_dates,
        config=validated_config,
        output_dir=output_dir
    )
    logger.info("Task 26 Complete: End-to-end backtest executed and artifacts persisted.")

    # Task 27: Execute final institutional fidelity assertions
    run_fidelity_assertions(
        results_bundle=results_bundle,
        cleaned_price_df=cleaned_price_df,
        rebalance_dates=rebalance_dates,
        config=validated_config
    )
    logger.info("Task 27 Complete: All fidelity assertions passed. Results are mathematically sound.")

    return {
        "results_bundle": results_bundle,
        "fidelity_verified": True
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Top-Level Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def execute_quantum_hybrid_portfolio_optimization(raw_price_df: pd.DataFrame,
                                                  raw_config: Dict[str, Any],
                                                  output_dir: str = "./quantum_portfolio_artifacts") -> Dict[str, Any]:
    """
    The master entry point for the "Constrained Portfolio Optimization via QAOA-XY" study.

    This elite, implementation-grade orchestrator encapsulates the entire 27-task pipeline.
    It sequentially drives the data engineering, quantum circuit simulation, classical convex
    optimization, performance accounting, artifact persistence, and mathematical fidelity
    verification. It guarantees that the final output is a publication-ready, institutionally
    auditable representation of the hybrid quantum-classical methodology.

    Args:
        raw_price_df (pd.DataFrame): The raw, unvalidated time-series price matrix fetched from the data provider.
        raw_config (Dict[str, Any]): The raw, unvalidated master study configuration dictionary.
        output_dir (str, optional): The root directory where all serialized artifacts will be saved.
                                    Defaults to "./quantum_portfolio_artifacts".

    Returns:
        Dict[str, Any]: A comprehensive dictionary containing all generated artifacts, including:
            - 'validated_config': The strictly typed configuration.
            - 'data_validation_report': Telemetry from the data cleansing phase.
            - 'cleaned_price_df': The canonical price matrix used for the study.
            - 'rebalance_dates': The frozen temporal schedule.
            - 'results_bundle': The final metrics, time-series, and quantum diagnostics.
            - 'fidelity_verified': Boolean confirming all mathematical constraints were upheld.

    Raises:
        ValueError: If data or configuration invariants are violated during Phase 1.
        TypeError: If data types are incorrect during Phase 1.
        RuntimeError: If the optimization loop fails or fidelity assertions are violated in Phase 2.
    """
    logger.info("===============================================================================")
    logger.info("INITIATING QUANTUM-ACCELERATED CONSTRAINED PORTFOLIO OPTIMIZATION PIPELINE")
    logger.info("===============================================================================")

    try:
        # Execute Top-Level Step 1: Data Engineering & Validation (Tasks 1-4)
        phase_1_artifacts = _execute_data_engineering_phase(raw_price_df, raw_config)

        # Extract the canonical artifacts required for Phase 2
        cleaned_price_df = phase_1_artifacts["cleaned_price_df"]
        rebalance_dates = phase_1_artifacts["rebalance_dates"]
        validated_config = phase_1_artifacts["validated_config"]

        # Execute Top-Level Step 2: Backtest Execution & Fidelity Verification (Tasks 26-27)
        phase_2_artifacts = _execute_optimization_and_verification_phase(
            cleaned_price_df=cleaned_price_df,
            rebalance_dates=rebalance_dates,
            validated_config=validated_config,
            output_dir=output_dir
        )

        # Compile the final comprehensive output dictionary
        final_output = {
            **phase_1_artifacts,
            **phase_2_artifacts
        }

        logger.info("===============================================================================")
        logger.info("PIPELINE EXECUTION COMPLETE. ALL ARTIFACTS GENERATED AND VERIFIED.")
        logger.info("===============================================================================")

        return final_output

    except Exception as e:
        # Catch any exception across the entire pipeline, log it as a critical failure, and re-raise
        logger.critical(f"PIPELINE ABORTED DUE TO FATAL ERROR: {e}")
        raise RuntimeError(f"Quantum-Hybrid Portfolio Optimization Pipeline failed: {e}") from e
