### **`README.md`**

# STRAPSim: A Portfolio Similarity Metric for ETF Alignment and Portfolio Trades

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/downloads/)
[![arXiv](https://img.shields.io/badge/arXiv-2509.24151-b31b1b.svg)](https://arxiv.org/abs/2509.24151)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/strapsim_portfolio_similarity_metric)
[![Discipline](https://img.shields.io/badge/Discipline-Quantitative%20Finance-00529B)](https://github.com/chirindaopensource/strapsim_portfolio_similarity_metric)
[![Primary Data](https://img.shields.io/badge/Data-Corporate%20Bond%20ETF%20Holdings-lightgrey)](https://github.com/chirindaopensource/strapsim_portfolio_similarity_metric)
[![Ground Truth](https://img.shields.io/badge/Ground%20Truth-Monthly%20Total%20Returns-lightgrey)](https://github.com/chirindaopensource/strapsim_portfolio_similarity_metric)
[![Core Algorithm](https://img.shields.io/badge/Algorithm-STRAPSim-orange)](https://github.com/chirindaopensource/strapsim_portfolio_similarity_metric)
[![Similarity Method](https://img.shields.io/badge/Similarity-Random%20Forest%20Proximity-red)](https://github.com/chirindaopensource/strapsim_portfolio_similarity_metric)
[![Evaluation Metric](https://img.shields.io/badge/Evaluation-Spearman%20Rank%20Correlation-yellow)](https://github.com/chirindaopensource/strapsim_portfolio_similarity_metric)
[![Baselines](https://img.shields.io/badge/Baselines-Jaccard%20%7C%20BERTScore-blueviolet)](https://github.com/chirindaopensource/strapsim_portfolio_similarity_metric)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Scikit-learn](https://img.shields.io/badge/scikit--learn-%23F7931E.svg?style=flat&logo=scikit-learn&logoColor=white)](https://scikit-learn.org/)
[![Numba](https://img.shields.io/badge/Numba-00A3E0.svg?style=flat&logo=Numba&logoColor=white)](https://numba.pydata.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
--

**Repository:** `https://github.com/chirindaopensource/strapsim_portfolio_similarity_metric`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"STRAPSim: A Portfolio Similarity Metric for ETF Alignment and Portfolio Trades"** by:

*   Mingshu Li
*   Dhruv Desai
*   Jerinsh Jeyapaulraj
*   Philip Sommer
*   Riya Jain
*   Peter Chu
*   Dhagash Mehta

The project provides a complete, end-to-end computational framework for replicating the paper's novel portfolio similarity metric, STRAPSim. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from rigorous data validation and feature engineering, through the training of a supervised similarity model and the implementation of the core STRAPSim algorithm, to the final statistical evaluation and a comprehensive suite of robustness checks.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callables](#key-callables)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the methodologies presented in the 2025 paper "STRAPSim: A Portfolio Similarity Metric for ETF Alignment and Portfolio Trades." The core of this repository is the iPython Notebook `strapsim_portfolio_similarity_metric_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings, from initial data validation to the final generation of the summary evaluation table (Table 4) and a full suite of sensitivity analyses.

The paper introduces **STRAPSim (Semantic, Two-level, Residual-Aware Portfolio Similarity)**, a novel method for measuring the similarity between structured asset baskets like ETFs. It addresses the key limitations of traditional metrics by incorporating learned, constituent-level similarity and a dynamic, weight-aware matching process. This codebase operationalizes the paper's advanced approach, allowing users to:
-   Rigorously validate and cleanse institutional-grade fixed-income portfolio data.
-   Train a Random Forest model to learn a "semantic" similarity metric between individual bonds based on their financial characteristics.
-   Generate a pairwise proximity matrix for all bonds in the universe.
-   Compute the STRAPSim score between any two portfolios using the novel greedy, residual-aware matching algorithm.
-   Benchmark STRAPSim's performance against standard metrics (Jaccard, Weighted Jaccard) and an adapted BERTScore.
-   Evaluate all metrics against a market-based ground truth (return correlation) using Spearman rank correlation.
-   Systematically test the stability of the findings across a wide array of robustness checks.

## Theoretical Background

The implemented methods are grounded in machine learning, algorithm design, and quantitative finance.

**1. Supervised Similarity Learning:**
The foundation of the method is a constituent-level similarity score, $S_{ij}$, between any two bonds, $i$ and $j$. Instead of relying on simple distance metrics, the paper learns this similarity in a supervised fashion. A Random Forest model is trained to predict key financial characteristics of bonds (OAS and Yield) from their features (issuer, maturity, rating, etc.). The "proximity" between two bonds is then defined as the fraction of trees in the forest where they fall into the same terminal leaf node. This creates a nuanced, non-linear similarity measure that is tailored to the financial properties of the assets.
$$
\text{Proximity}(x_i, x_j) = \frac{1}{T} \sum_{t=1}^{T} I[\text{Leaf}_t(x_i) = \text{Leaf}_t(x_j)]
$$

**2. The STRAPSim Algorithm:**
STRAPSim is a greedy, bipartite matching algorithm that computes the similarity between two portfolios, X and Y, defined by their constituents and weights. It iteratively matches the pair of constituents $(x_i, y_j)$ with the highest remaining similarity score $S_{ij}$. The key innovation is its **residual-aware** dynamic: after each match, the weights of the involved constituents are decremented by the amount of weight transferred (the minimum of the two). This prevents a single, high-weight constituent from being "re-used" in multiple matches and ensures the algorithm correctly models the one-to-one nature of portfolio alignment.
$$
\text{STRAPSim}(x, y) = \sum_{i,j=\text{argsort}(S)} S_{ij} \min(w_x^{(t)}(i), w_y^{(t)}(j))
$$
where weights $w^{(t)}$ are updated at each step $t$.

**3. Evaluation via Rank Correlation:**
The performance of STRAPSim and the baseline metrics is evaluated by comparing their ability to rank portfolio pairs in a way that aligns with a market-based measure of similarity. The paper uses the historical monthly return correlation between ETFs as this "ground truth." The primary evaluation metric is the **Spearman Rank Correlation Coefficient ($\rho_s$)**, which measures the monotonic relationship between the ranking produced by a similarity metric and the ranking produced by the return correlations. A higher $\rho_s$ indicates a better alignment with market behavior.

## Features

The provided iPython Notebook (`strapsim_portfolio_similarity_metric_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Phase Architecture:** The entire pipeline is broken down into 28 distinct, modular tasks, each with its own orchestrator function, covering validation, cleansing, feature engineering, modeling, computation, evaluation, and robustness testing.
-   **Configuration-Driven Design:** All methodological and computational parameters are managed in an external `config.yaml` file, allowing for easy customization and scenario testing without code changes.
-   **Advanced Similarity Learning:** A complete pipeline for training a multi-output Random Forest Regressor, including cross-validated hyperparameter tuning, to serve as the engine for the similarity metric.
-   **Optimized Algorithm Implementation:** A high-performance, memory-efficient implementation of the core STRAPSim algorithm and all baseline metrics (Jaccard, Weighted Jaccard, adapted BERTScore with residuals).
-   **Rigorous Evaluation Framework:** A systematic process for transforming similarity and correlation matrices into ranks and computing the Spearman rank correlation and p-values for every method and every reference ETF.
-   **Comprehensive Robustness Suite:** A powerful, extensible set of orchestrators for running a full suite of sensitivity analyses on model hyperparameters, data partitioning, and metric-specific parameters.
-   **Automated Reporting:** Programmatic generation of the final summary table (replicating Table 4 from the paper) and detailed logs for all analysis and robustness check results.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Validation & Cleansing (Tasks 1-8):** Ingests and rigorously validates all raw data (holdings, features, returns) and the `config.yaml` file, performs a deep data quality audit, and standardizes all data into an analysis-ready format.
2.  **Feature Engineering (Tasks 9-11):** Prepares the bond feature data for machine learning, including one-hot encoding of categorical variables and max-scaling of numerical variables, and partitions the data into training and testing sets.
3.  **Model Training & Proximity Generation (Tasks 12-14):** Performs a grid search to find optimal Random Forest hyperparameters, trains the final model, validates its performance against the paper's benchmarks, and uses it to generate the crucial N x N bond proximity matrix.
4.  **Similarity Computation (Tasks 15-21):** Prepares the portfolio data structures and then computes the full N x N similarity matrices for STRAPSim and all baseline methods, as well as the ground-truth return correlation matrix.
5.  **Evaluation & Reporting (Tasks 22-24):** Prepares the ranked data, computes the Spearman rank correlation for all methods, and aggregates the results into the final summary table.
6.  **Robustness Analysis (Tasks 25-28):** Orchestrates the entire suite of sensitivity checks.

## Core Components (Notebook Structure)

The `strapsim_portfolio_similarity_metric_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callables

The project is designed around two primary user-facing interface functions:

1.  **`run_strapsim_pipeline` (or its variant `run_strapsim_pipeline_for_robustness`):** This function executes the core research pipeline from end-to-end, producing the main findings of the study and all the necessary data artifacts (e.g., the trained model, the proximity matrix) required for deeper analysis.

2.  **`run_full_study`:** This is the top-level orchestrator. It first calls the main pipeline function to generate the core results and artifacts, and then passes these artifacts to the robustness analysis suite (`run_robustness_analysis_suite`) to perform a complete validation of the study's conclusions. A single call to this function reproduces the entire project.

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `scipy`, `scikit-learn`, `pyyaml`, `tqdm`, `numba`, `jsonschema`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/strapsim_portfolio_similarity_metric.git
    cd strapsim_portfolio_similarity_metric
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install -r requirements.txt
    ```

## Input Data Structure

The pipeline requires three `pandas.DataFrame`s and a `config.yaml` file. The schemas for these DataFrames are rigorously defined and validated by the pipeline.
-   **ETF Holdings DataFrame:** Contains portfolio composition data, including `etf_id`, `cusip`, and `weight`.
-   **Bond Features DataFrame:** The security master file, containing financial characteristics for every bond, indexed by `cusip`.
-   **Monthly Returns DataFrame:** A time-series DataFrame with a `date` column and one column for each ETF's monthly total returns.

## Usage

The `strapsim_portfolio_similarity_metric_draft.ipynb` notebook provides a complete, step-by-step guide. The primary workflow is to call the top-level orchestrator:

```python
# This single call runs the entire project, including the main analysis
# and all robustness checks.
full_results = run_full_study(
    etf_holdings_df=my_holdings_data,
    bond_features_df=my_features_data,
    monthly_returns_df=my_returns_data,
    config=my_config_dict
)
```

## Output Structure

The `run_full_study` function returns a comprehensive nested dictionary containing all results and artifacts:

```
{
    "main_analysis_artifacts": {
        "config": {...},
        "trained_model": <RandomForestRegressor object>,
        "proximity_matrix_df": <DataFrame>,
        "similarity_matrices": {
            "STRAPSim": <DataFrame>,
            "Jaccard": <DataFrame>,
            ...
        },
        "final_summary_table": <DataFrame>,
        ...
    },
    "robustness_analysis_results": {
        "hyperparameter_sensitivity": <DataFrame>,
        "data_split_sensitivity": <DataFrame>,
        "metric_component_sensitivity": <DataFrame>
    }
}
```

## Project Structure

```
strapsim_portfolio_similarity_metric/
│
├── strapsim_portfolio_similarity_metric_draft.ipynb   # Main implementation notebook
├── config.yaml                                        # Master configuration file
├── requirements.txt                                   # Python package dependencies
├── LICENSE                                            # MIT license file
└── README.md                                          # This documentation file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can easily modify all methodological parameters, such as data paths, feature definitions, model hyperparameters, and evaluation settings, without altering the core Python code.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Alternative Similarity Models:** Implementing other supervised models (e.g., Gradient Boosting, Neural Networks) to generate the constituent-level proximity matrix.
-   **Optimal Transport Baselines:** Adding Wasserstein distance (Optimal Transport) as a more advanced baseline for comparing weighted distributions.
-   **Performance Optimization:** For extremely large universes, the STRAPSim algorithm could be further accelerated using more advanced data structures or a compiled language extension.
-   **Application Modules:** Building specific application modules on top of the STRAPSim score, such as a portfolio recommendation engine or a tool for optimizing portfolio trades.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{li2025strapsim,
  title={{STRAPSim: A Portfolio Similarity Metric for ETF Alignment and Portfolio Trades}},
  author={Li, Mingshu and Desai, Dhruv and Jeyapaulraj, Jerinsh and Sommer, Philip and Jain, Riya and Chu, Peter and Mehta, Dhagash},
  journal={arXiv preprint arXiv:2509.24151},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A Professional-Grade Implementation of the STRAPSim Framework.
GitHub repository: https://github.com/chirindaopensource/strapsim_portfolio_similarity_metric
```

## Acknowledgments

-   Credit to **Mingshu Li, Dhruv Desai, Jerinsh Jeyapaulraj, Philip Sommer, Riya Jain, Peter Chu, and Dhagash Mehta** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, SciPy, Scikit-learn, Numba, and Jupyter**, whose work makes complex computational analysis accessible and robust.

--

*This README was generated based on the structure and content of `strapsim_portfolio_similarity_metric_draft.ipynb` and follows best practices for research software documentation.*

# Paper

Title: "*STRAPSim: A Portfolio Similarity Metric for ETF Alignment and Portfolio Trades*"

Authors: Mingshu Li, Dhruv Desai, Jerinsh Jeyapaulraj, Philip Sommer, Riya Jain, Peter Chu, Dhagash Mehta

E-Journal Submission Date: 29 September 2025

Link: https://arxiv.org/abs/2509.24151

Abstract:

Accurately measuring portfolio similarity is critical for a wide range of financial applications, including Exchange-traded Fund (ETF) recommendation, portfolio trading, and risk alignment. Existing similarity measures often rely on exact asset overlap or static distance metrics, which fail to capture similarities among the constituents (e.g., securities within the portfolio) as well as nuanced relationships between partially overlapping portfolios with heterogeneous weights. We introduce STRAPSim (Semantic, Two-level, Residual-Aware Portfolio Similarity), a novel method that computes portfolio similarity by matching constituents based on semantic similarity, weighting them according to their portfolio share, and aggregating results via residual-aware greedy alignment. We benchmark our approach against Jaccard, weighted Jaccard, as well as BERTScore-inspired variants across public classification, regression, and recommendation tasks, as well as on corporate bond ETF datasets. Empirical results show that our method consistently outperforms baselines in predictive accuracy and ranking alignment, achieving the highest Spearman correlation with return-based similarity. By leveraging constituent-aware matching and dynamic reweighting, portfolio similarity offers a scalable, interpretable framework for comparing structured asset baskets, demonstrating its utility in ETF benchmarking, portfolio construction, and systematic execution.


# Summary

### Summary of STRAPSim: A Portfolio Similarity Metric

#### The Core Problem and Its Financial Context

The paper begins by identifying a critical deficiency in existing methods for measuring the similarity between financial portfolios, such as Exchange-Traded Funds (ETFs) or custom trading baskets. The authors argue that current approaches are inadequate for several reasons:

*   **Set-Based Metrics (e.g., Jaccard Index):** These metrics only account for the exact overlap of assets (constituents). They fail to recognize that two different, but highly similar, corporate bonds (e.g., from the same issuer with close maturities) contribute to portfolio similarity. They also ignore the *weights* of these assets, treating a 5% holding identically to a 0.1% holding.
*   **High-Level Metrics (e.g., Return Correlation, Sector Exposure):** These methods operate on aggregate portfolio characteristics. While useful, they can be noisy, backward-looking (especially return correlation), and often miss the granular, constituent-level drivers of similarity. For illiquid assets like many corporate bonds, reliable return data can be sparse, making correlation an unstable measure.

This problem is particularly acute in the growing field of **portfolio trading**, where an entire basket of securities is executed as a single transaction. The ability to accurately match a custom, illiquid basket to a liquid benchmark ETF is crucial for efficient pricing, hedging, and risk management.

#### Introduction of the Proposed Metric: STRAPSim

To address these shortcomings, the authors propose a new metric named **STRAPSim (Semantic, Two-level, Residual-Aware Portfolio Similarity)**. The methodology is designed to be both granular and flexible. The core algorithm can be broken down as follows:

1.  **Constituent-Level Semantic Similarity:** First, a pairwise similarity score, `S_ij`, is computed for every asset `i` in Portfolio X and every asset `j` in Portfolio Y. This is a "pluggable" component; the authors use a random forest proximity score based on bond characteristics (issuer, maturity, rating, etc.) for their main experiment. This step moves beyond exact matches to capture the *semantic* closeness of non-identical assets.

2.  **Greedy Matching:** The algorithm iteratively finds the pair of unmatched (or partially matched) assets `(i, j)` with the highest similarity score `S_ij`.

3.  **Weight-Aware Aggregation:** For the matched pair, the contribution to the total similarity score is calculated as `S_ij * min(w_x(i), w_y(j))`, where `w_x(i)` and `w_y(j)` are the current available weights of the assets in their respective portfolios. This ensures that the match is limited by the smaller of the two holdings.

4.  **Residual-Aware Weight Update:** This is the most critical innovation. After a match, the algorithm updates the weights of both assets by subtracting the matched amount: `min(w_x(i), w_y(j))`. An asset whose weight is reduced to zero is considered fully "consumed" and is removed from future matching rounds. This prevents a single, highly versatile asset in one portfolio from being repeatedly matched to multiple assets in the other, thereby avoiding overcounting of similarity.

The final STRAPSim score is the sum of all these weighted similarity contributions.

#### Theoretical Distinctions and Benchmarks

The paper astutely positions STRAPSim against other methodologies:

*   **Jaccard & Weighted Jaccard:** STRAPSim is superior as it incorporates semantic similarity for non-identical assets and handles partial overlaps in weights more dynamically.
*   **BERTScore-inspired Metric:** This is a key conceptual benchmark. BERTScore, from Natural Language Processing, also uses greedy matching based on semantic similarity. However, it is **not residual-aware**. It would match each asset in Portfolio X to its best counterpart in Y independently, without "using up" the weight of the matched asset. STRAPSim's residual-aware dynamic prevents this, providing a more accurate, one-to-one mapping analogous to a flow or transport problem.
*   **Optimal Transport (OT):** While conceptually similar to OT in that it matches distributions, STRAPSim uses a computationally efficient and interpretable greedy algorithm rather than solving a global linear program, making it more scalable for this application.

#### Empirical Validation and Experimental Design

The authors validate their metric through a rigorous, two-part empirical study:

1.  **Public "Toy" Datasets:** They first demonstrate the general applicability of STRAPSim on standard machine learning datasets (e.g., Iris, Breast Cancer) for classification and regression tasks. By treating data instances as "portfolios" and features as "constituents," they use the STRAPSim score within a k-Nearest Neighbors (k-NN) framework.
2.  **Corporate Bond ETF Dataset:** This is the core financial experiment. They compare 20 corporate bond ETFs. The crucial step here is defining a "ground truth" for economic similarity. They use **historical monthly return correlation** as this benchmark, based on the sound financial principle that portfolios with similar economic exposures should exhibit highly correlated returns.

The primary evaluation metric is the **Spearman rank correlation** between the similarity scores produced by each metric (STRAPSim, Jaccard, etc.) and the benchmark return correlations. A higher Spearman correlation indicates that the metric is better at ranking portfolio pairs in a way that aligns with their real-world market behavior.

#### Key Results and Findings

The empirical results are compelling and consistent across all tests:

*   On the public datasets, STRAPSim consistently outperforms the baselines in both classification accuracy and regression error, demonstrating its robustness as a general-purpose weighted set similarity metric.
*   In the crucial corporate bond ETF experiment, **STRAPSim achieves the highest Spearman correlation (0.6783) with return-based similarity**. This result was also the most statistically significant (lowest p-value).
*   Visualizations (heatmaps) confirm that the similarity matrix generated by STRAPSim more closely resembles the structure of the return correlation matrix compared to the blockier, less nuanced matrices from Jaccard-based methods.

#### Conclusion and Implications

The paper concludes that STRAPSim provides a superior framework for measuring portfolio similarity due to its unique combination of three features:
1.  **Constituent-aware semantic matching.**
2.  **Weight-sensitive aggregation.**
3.  **Residual-aware dynamics.**

This makes it a highly practical tool for financial applications. For institutional finance, it can directly enhance portfolio trading workflows by enabling more accurate and automated matching of custom baskets to liquid ETFs. This improves pricing, hedging, and overall execution efficiency. The metric's interpretability—the ability to inspect which constituents were matched and which remain as residual—is a significant advantage in high-stakes financial decision-making.

# Import Essential Modules

In [None]:
#!/usr/bin/env python3
# ==============================================================================
#
#  STRAPSim: A Portfolio Similarity Metric for ETF Alignment and Portfolio Trades
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "STRAPSim: A Portfolio Similarity Metric
#  for ETF Alignment and Portfolio Trades" by Li et al. (2025). It delivers a
#  computationally efficient and interpretable system for quantifying the
#  economic and functional substitutability between complex, structured asset
#  baskets, such as Exchange-Traded Funds (ETFs) and bespoke bond portfolios.
#
#  Core Methodological Components:
#  • Supervised learning of constituent-level similarity via Random Forest proximities.
#  • The STRAPSim Algorithm: A novel, greedy bipartite matching approach that is:
#      - Semantic: Matches constituents based on learned similarity, not just identity.
#      - Weight-Aware: Accounts for the economic significance of each holding.
#      - Residual-Aware: Dynamically updates weights to prevent over-counting.
#  • Comprehensive benchmarking against standard (Jaccard, Weighted Jaccard) and
#    advanced (adapted BERTScore) similarity metrics.
#  • Rigorous evaluation framework using Spearman rank correlation against a
#    market-based ground truth (historical return correlation).
#
#  Technical Implementation Features:
#  • End-to-end, modular pipeline from data validation to final results aggregation.
#  • Robust data cleansing and feature engineering for institutional fixed-income data.
#  • Cross-validated hyperparameter optimization for the Random Forest model.
#  • Memory-efficient and computationally optimized algorithm implementations.
#  • A full suite of robustness checks, including sensitivity analyses for
#    hyperparameters, data partitioning, and metric-specific parameters.
#
#  Paper Reference:
#  Li, M., Desai, D., Jeyapaulraj, J., Sommer, P., Jain, R., Chu, P., & Mehta, D.
#  (2025). STRAPSim: A Portfolio Similarity Metric for ETF Alignment and
#  Portfolio Trades. arXiv preprint arXiv:2509.24151.
#  https://arxiv.org/abs/2509.24151
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================

# Standard Library Imports
import copy
import itertools
import logging
import math
import time
from typing import Any, Dict, List, Set, Tuple, Union

# Third-Party Library Imports
# Core data manipulation and numerical computation
import numpy as np
import pandas as pd

# Machine learning and statistical analysis
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import GridSearchCV, train_test_split
from scipy.stats import spearmanr

# Performance and utility
import jsonschema
from numba import njit
from tqdm import tqdm

# Define custom type hints for complex data structures to improve readability
Portfolio = Dict[str, Union[List[str], np.ndarray]]
PortfolioData = Dict[str, Portfolio]
BERTScoreFullResult = Tuple[float, float, float, float, float, float]
STRAPSimResult = Tuple[float, float, float]

# Configure logging for the entire application
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Implementation

## Draft 1


## **Discussion of Inputs, Processes and Outputs of Key Callables**
### **Input Validation and Data Quality Assurance (Tasks 1-5)**

#### **`validate_configuration` (Task 1 Orchestrator)**
*   **Inputs:** The raw `config` dictionary.
*   **Processes:** Orchestrates a three-part validation. It first calls `_validate_config_schema` to check the entire nested structure and data types against a rigorous `jsonschema`. It then calls the `_validate_config_dates` to verify the exact string values, format, and business-day logic of all date parameters. Finally, it calls the `_validate_config_values` to exhaustively check every other parameter's value against the study's hardcoded specifications.
*   **Outputs:** A tuple `(bool, List[str])` indicating pass/fail status and a list of detailed error messages.
*   **Transformation:** This function transforms a potentially invalid configuration dictionary into a boolean validation result.
*   **Role in Research:** This callable serves as the foundational gatekeeper for the entire study. It ensures that the experiment is run with the exact parameters specified by the authors, which is the first and most critical step in achieving a faithful replication. It enforces the methodological constants of the experiment.

#### **`validate_etf_holdings_dataframe` (Task 2 Orchestrator)**
*   **Inputs:** The raw `etf_holdings_df` and the validated `config` dictionary.
*   **Processes:** Orchestrates a three-part validation. It checks the DataFrame's structure (column names, ETF count). It verifies the data type of each column. Finally, it calls the `_validate_holdings_quality` to perform deep content checks, including identifier lengths, numerical ranges, per-ETF weight summation to 1.0, and the crucial CUSIP check digit validation.
*   **Outputs:** A tuple `(bool, List[str])` indicating pass/fail status and a list of errors.
*   **Transformation:** Transforms a raw holdings DataFrame into a boolean validation result.
*   **Role in Research:** This function ensures the integrity of the portfolio composition data. The per-ETF weight summation check is particularly critical, as the entire study is predicated on portfolios being valid, weighted distributions of assets.

#### **`validate_bond_features_dataframe` (Task 3 Orchestrator)**
*   **Inputs:** The raw `bond_features_df` and the validated `config` dictionary.
*   **Processes:** Orchestrates a three-part validation. It checks the structure (column names, unique bond count, CUSIP primary key integrity). It performs a rigorous dtype check, paying special attention to the `bond_rating_composite` to ensure it is an *ordered* categorical type, which is essential for the Random Forest model to potentially leverage this ordinal information. Finally, it validates all numerical ranges and enforces the "no missing values" constraint from the paper.
*   **Outputs:** A tuple `(bool, List[str])` indicating pass/fail status and a list of errors.
*   **Transformation:** Transforms a raw features DataFrame into a boolean validation result.
*   **Role in Research:** This function ensures the quality of the raw data that will be used to train the core machine learning model. The integrity of these features is paramount, as they are the basis for learning the constituent-level similarity metric.

#### **`validate_monthly_returns_dataframe` (Task 4 Orchestrator)**
*   **Inputs:** The raw `monthly_returns_df`, the validated `etf_holdings_df`, and the `config` dictionary.
*   **Processes:** Orchestrates a three-part validation. It checks the structure (ETF column count and names against holdings data, number of observations). It validates the temporal integrity of the 'date' column (chronological, unique, month-end business day). Finally, it calls the `_validate_returns_quality` to check for missing values, decimal format, and the required 4-decimal-place precision.
*   **Outputs:** A tuple `(bool, List[str])` indicating pass/fail status and a list of errors.
*   **Transformation:** Transforms a raw returns DataFrame into a boolean validation result.
*   **Role in Research:** This function ensures the integrity of the "ground truth" data. As the paper states, "we use historical monthly return correlations as a calibration benchmark." Flaws in this data would invalidate the entire evaluation of STRAPSim's performance.

#### **`validate_cross_dataset_consistency` (Task 5 Orchestrator)**
*   **Inputs:** The three raw DataFrames (`etf_holdings_df`, `bond_features_df`, `monthly_returns_df`) and the `config` dictionary.
*   **Processes:** Orchestrates three referential integrity checks. It verifies that every CUSIP in the holdings data exists in the features data. It ensures a perfect one-to-one match between the ETF identifiers in the holdings and returns data. Finally, it confirms that the valuation date of the holdings aligns perfectly with the end date of the return series.
*   **Outputs:** A tuple `(bool, List[str])` indicating pass/fail status and a list of errors.
*   **Transformation:** Transforms three separate DataFrames into a single boolean validation of their mutual consistency.
*   **Role in Research:** This is the final gatekeeper of the validation phase. It ensures that the three disparate sources of data can be safely joined and analyzed as a single, coherent universe, preventing errors from misaligned identifiers or dates.

--

### **Data Cleansing (Tasks 6-8)**

#### **`cleanse_etf_holdings_dataframe` (Task 6 Orchestrator)**
*   **Inputs:** The validated raw `etf_holdings_df` and `config`.
*   **Processes:** Orchestrates a multi-step cleansing pipeline. It standardizes all identifier strings (whitespace, casing). It then aggregates any duplicate holdings for a given CUSIP within a single ETF, summing their weights. Critically, it then re-normalizes the weights for every ETF to ensure they sum exactly to 1.0 after the aggregation.
*   **Outputs:** A cleansed `pandas.DataFrame` with standardized identifiers, unique holdings per ETF, and perfectly normalized weights.
*   **Transformation:** Transforms a validated but potentially messy DataFrame into a canonical, analytically pure representation.
*   **Role in Research:** This function prepares the portfolio data for the similarity algorithms, ensuring that each portfolio is a unique, valid probability distribution over its constituents.

#### **`cleanse_bond_features_dataframe` (Task 7 Orchestrator)**
*   **Inputs:** The validated raw `bond_features_df` and `config`.
*   **Processes:** Standardizes all categorical text fields. It then applies specific, rule-based corrections to numerical data (e.g., setting minimum `age_days` to 1, imputing invalid `coupon_frequency` with the mode). Finally, it performs a hard validation to ensure there are no missing values in the target variable columns, raising an error if any are found, as per the paper's claim: "As the dataset contained no missing values, no imputation was required."
*   **Outputs:** A cleansed `pandas.DataFrame` with standardized features and validated targets.
*   **Transformation:** Transforms a raw features DataFrame into a clean, canonical format ready for feature engineering.
*   **Role in Research:** This function prepares the predictor and target variables for the machine learning model, ensuring consistency and adherence to logical business rules.

#### **`cleanse_monthly_returns_dataframe` (Task 8 Orchestrator)**
*   **Inputs:** The validated raw `monthly_returns_df` and `config`.
*   **Processes:** Standardizes the time-series structure by setting a clean, sorted `DatetimeIndex`. It enforces the "no missing values" policy by raising an error if any are found. Finally, it formats the numerical return values by rounding them to the specified 4-decimal-place precision.
*   **Outputs:** A cleansed `pandas.DataFrame` with a `DatetimeIndex` and consistently formatted return values.
*   **Transformation:** Transforms a raw returns DataFrame into a clean, properly indexed time-series object.
*   **Role in Research:** This function prepares the ground-truth data for the final correlation calculation, ensuring it is in the optimal format for econometric analysis.

--

### **Feature Engineering and Model Training (Tasks 9-14)**

#### **`prepare_categorical_features` (Task 9 Orchestrator)**
*   **Inputs:** The cleansed `bond_features_df` and `config`.
*   **Processes:** Identifies the categorical columns to be encoded based on the configuration. It analyzes and reports their cardinality. It then applies one-hot encoding using `pandas.get_dummies` to transform these columns into a wide matrix of binary numerical features.
*   **Outputs:** A `pandas.DataFrame` containing only the new one-hot encoded features, indexed by CUSIP.
*   **Transformation:** Transforms a set of categorical columns into a high-dimensional numerical representation.
*   **Role in Research:** This function implements the feature engineering step for categorical variables described in Section 3.2: "Categorical features... were transformed using one-hot encoding."

#### **`prepare_numerical_features` (Task 10 Orchestrator)**
*   **Inputs:** The cleansed `bond_features_df`, `config`, and an optional `scaling_params` Series.
*   **Processes:** Operates in two modes. In "training" mode (`scaling_params` is `None`), it identifies the numerical columns, computes their maximum values, and stores these as the scaling parameters. It then scales the features by dividing each by its corresponding maximum. In "inference" mode, it uses the provided `scaling_params` to scale the data.
*   **Outputs:** A tuple containing the scaled numerical `pandas.DataFrame` and the `pandas.Series` of scaling parameters used.
*   **Transformation:** Transforms numerical features from their original scale to a [0, 1] range.
*   **Role in Research:** This function implements the feature scaling mentioned in Section 4.1: "feature values are first scaled by their maximum to ensure comparability across dimensions." The two-mode design is a best practice to prevent data leakage from the test set into the training process.

#### **`construct_and_split_data` (Task 11 Orchestrator)**
*   **Inputs:** The cleansed `bond_features_df`, the `categorical_features_encoded` DataFrame, the `numerical_features_scaled` DataFrame, and `config`.
*   **Processes:** Merges the categorical and numerical feature sets into a single complete feature matrix, `X`. It then extracts the target variables (`oas_bp`, `yield_to_maturity`) and ensures their row order is perfectly aligned with `X`. Finally, it uses `sklearn.model_selection.train_test_split` to partition both `X` and `Y` into training and testing sets according to the configuration (90/10 split, fixed random state).
*   **Outputs:** A tuple of four DataFrames: `(X_train, X_test, y_train, y_test)`.
*   **Transformation:** Transforms the complete, aligned datasets into the standard partitioned format required for supervised machine learning.
*   **Role in Research:** This function implements the data partitioning described in Section 3.2: "The data were randomly shuffled and split into 90% for training and 10% for testing."

#### **`optimize_random_forest_hyperparameters` (Task 12 Orchestrator)**
*   **Inputs:** `X_train`, `y_train`, and `config`.
*   **Processes:** Implements a grid search with 5-fold cross-validation to find the optimal `n_estimators` and `max_depth` for the `RandomForestRegressor`. It systematically trains and evaluates models for each combination of hyperparameters in the search space, using Root Mean Squared Error (RMSE) as the evaluation metric.
*   **Outputs:** A dictionary containing the best-performing hyperparameter values.
*   **Transformation:** Transforms a training dataset and a parameter search space into a single, optimal set of parameters.
*   **Role in Research:** This function implements the hyperparameter tuning process described in Section 3.2: "We applied 5-fold cross-validation on the training set to perform hyperparameter tuning, specifically optimizing the number of trees and maximum depth of the model."

#### **`train_final_model_and_evaluate` (Task 13 Orchestrator)**
*   **Inputs:** The four data splits (`X_train`, `y_train`, `X_test`, `y_test`), the `optimal_params` dictionary, and `config`.
*   **Processes:** Instantiates a new `RandomForestRegressor` with the optimal hyperparameters and trains it on the *entire* training dataset. It then generates predictions on both the train and test sets and calculates the final RMSE and MAPE, comparing them against the benchmark values reported in the paper to validate the model's fidelity.
*   **Outputs:** The final, trained `sklearn.ensemble.RandomForestRegressor` object.
*   **Transformation:** Transforms the training data and optimal parameters into a trained, predictive model.
*   **Role in Research:** This function produces the final model that will be used to generate the constituent-level similarity scores. The evaluation step confirms that our model has a similar performance profile to the one in the paper, as reported in Section 3.2: "On the training data, the model achieved an RMSE of 0.21... On the testing data, the RMSE... were 0.51..."

#### **`generate_proximity_matrix` (Task 14 Orchestrator)**
*   **Inputs:** The `final_model` object and the complete, unsplit feature matrix `X_complete`.
*   **Processes:** This function operationalizes the concept of Random Forest proximity. It uses the trained model's `.apply()` method to find the terminal leaf node for every bond in every tree. It then systematically counts, for every pair of bonds, how many times they landed in the same leaf node. This count is divided by the total number of trees to get a proximity score between 0 and 1.
*   **Outputs:** A square, symmetric `pandas.DataFrame` (the proximity matrix `S`), where `S[i, j]` is the proximity score between bond `i` and bond `j`.
*   **Transformation:** Transforms a trained model and a feature matrix into a pairwise similarity matrix.
*   **Role in Research:** This function is the direct implementation of the constituent-level similarity method described in Section 2.1.1 and Equation (2):
    $$
    \text{Proximity}(x_i, x_j) = \frac{1}{T} \sum_{t=1}^{T} I[\text{Leaf}_t(x_i) = \text{Leaf}_t(x_j)]
    $$
    This matrix is the fundamental input for the STRAPSim and adapted BERTScore algorithms.

--

### **Similarity Computation and Evaluation (Tasks 15-24)**

#### **`extract_portfolio_level_data` (Task 15 Orchestrator)**
*   **Inputs:** The cleansed `etf_holdings_df` and the `proximity_matrix_df`.
*   **Processes:** Transforms the flat holdings DataFrame into a structured, nested dictionary. For each ETF, it creates an entry containing its constituent CUSIPs, their weights, and, critically, their integer indices corresponding to the rows/columns of the proximity matrix.
*   **Outputs:** A `PortfolioData` dictionary, optimized for use in the similarity algorithms.
*   **Transformation:** Transforms a DataFrame into a structured dictionary with pre-computed indices for efficient lookups.
*   **Role in Research:** This is a crucial data preparation step that structures the portfolio data in a way that makes the subsequent, computationally intensive similarity calculations much more efficient.

#### **`compute_strapsim_between_pair` (Task 16 Core Algorithm)**
*   **Inputs:** Two portfolio data dictionaries, the `proximity_matrix` as a NumPy array, and `config`.
*   **Processes:** This is the core implementation of the STRAPSim algorithm. It performs a greedy matching process by iterating through all pairs of constituents (pre-sorted by their proximity score). In each step, it transfers the minimum available weight between the best-matched pair, updates a cumulative similarity score, and decrements the residual weights of the involved constituents.
*   **Outputs:** A tuple containing the final STRAPSim score and the final residual weights for both portfolios.
*   **Transformation:** Transforms two weighted sets and a similarity matrix into a single scalar similarity score.
*   **Role in Research:** This is the direct and complete implementation of the novel algorithm proposed in the paper, as defined by Equations (1a) and (1b):
    $$
    \text{STRAPSim}(x, y) = \sum_{i,j=\text{argsort}(S)} S_{ij} \min(w_x^{(t)}(i), w_y^{(t)}(j))
    $$
    $$
    \begin{aligned}
    w_x^{(t+1)}(i) &= w_x^{(t)}(i) - \min(w_x^{(t)}(i), w_y^{(t)}(j)) \\
    w_y^{(t+1)}(j) &= w_y^{(t)}(j) - \min(w_x^{(t)}(i), w_y^{(t)}(j))
    \end{aligned}
    $$

#### **`compute_jaccard_matrix` (Task 18 Orchestrator)**
*   **Inputs:** The `PortfolioData` dictionary and `config`.
*   **Processes:** For each unique pair of portfolios, it converts their constituent lists to sets and computes the ratio of the size of their intersection to the size of their union.
*   **Outputs:** A square `pandas.DataFrame` of pairwise Jaccard similarity scores.
*   **Transformation:** Transforms the portfolio data into a matrix of set-based similarity scores.
*   **Role in Research:** Implements the first baseline metric, the Jaccard Index, as defined in Equation (3):
    $$
    J(X, Y) = \frac{|X \cap Y|}{|X \cup Y|}
    $$

#### **`compute_weighted_jaccard_matrix` (Task 19 Orchestrator)**
*   **Inputs:** The `PortfolioData` dictionary and `config`.
*   **Processes:** For each unique pair of portfolios, it computes the weighted intersection (sum of minimum weights for common assets) and the weighted union (sum of maximum weights for all assets) and finds their ratio.
*   **Outputs:** A square `pandas.DataFrame` of pairwise Weighted Jaccard scores.
*   **Transformation:** Transforms the portfolio data into a matrix of weight-aware, set-based similarity scores.
*   **Role in Research:** Implements the second baseline metric, the Weighted Jaccard Index, as defined in Equation (4):
    $$
    J_w(X, Y) = \frac{\sum_{k \in X \cap Y} \min(w_x(k), w_y(k))}{\sum_{k \in X \cup Y} \max(w_x(k), w_y(k))}
    $$

#### **`compute_bertscore_matrix` (Task 20 Orchestrator, Remediated)**
*   **Inputs:** The `PortfolioData` dictionary, `proximity_matrix`, `config`, and a `weight_scheme`.
*   **Processes:** For each unique pair of portfolios, it calls the remediated `_compute_bertscore_for_pair` helper. This helper calculates the Recall (weighted average of best matches from X to Y), Precision (weighted average of best matches from Y to X), and their harmonic mean (F1 score). It also calculates the corresponding residuals.
*   **Outputs:** A tuple of six `pandas.DataFrame`s for Recall, Precision, F1, and their respective residuals.
*   **Transformation:** Transforms the portfolio data and proximity matrix into six matrices of semantic similarity scores and residuals.
*   **Role in Research:** Implements the third baseline, the adapted BERTScore, as defined in Equations (6a, 6b, 6c) and the residual Equations (7a, 7b, 7c).

#### **`compute_return_correlation_matrix` (Task 21 Orchestrator)**
*   **Inputs:** The cleansed `monthly_returns_df` and `config`.
*   **Processes:** Uses the highly optimized `.corr()` method to compute the entire pairwise Pearson correlation matrix for all ETF return series.
*   **Outputs:** A square `pandas.DataFrame` of pairwise return correlations.
*   **Transformation:** Transforms a time-series DataFrame into a correlation matrix.
*   **Role in Research:** This function computes the "calibration benchmark" or "ground truth" matrix, as described in Section 4.3. This matrix represents the market-based measure of economic similarity that the model-based metrics aim to approximate.

#### **`prepare_data_for_spearman_analysis` (Task 22 Orchestrator)**
*   **Inputs:** A dictionary of all computed similarity matrices and the `correlation_matrix`.
*   **Processes:** Transforms all the wide-format (N x N) matrices into a single, long-format "tidy" DataFrame. It then performs a grouped ranking operation, calculating the ranks of the similarity scores and correlation values independently for each reference ETF.
*   **Outputs:** A single, tidy `pandas.DataFrame` containing the raw scores and the calculated ranks, ready for statistical testing.
*   **Transformation:** Transforms multiple wide matrices into a single, long, ranked DataFrame.
*   **Role in Research:** This is the final data preparation step before the evaluation. It structures the data in the precise format required to perform the main statistical test of the paper: comparing the *rankings* produced by each method.

#### **`compute_spearman_rank_correlation` (Task 23 Orchestrator)**
*   **Inputs:** The tidy `analysis_df` from the previous step and `config`.
*   **Processes:** Groups the data by `['reference_etf', 'method']` and applies the `scipy.stats.spearmanr` test to the rank columns of each group. This computes the Spearman correlation coefficient ($\rho_s$) and its p-value for every method from the perspective of every ETF.
*   **Outputs:** A `pandas.DataFrame` summarizing the results (correlation and p-value) for each of the 80 individual tests.
*   **Transformation:** Transforms the ranked data into a set of statistical test results.
*   **Role in Research:** This function performs the primary statistical evaluation of the paper, as described in Section 4.3: "we compute the Spearman rank correlation between similarity scores and return correlations across all ETF pairs."

#### **`aggregate_spearman_results` (Task 24 Orchestrator)**
*   **Inputs:** The `spearman_results_df` from the previous step and `config`.
*   **Processes:** Groups the detailed test results by `method` and calculates the final summary statistics: average correlation, average p-value, and the percentage of tests that were statistically significant at the 5% and 10% levels.
*   **Outputs:** A final, formatted `pandas.DataFrame` that is a direct replication of Table 4 from the paper.
*   **Transformation:** Transforms the detailed test results into the final, aggregated summary table.
*   **Role in Research:** This function produces the final, conclusive result of the study, allowing for a direct comparison of STRAPSim's performance against the baselines, as presented in Table 4: "Statistics of Spearman's Rank Correlation."

--

### **Robustness Analysis (Tasks 25-28)**

#### **`run_full_study` (Top-Level Orchestrator)**
*   **Inputs:** The three raw DataFrames and the `config`.
*   **Processes:** This is the highest-level orchestrator. It first calls `run_strapsim_pipeline_for_robustness` to execute the main analysis once and generate a comprehensive dictionary of all necessary intermediate data artifacts. It then passes this entire dictionary of artifacts to `run_robustness_analysis_suite`.
*   **Outputs:** A nested dictionary containing both the main pipeline artifacts and the summary results from all robustness checks.
*   **Transformation:** Transforms the raw data into the complete, final set of results for the entire study, including all sensitivity analyses.
*   **Role in Research:** This function provides a single, convenient entry point to run the entire replication from start to finish, embodying the principles of reproducibility and ease of use.

#### **`run_robustness_analysis_suite` (Robustness Master Orchestrator)**
*   **Inputs:** The `pipeline_artifacts` dictionary.
*   **Processes:** This function acts as a sub-orchestrator for the robustness phase. It unpacks the necessary artifacts and calls the three main sensitivity analysis functions in sequence: `run_hyperparameter_sensitivity_analysis`, `run_data_split_sensitivity_analysis`, and the remediated `run_metric_component_sensitivity_analysis`.
*   **Outputs:** A dictionary where each key is the name of a sensitivity analysis and each value is the corresponding summary DataFrame.
*   **Transformation:** Transforms the set of main pipeline artifacts into a set of summary tables for each robustness check.
*   **Role in Research:** This function encapsulates the entire suite of validation checks performed to test the stability and reliability of the main findings, as described in Tasks 26, 27, and 28.


<br><br>

## **Usage Example**

### **Example Usage: Executing the End-to-End STRAPSim Study**

This script serves as a practical guide for executing the complete research pipeline developed to replicate the "STRAPSim" paper. It follows a logical, three-step process:

1.  **Data and Configuration Loading:** It begins by loading the three required raw datasets (ETF holdings, bond features, and monthly returns) into `pandas` DataFrames. It also loads the master configuration from the `config.yaml` file. This separation of data, configuration, and code is a critical best practice.
2.  **Main Pipeline Execution:** It then calls the primary orchestrator, `run_full_study`. This single function call triggers the entire end-to-end process: data validation, cleansing, feature engineering, model training, similarity computation, and final evaluation.
3.  **Robustness Analysis Execution:** The `run_full_study` function internally passes the artifacts from the main pipeline to the `run_robustness_analysis_suite`, which then conducts all the specified sensitivity tests.
4.  **Results Inspection:** Finally, the script demonstrates how to access and inspect the key outputs from the returned results dictionary, such as the final summary table that replicates Table 4 from the paper.

This example assumes that all the modular functions developed in Tasks 1 through 28 are organized into appropriate modules (e.g., `validation.py`, `pipeline.py`, `robustness.py`) and are importable.


```python
#!/usr/bin/env python3
# ==============================================================================
#
#  Example Execution Script for the STRAPSim Research Pipeline
#
#  This script demonstrates the end-to-end usage of the STRAPSim analytical
#  framework. It simulates a real-world workflow where a researcher loads the
#  raw data and a configuration file, and then executes the entire study with a
#  single command.
#
# ==============================================================================

import pandas as pd
import yaml
import logging
from typing import Any, Dict

# Assume all the orchestrator functions are located in a module named 'strapsim_pipeline'
# from strapsim_pipeline import run_full_study

# --- Helper function to create realistic dummy data for the example ---
# In a real scenario, this data would be loaded from CSV files, a database,
# or a data provider API (e.g., Bloomberg, Refinitiv).

def create_dummy_data() -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Creates realistic, but simplified, dummy data for demonstration."""
    # Holdings Data
    holdings_data = {
        'etf_id': ['ETF_A', 'ETF_A', 'ETF_B', 'ETF_B'],
        'etf_name': ['ETF A', 'ETF A', 'ETF B', 'ETF B'],
        'cusip': ['CUSIP001', 'CUSIP002', 'CUSIP001', 'CUSIP003'],
        'isin': ['ISIN001', 'ISIN002', 'ISIN001', 'ISIN003'],
        'sedol': ['SEDOL01', 'SEDOL02', 'SEDOL01', 'SEDOL03'],
        'weight': [0.6, 0.4, 0.5, 0.5],
        'market_value_usd': [600000, 400000, 500000, 500000],
        'shares_held': [600, 400, 500, 500],
        'as_of_date': pd.to_datetime('2024-03-31')
    }
    etf_holdings_df = pd.DataFrame(holdings_data)

    # Features Data (simplified for brevity)
    features_data = {
        'cusip': ['CUSIP001', 'CUSIP002', 'CUSIP003'],
        'issuer': ['Issuer X', 'Issuer Y', 'Issuer X'],
        'industry': ['Tech', 'Finance', 'Tech'],
        'days_to_maturity': [1825, 3650, 2190],
        'age_days': [730, 365, 1095],
        'bond_rating_composite': ['AA', 'A', 'AA-'],
        'country_of_risk': ['USA', 'USA', 'CAN'],
        'coupon_rate': [2.5, 3.0, 2.75],
        'coupon_frequency': [2, 2, 2],
        'amount_issued_usd': [1e9, 5e8, 7.5e8],
        'rule_144a_flag': [False, False, True],
        'oas_bp': [50.0, 75.0, 60.0],
        'yield_to_maturity': [4.5, 5.0, 4.8]
    }
    bond_features_df = pd.DataFrame(features_data)
    
    # This is a simplified example. The real data would have 6870 bonds.
    # The rating column would need to be an ordered categorical.
    rating_order = [
        'AAA', 'AA+', 'AA', 'AA-', 'A+', 'A', 'A-', 'BBB+', 'BBB', 'BBB-',
        'BB+', 'BB', 'BB-', 'B+', 'B', 'B-', 'CCC+', 'CCC', 'CCC-', 'CC', 'C', 'D'
    ]
    bond_features_df['bond_rating_composite'] = pd.Categorical(
        bond_features_df['bond_rating_composite'], categories=rating_order, ordered=True
    )


    # Returns Data
    dates = pd.to_datetime(['2024-02-29', '2024-03-31']) # Simplified time series
    returns_data = {
        'date': dates,
        'ETF_A': [0.005, -0.002],
        'ETF_B': [0.006, -0.001]
    }
    monthly_returns_df = pd.DataFrame(returns_data)
    
    # In the real run, the data would have 20 ETFs and 26 months of returns.
    
    return etf_holdings_df, bond_features_df, monthly_returns_df


if __name__ == '__main__':
    # ==========================================================================
    # Step 1: Load Data and Configuration
    # ==========================================================================
    
    # --- Load Configuration from YAML file ---
    # In a real application, the YAML file is the single source of truth for all parameters.
    # This decouples the code from the experimental setup.
    try:
        # Open and read the YAML configuration file.
        with open('config.yaml', 'r') as f:
            # Use the safe_load method to parse the YAML into a Python dictionary.
            config: Dict[str, Any] = yaml.safe_load(f)
        logging.info("Successfully loaded configuration from 'config.yaml'.")
    except FileNotFoundError:
        logging.error("FATAL: 'config.yaml' not found. The pipeline cannot run without its configuration.")
        exit()
    except yaml.YAMLError as e:
        logging.error(f"FATAL: Error parsing 'config.yaml': {e}")
        exit()

    # --- Load Raw DataFrames ---
    # This section simulates loading the three required raw data files.
    # In a production environment, these would be read from CSVs, Parquet files,
    # or a database connection.
    logging.info("Loading raw data files...")
    
    # For this example, we use a dummy data generator.
    # Replace this with your actual data loading logic, e.g.:
    # etf_holdings_df = pd.read_csv('data/etf_holdings.csv', parse_dates=['as_of_date'])
    # bond_features_df = pd.read_csv('data/bond_features.csv')
    # monthly_returns_df = pd.read_csv('data/monthly_returns.csv', parse_dates=['date'])
    
    # NOTE: The dummy data is extremely simplified and will not produce the same
    # numerical results as the paper. Its purpose is to demonstrate the API
    # and the data flow of the pipeline. To replicate the paper, you must use
    # the full datasets with 20 ETFs and 6870 bonds.
    etf_holdings_df, bond_features_df, monthly_returns_df = create_dummy_data()
    
    logging.info("Raw data loaded successfully.")

    # ==========================================================================
    # Step 2: Execute the Full Research Study
    # ==========================================================================
    
    # This is the main entry point to the entire analytical framework.
    # The `run_full_study` function orchestrates both the core analysis and
    # the subsequent suite of robustness checks.
    
    # It takes the raw data and the configuration as input.
    # It returns a comprehensive dictionary containing all results and artifacts.
    try:
        # Execute the entire study.
        full_study_results = run_full_study(
            etf_holdings_df=etf_holdings_df,
            bond_features_df=bond_features_df,
            monthly_returns_df=monthly_returns_df,
            config=config
        )
        logging.info("Full study, including main analysis and robustness checks, completed successfully.")
        
    except Exception as e:
        # If any part of the pipeline fails, the error will be caught and logged.
        logging.error(f"The research pipeline failed with a critical error: {e}", exc_info=True)
        exit()

    # ==========================================================================
    # Step 3: Inspect the Results
    # ==========================================================================
    
    # The returned object is a nested dictionary containing all artifacts.
    # We can now inspect the key outputs.
    
    logging.info("\n" + "="*80)
    logging.info("Inspecting Key Results from the Study")
    logging.info("="*80)

    # --- Main Analysis Results ---
    # The main pipeline's final summary table (replication of Table 4).
    # Note: This is not produced by the artifact-generating pipeline, but is
    # implicitly calculated within the robustness checks. We can extract it
    # from the first robustness run for inspection.
    main_summary_table = full_study_results['main_analysis_artifacts'].get('final_summary_table')
    if main_summary_table is not None:
        print("\n--- Main Analysis Summary (Table 4 Replication) ---")
        print(main_summary_table.to_string())

    # The STRAPSim similarity matrix.
    strapsim_matrix = full_study_results['main_analysis_artifacts']['similarity_matrices']['STRAPSim']
    print("\n--- STRAPSim Similarity Matrix (Head) ---")
    print(strapsim_matrix.head())

    # --- Robustness Analysis Results ---
    # The summary table from the hyperparameter sensitivity analysis.
    hyperparam_sensitivity_results = full_study_results['robustness_analysis_results']['hyperparameter_sensitivity']
    print("\n--- Hyperparameter Sensitivity Analysis Summary ---")
    print(hyperparam_sensitivity_results.to_string())
    
    # The summary table from the data split sensitivity analysis.
    split_sensitivity_results = full_study_results['robustness_analysis_results']['data_split_sensitivity']
    print("\n--- Data Split Sensitivity Analysis Summary ---")
    print(split_sensitivity_results.to_string())
```



### Core Pipeline

In [None]:
# Task 1: Configuration Parameter Validation

def _create_validation_schema() -> Dict[str, Any]:
    """
    Creates the comprehensive jsonschema validation schema for the configuration.

    This function defines the precise, exhaustive structure, data types, and
    constraints for the entire configuration object, ensuring strict adherence
    to the research paper's specifications. This remediated version provides a
    complete definition for all nested objects, including the `baseline_methods`
    section, leaving no part of the configuration unspecified.

    Returns:
        Dict[str, Any]: A dictionary representing the complete and rigorous
                        jsonschema for the configuration dictionary.
    """
    # Define the jsonschema for the configuration dictionary.
    # This schema is exhaustive, enforcing the exact structure, types, and
    # required keys for all top-level and nested objects.
    # 'additionalProperties': False is used globally to ensure no unspecified
    # keys are permitted anywhere in the configuration.
    schema = {
        "type": "object",
        "additionalProperties": False,
        "required": [
            'data_setup', 'feature_engineering', 'random_forest_config',
            'strapsim_algorithm', 'baseline_methods', 'evaluation_config',
            'expected_results', 'computational_config'
        ],
        "properties": {
            "data_setup": {
                "type": "object",
                "additionalProperties": False,
                "required": [
                    'valuation_date', 'return_series_start_date',
                    'return_series_end_date', 'return_frequency',
                    'total_corporate_bond_etfs', 'total_unique_bonds',
                    'average_bonds_per_etf', 'return_type',
                    'currency_normalization'
                ],
                "properties": {
                    "valuation_date": {"type": "string", "format": "date"},
                    "return_series_start_date": {"type": "string", "format": "date"},
                    "return_series_end_date": {"type": "string", "format": "date"},
                    "return_frequency": {"type": "string", "enum": ["M"]},
                    "total_corporate_bond_etfs": {"type": "integer", "minimum": 1},
                    "total_unique_bonds": {"type": "integer", "minimum": 1},
                    "average_bonds_per_etf": {"type": "integer", "minimum": 1},
                    "return_type": {"type": "string"},
                    "currency_normalization": {"type": "string"}
                }
            },
            "feature_engineering": {
                "type": "object",
                "additionalProperties": False,
                "required": [
                    'categorical_features_to_one_hot_encode',
                    'numerical_features', 'target_variables',
                    'training_set_fraction', 'testing_set_fraction',
                    'random_state_for_shuffle', 'shuffle_before_split',
                    'missing_value_handling', 'validate_no_missing_values'
                ],
                "properties": {
                    "categorical_features_to_one_hot_encode": {
                        "type": "array", "items": {"type": "string"}
                    },
                    "numerical_features": {
                        "type": "array", "items": {"type": "string"}
                    },
                    "target_variables": {
                        "type": "array", "items": {"type": "string"}
                    },
                    "training_set_fraction": {"type": "number", "minimum": 0.0, "maximum": 1.0},
                    "testing_set_fraction": {"type": "number", "minimum": 0.0, "maximum": 1.0},
                    "random_state_for_shuffle": {"type": "integer"},
                    "shuffle_before_split": {"type": "boolean"},
                    "missing_value_handling": {"type": "string"},
                    "validate_no_missing_values": {"type": "boolean"}
                }
            },
            "random_forest_config": {
                "type": "object",
                "additionalProperties": False,
                "required": [
                    'model_type', 'cross_validation_folds',
                    'cv_scoring_metric', 'hyperparameters_to_tune',
                    'tuning_method', 'tuning_objective',
                    'expected_training_rmse', 'expected_training_mape',
                    'expected_testing_rmse', 'expected_testing_mape',
                    'random_state', 'n_jobs', 'multi_output_strategy'
                ],
                "properties": {
                    "model_type": {"type": "string"},
                    "cross_validation_folds": {"type": "integer", "minimum": 2},
                    "cv_scoring_metric": {"type": "string"},
                    "hyperparameters_to_tune": {
                        "type": "object",
                        "additionalProperties": False,
                        "required": ["n_estimators", "max_depth"],
                        "properties": {
                            "n_estimators": {
                                "type": "array", "items": {"type": "integer"}
                            },
                            "max_depth": {
                                "type": "array",
                                "items": {"type": ["integer", "null"]}
                            }
                        }
                    },
                    "tuning_method": {"type": "string"},
                    "tuning_objective": {"type": "string"},
                    "expected_training_rmse": {"type": "number"},
                    "expected_training_mape": {"type": "number"},
                    "expected_testing_rmse": {"type": "number"},
                    "expected_testing_mape": {"type": "number"},
                    "random_state": {"type": "integer"},
                    "n_jobs": {"type": "integer"},
                    "multi_output_strategy": {"type": "string"}
                }
            },
            "strapsim_algorithm": {
                "type": "object",
                "additionalProperties": False,
                "required": [
                    'convergence_threshold', 'max_iterations',
                    'similarity_threshold', 'weight_precision_tolerance',
                    'similarity_function_type', 'proximity_normalization',
                    'residual_tracking', 'weight_conservation_validation',
                    'greedy_selection_strategy', 'return_similarity_score',
                    'return_residuals', 'return_matching_details',
                    'precision_digits'
                ],
                "properties": {
                    "convergence_threshold": {"type": "number"},
                    "max_iterations": {"type": "integer"},
                    "similarity_threshold": {"type": "number"},
                    "weight_precision_tolerance": {"type": "number"},
                    "similarity_function_type": {"type": "string"},
                    "proximity_normalization": {"type": "boolean"},
                    "residual_tracking": {"type": "boolean"},
                    "weight_conservation_validation": {"type": "boolean"},
                    "greedy_selection_strategy": {"type": "string"},
                    "return_similarity_score": {"type": "boolean"},
                    "return_residuals": {"type": "boolean"},
                    "return_matching_details": {"type": "boolean"},
                    "precision_digits": {"type": "integer"}
                }
            },
            "baseline_methods": {
                "type": "object",
                "additionalProperties": False,
                "required": [
                    'jaccard_config', 'weighted_jaccard_config',
                    'bertscore_config'
                ],
                "properties": {
                    "jaccard_config": {
                        "type": "object",
                        "additionalProperties": False,
                        "required": ["exact_match_required", "identifier_field", "case_sensitive_matching", "compute_residual"],
                        "properties": {
                            "exact_match_required": {"type": "boolean"},
                            "identifier_field": {"type": "string"},
                            "case_sensitive_matching": {"type": "boolean"},
                            "compute_residual": {"type": "boolean"}
                        }
                    },
                    "weighted_jaccard_config": {
                        "type": "object",
                        "additionalProperties": False,
                        "required": ["weight_field", "min_weight_intersection", "max_weight_union", "weight_normalization_check", "exact_match_required", "compute_residual"],
                        "properties": {
                            "weight_field": {"type": "string"},
                            "min_weight_intersection": {"type": "boolean"},
                            "max_weight_union": {"type": "boolean"},
                            "weight_normalization_check": {"type": "boolean"},
                            "exact_match_required": {"type": "boolean"},
                            "compute_residual": {"type": "boolean"}
                        }
                    },
                    "bertscore_config": {
                        "type": "object",
                        "additionalProperties": False,
                        "required": ["compute_recall", "compute_precision", "compute_f1", "similarity_function", "weight_scheme", "bidirectional_matching", "compute_residuals", "residual_aggregation"],
                        "properties": {
                            "compute_recall": {"type": "boolean"},
                            "compute_precision": {"type": "boolean"},
                            "compute_f1": {"type": "boolean"},
                            "similarity_function": {"type": "string"},
                            "weight_scheme": {"type": "string"},
                            "bidirectional_matching": {"type": "boolean"},
                            "compute_residuals": {"type": "boolean"},
                            "residual_aggregation": {"type": "string"}
                        }
                    }
                }
            },
            "evaluation_config": {
                "type": "object",
                "additionalProperties": False,
                "required": [
                    'ground_truth_metric', 'return_correlation_method',
                    'primary_evaluation_metric', 'correlation_test_type',
                    'significance_levels', 'multiple_testing_correction',
                    'compute_average_correlation', 'compute_average_pvalue',
                    'compute_significance_percentage',
                    'exclude_self_comparison', 'ranking_method',
                    'handle_ranking_ties', 'minimum_etf_pairs_for_analysis'
                ],
                "properties": {
                    "ground_truth_metric": {"type": "string"},
                    "return_correlation_method": {"type": "string"},
                    "primary_evaluation_metric": {"type": "string"},
                    "correlation_test_type": {"type": "string"},
                    "significance_levels": {
                        "type": "array", "items": {"type": "number"}
                    },
                    "multiple_testing_correction": {"type": ["string", "null"]},
                    "compute_average_correlation": {"type": "boolean"},
                    "compute_average_pvalue": {"type": "boolean"},
                    "compute_significance_percentage": {"type": "boolean"},
                    "exclude_self_comparison": {"type": "boolean"},
                    "ranking_method": {"type": "string"},
                    "handle_ranking_ties": {"type": "string"},
                    "minimum_etf_pairs_for_analysis": {"type": "integer"}
                }
            },
            "expected_results": {
                "type": "object",
                "additionalProperties": False,
                "required": [
                    'strapsim_expected_correlation',
                    'strapsim_expected_pvalue',
                    'strapsim_expected_significance_5pct',
                    'strapsim_expected_significance_10pct',
                    'jaccard_expected_correlation',
                    'weighted_jaccard_expected_correlation',
                    'bertscore_expected_correlation',
                    'correlation_tolerance', 'pvalue_tolerance',
                    'significance_percentage_tolerance'
                ],
                "properties": {
                    "strapsim_expected_correlation": {"type": "number"},
                    "strapsim_expected_pvalue": {"type": "number"},
                    "strapsim_expected_significance_5pct": {"type": "integer"},
                    "strapsim_expected_significance_10pct": {"type": "integer"},
                    "jaccard_expected_correlation": {"type": "number"},
                    "weighted_jaccard_expected_correlation": {"type": "number"},
                    "bertscore_expected_correlation": {"type": "number"},
                    "correlation_tolerance": {"type": "number"},
                    "pvalue_tolerance": {"type": "number"},
                    "significance_percentage_tolerance": {"type": "integer"}
                }
            },
            "computational_config": {
                "type": "object",
                "additionalProperties": False,
                "required": [
                    'n_jobs', 'parallel_backend', 'float_precision',
                    'similarity_score_precision', 'weight_precision',
                    'correlation_precision', 'global_random_seed',
                    'ensure_reproducible_results',
                    'cache_similarity_matrices',
                    'garbage_collection_enabled', 'efficient_data_types'
                ],
                "properties": {
                    "n_jobs": {"type": "integer"},
                    "parallel_backend": {"type": "string"},
                    "float_precision": {"type": "string"},
                    "similarity_score_precision": {"type": "integer"},
                    "weight_precision": {"type": "integer"},
                    "correlation_precision": {"type": "integer"},
                    "global_random_seed": {"type": "integer"},
                    "ensure_reproducible_results": {"type": "boolean"},
                    "cache_similarity_matrices": {"type": "boolean"},
                    "garbage_collection_enabled": {"type": "boolean"},
                    "efficient_data_types": {"type": "boolean"}
                }
            }
        }
    }
    return schema


def _validate_config_schema(
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the structure and types of the config dict against a schema.

    This function serves as the first line of defense, ensuring the provided
    configuration dictionary has the correct nested structure, keys, and data
    types before proceeding to more specific value-based validation.

    Args:
        config (Dict[str, Any]): The configuration dictionary to validate.

    Returns:
        List[str]: A list of error messages. An empty list indicates success.
    """
    # Initialize a list to store validation error messages.
    errors = []
    try:
        # Create the validation schema.
        schema = _create_validation_schema()
        # Validate the provided config dictionary against the schema.
        # This will raise a ValidationError if the config is invalid.
        jsonschema.validate(instance=config, schema=schema)
    except jsonschema.ValidationError as e:
        # If validation fails, format a detailed error message.
        # The message includes the invalid value and the path to it.
        errors.append(
            f"Schema validation failed at '{'.'.join(map(str, e.path))}': {e.message}"
        )
    except jsonschema.SchemaError as e:
        # If the schema itself is invalid, this indicates a programming error.
        errors.append(f"Internal schema definition error: {e.message}")

    # Return the list of accumulated errors.
    return errors


def _validate_config_dates(
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates date-related parameters for value, format, logic, and business rules.

    This remediated function performs a complete and rigorous validation of all
    temporal parameters within the configuration. It ensures:
    1.  **Exact Value Compliance**: The date strings match the hardcoded values
        required for a precise replication of the study.
    2.  **Format Integrity**: The date strings are correctly formatted as 'YYYY-MM-DD'.
    3.  **Chronological Logic**: The return series start date precedes the end date.
    4.  **Business Day Convention**: All dates fall on the last business day of
        their respective months, a critical requirement for financial time-series.

    Args:
        config (Dict[str, Any]): The configuration dictionary to validate.

    Returns:
        List[str]: A list of all date-related validation error messages. An
                   empty list signifies that all temporal parameters are valid.
    """
    # Initialize a list to store validation error messages.
    errors: List[str] = []

    # Define the expected hardcoded date values for the study.
    expected_dates: Dict[str, str] = {
        'valuation_date': '2024-03-31',
        'return_series_start_date': '2022-02-28',
        'return_series_end_date': '2024-03-31'
    }

    # --- Step 1: Exact Value Validation ---
    # First, check if the raw string values match the study's specification.
    for key, expected_val in expected_dates.items():
        actual_val = config['data_setup'].get(key)
        if actual_val != expected_val:
            errors.append(
                f"Parameter 'data_setup.{key}' has incorrect value. Expected "
                f"'{expected_val}', but found '{actual_val}'."
            )

    # If there are already errors in the fundamental values, return early.
    if errors:
        return errors

    # --- Step 2: Format and Logic Validation ---
    # Define the expected date format for robust parsing.
    date_format = "%Y-%m-%d"
    # Define paths to the date parameters for iteration.
    date_keys: List[str] = list(expected_dates.keys())

    # Dictionary to store parsed date objects for subsequent logical checks.
    parsed_dates: Dict[str, Union[pd.Timestamp, None]] = {key: None for key in date_keys}

    # Loop through each date key to parse and validate the format.
    for key in date_keys:
        try:
            # Access the date string from the nested dictionary.
            date_str = config['data_setup'][key]
            # Attempt to parse the date string into a pandas Timestamp object.
            # This will raise a ValueError if the format is incorrect.
            parsed_dates[key] = pd.to_datetime(date_str, format=date_format)
        except ValueError:
            # This error should ideally not be reached if the schema is correct,
            # but it serves as a robust fallback.
            errors.append(
                f"Parameter 'data_setup.{key}' ('{date_str}') is not a valid "
                f"date in 'YYYY-MM-DD' format."
            )

    # If any date failed to parse, return the errors found so far.
    if any(d is None for d in parsed_dates.values()):
        return errors

    # --- Step 3: Chronological and Business Day Convention Validation ---
    # Proceed with logical checks now that all dates are parsed successfully.

    # Extract the parsed Timestamp objects for convenience.
    start_date = parsed_dates['return_series_start_date']
    end_date = parsed_dates['return_series_end_date']

    # Check 3a: Chronological order of the return series.
    if start_date >= end_date:
        errors.append(
            "'return_series_start_date' must be strictly before 'return_series_end_date'."
        )

    # Check 3b: Validate that each date is a month-end business day.
    for key, date_obj in parsed_dates.items():
        # is_on_offset checks if the date is on the specified frequency anchor.
        # MonthEnd(0) represents the last business day of the month.
        if not pd.tseries.offsets.MonthEnd(0).is_on_offset(date_obj):
            errors.append(
                f"Parameter 'data_setup.{key}' with value '{date_obj.date()}' is not a "
                "month-end business day."
            )

    # Return the list of all accumulated errors.
    return errors


def _validate_config_values(
    config: Dict[str, Any]
) -> List[str]:
    """
    Performs an exhaustive validation of specific parameter values in the config.

    This remediated function is a complete and rigorous implementation that
    checks every single value-based constraint specified in the instructions for
    Task 1, Step 3. It ensures that the configuration is not only structurally
    correct but also semantically identical to the one required for a precise
    replication of the study. It validates simple values, list contents, and
    numerical relationships across all sections of the configuration.

    Args:
        config (Dict[str, Any]): The configuration dictionary to validate.

    Returns:
        List[str]: A list of all value-related validation error messages. An
                   empty list signifies that all values are correct.
    """
    # Initialize a list to store validation error messages.
    errors: List[str] = []

    # Helper function for generating consistent error messages.
    def check_value(path: str, actual: Any, expected: Any):
        if actual != expected:
            errors.append(f"Parameter '{path}' has incorrect value. Expected '{expected}', but found '{actual}'.")

    def check_list_content(path: str, actual: List[Any], expected: List[Any]):
        # Use set comparison for order-agnostic check of list contents.
        if set(actual) != set(expected):
            errors.append(f"Parameter '{path}' has incorrect list contents. Expected elements: {set(expected)}, but found {set(actual)}.")

    # --- Data Setup Validations ---
    check_value('data_setup.total_corporate_bond_etfs', config['data_setup']['total_corporate_bond_etfs'], 20)
    check_value('data_setup.total_unique_bonds', config['data_setup']['total_unique_bonds'], 6870)
    check_value('data_setup.average_bonds_per_etf', config['data_setup']['average_bonds_per_etf'], 511)
    check_value('data_setup.return_type', config['data_setup']['return_type'], 'total_return')
    check_value('data_setup.currency_normalization', config['data_setup']['currency_normalization'], 'USD')

    # --- Feature Engineering Validations ---
    check_list_content(
        'feature_engineering.categorical_features_to_one_hot_encode',
        config['feature_engineering']['categorical_features_to_one_hot_encode'],
        ['issuer', 'industry', 'bond_rating', 'market', 'currency', 'country', 'flag_144a']
    )
    check_list_content(
        'feature_engineering.numerical_features',
        config['feature_engineering']['numerical_features'],
        ['days_to_maturity', 'age', 'coupon', 'amount_issued', 'coupon_frequency']
    )
    check_list_content(
        'feature_engineering.target_variables',
        config['feature_engineering']['target_variables'],
        ['oas', 'yield']
    )
    check_value('feature_engineering.training_set_fraction', config['feature_engineering']['training_set_fraction'], 0.90)
    check_value('feature_engineering.testing_set_fraction', config['feature_engineering']['testing_set_fraction'], 0.10)
    # Use math.isclose for robust floating-point comparison.
    train_frac = config['feature_engineering']['training_set_fraction']
    test_frac = config['feature_engineering']['testing_set_fraction']
    if not math.isclose(train_frac + test_frac, 1.0, abs_tol=1e-9):
        errors.append("Sum of 'training_set_fraction' and 'testing_set_fraction' must be 1.0.")
    check_value('feature_engineering.random_state_for_shuffle', config['feature_engineering']['random_state_for_shuffle'], 42)
    check_value('feature_engineering.shuffle_before_split', config['feature_engineering']['shuffle_before_split'], True)
    check_value('feature_engineering.missing_value_handling', config['feature_engineering']['missing_value_handling'], 'none_required')
    check_value('feature_engineering.validate_no_missing_values', config['feature_engineering']['validate_no_missing_values'], True)

    # --- Random Forest Config Validations ---
    rf_config = config['random_forest_config']
    check_value('random_forest_config.model_type', rf_config['model_type'], 'RandomForestRegressor')
    check_value('random_forest_config.cross_validation_folds', rf_config['cross_validation_folds'], 5)
    check_value('random_forest_config.cv_scoring_metric', rf_config['cv_scoring_metric'], 'neg_root_mean_squared_error')
    check_list_content(
        'random_forest_config.hyperparameters_to_tune.n_estimators',
        rf_config['hyperparameters_to_tune']['n_estimators'],
        [50, 100, 200, 500]
    )
    # Special handling for list containing None.
    expected_depths: Set[Union[int, None]] = {5, 10, 15, 20, None}
    actual_depths: Set[Union[int, None]] = set(rf_config['hyperparameters_to_tune']['max_depth'])
    if actual_depths != expected_depths:
        errors.append(f"Parameter 'random_forest_config.hyperparameters_to_tune.max_depth' has incorrect list contents. Expected elements: {expected_depths}, but found {actual_depths}.")
    check_value('random_forest_config.tuning_method', rf_config['tuning_method'], 'grid_search')
    check_value('random_forest_config.tuning_objective', rf_config['tuning_objective'], 'minimize_rmse')
    check_value('random_forest_config.random_state', rf_config['random_state'], 42)
    check_value('random_forest_config.n_jobs', rf_config['n_jobs'], -1)
    check_value('random_forest_config.multi_output_strategy', rf_config['multi_output_strategy'], 'direct')

    # --- STRAPSim Algorithm Validations ---
    strapsim_config = config['strapsim_algorithm']
    check_value('strapsim_algorithm.convergence_threshold', strapsim_config['convergence_threshold'], 1e-6)
    check_value('strapsim_algorithm.max_iterations', strapsim_config['max_iterations'], 10000)
    check_value('strapsim_algorithm.similarity_threshold', strapsim_config['similarity_threshold'], 0.0)
    check_value('strapsim_algorithm.weight_precision_tolerance', strapsim_config['weight_precision_tolerance'], 1e-8)
    check_value('strapsim_algorithm.similarity_function_type', strapsim_config['similarity_function_type'], 'random_forest_proximity')
    check_value('strapsim_algorithm.residual_tracking', strapsim_config['residual_tracking'], True)
    check_value('strapsim_algorithm.weight_conservation_validation', strapsim_config['weight_conservation_validation'], True)
    check_value('strapsim_algorithm.greedy_selection_strategy', strapsim_config['greedy_selection_strategy'], 'max_similarity')

    # --- Baseline Methods Validations ---
    # This confirms the sub-dictionaries contain the required boolean/string parameters.
    # The schema validation already confirms their existence.
    jaccard_conf = config['baseline_methods']['jaccard_config']
    check_value('baseline_methods.jaccard_config.exact_match_required', jaccard_conf.get('exact_match_required'), True)

    wj_conf = config['baseline_methods']['weighted_jaccard_config']
    check_value('baseline_methods.weighted_jaccard_config.min_weight_intersection', wj_conf.get('min_weight_intersection'), True)

    bert_conf = config['baseline_methods']['bertscore_config']
    check_value('baseline_methods.bertscore_config.compute_f1', bert_conf.get('compute_f1'), True)

    # Return the list of all accumulated errors.
    return errors


def validate_configuration(
    config: Dict[str, Any]
) -> Tuple[bool, List[str]]:
    """
    Orchestrates the complete validation of the research configuration.

    This function serves as the main entry point for configuration validation.
    It executes a series of checks in a specific order:
    1.  Structural and Type Validation: Ensures the configuration dictionary
        has the correct keys, nested structure, and data types.
    2.  Date Validation: Verifies the format, logic, and business day rules
        for all date-related parameters.
    3.  Value Validation: Checks specific numerical and list-based constraints
        that are critical for the study's methodology.

    The function aggregates all errors from these steps into a single,
    comprehensive report, providing a clear pass/fail status.

    Args:
        config (Dict[str, Any]): The complete configuration dictionary for the
                                 STRAPSim research pipeline.

    Returns:
        Tuple[bool, List[str]]: A tuple containing:
        - bool: True if the configuration is valid, False otherwise.
        - List[str]: A list of detailed error messages. If the list is
                     empty, the configuration is valid.
    """
    # Input validation for the function itself.
    if not isinstance(config, dict):
        return False, ["Configuration must be a dictionary."]

    # --- Step 1: Validate the overall structure and data types using a schema ---
    # This is the most robust way to check the nested structure.
    schema_errors = _validate_config_schema(config)
    # If the fundamental structure is wrong, we stop here as subsequent checks
    # might fail with KeyErrors.
    if schema_errors:
        return False, schema_errors

    # --- Step 2: Validate date formats, logic, and business rules ---
    # This requires custom logic that jsonschema cannot handle.
    date_errors = _validate_config_dates(config)

    # --- Step 3: Validate specific numerical values and list contents ---
    # This also requires custom logic for constraints like sums and set equality.
    value_errors = _validate_config_values(config)

    # --- Aggregation of all errors ---
    # Combine errors from all validation steps into a single list.
    all_errors = schema_errors + date_errors + value_errors

    # --- Final Verdict ---
    # The configuration is valid if and only if the list of errors is empty.
    is_valid = not all_errors

    # Return the final validation status and the list of all found errors.
    return is_valid, all_errors


In [None]:
# Task 2: ETF Holdings DataFrame Validation

def _validate_holdings_structure(
    etf_holdings_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the structural integrity of the ETF holdings DataFrame.

    This function performs the initial, most fundamental checks on the holdings
    DataFrame to ensure it conforms to the expected schema before any deeper
    analysis is attempted. It verifies:
    1.  The DataFrame is not empty.
    2.  It contains the exact set of required columns, with no extras.
    3.  The number of unique ETFs matches the study's configuration.

    Args:
        etf_holdings_df (pd.DataFrame): The DataFrame containing ETF holdings data.
        config (Dict[str, Any]): The validated research configuration dictionary.

    Returns:
        List[str]: A list of error messages describing structural violations.
                   An empty list signifies that the structure is valid.
    """
    # Initialize a list to store validation error messages.
    errors: List[str] = []

    # Check 1: Verify the DataFrame is not empty.
    # A non-empty DataFrame is a prerequisite for all other checks.
    if etf_holdings_df.empty:
        # If it's empty, add a critical error and return immediately.
        errors.append("ETF holdings DataFrame is empty.")
        return errors

    # Check 2: Verify the DataFrame contains the exact required columns.
    # Define the expected set of columns as per the specification.
    expected_columns: Set[str] = {
        'etf_id', 'etf_name', 'cusip', 'isin', 'sedol', 'weight',
        'market_value_usd', 'shares_held', 'as_of_date'
    }
    # Get the actual set of columns from the input DataFrame.
    actual_columns: Set[str] = set(etf_holdings_df.columns)

    # Compare the actual and expected sets.
    if actual_columns != expected_columns:
        # If they don't match, identify missing and unexpected columns.
        missing_cols = expected_columns - actual_columns
        extra_cols = actual_columns - expected_columns
        # Format a detailed error message.
        error_msg = "ETF holdings DataFrame column mismatch."
        if missing_cols:
            error_msg += f" Missing columns: {sorted(list(missing_cols))}."
        if extra_cols:
            error_msg += f" Extra columns: {sorted(list(extra_cols))}."
        errors.append(error_msg)

    # Check 3: Verify the number of unique ETFs matches the configuration.
    # Access the expected number from the config dictionary.
    expected_etf_count = config['data_setup']['total_corporate_bond_etfs']
    # Calculate the actual number of unique ETFs in the 'etf_id' column.
    actual_etf_count = etf_holdings_df['etf_id'].nunique()

    # Compare the actual count to the expected count.
    if actual_etf_count != expected_etf_count:
        # If they differ, format a precise error message.
        errors.append(
            f"Expected {expected_etf_count} unique ETFs based on config, but "
            f"found {actual_etf_count} in 'etf_id' column."
        )

    # Return the list of all accumulated structural errors.
    return errors


def _validate_holdings_dtypes(
    etf_holdings_df: pd.DataFrame
) -> List[str]:
    """
    Validates the data types of each column in the ETF holdings DataFrame.

    Ensuring correct data types is crucial for preventing runtime errors and
    ensuring the correctness of numerical and logical operations downstream.
    This function checks each column against its specified, expected data type.

    Args:
        etf_holdings_df (pd.DataFrame): The DataFrame containing ETF holdings data.

    Returns:
        List[str]: A list of error messages for any column with an incorrect
                   data type. An empty list signifies all dtypes are correct.
    """
    # Initialize a list to store validation error messages.
    errors: List[str] = []

    # Define the expected data type for each column.
    # Using string representations allows for flexibility (e.g., 'int64').
    expected_dtypes: Dict[str, str] = {
        'etf_id': 'object',
        'etf_name': 'object',
        'cusip': 'object',
        'isin': 'object',
        'sedol': 'object',
        'weight': 'float64',
        'market_value_usd': 'float64',
        'shares_held': 'int64',
        'as_of_date': 'datetime64[ns]'
    }

    # Iterate through the expected dtypes dictionary to check each column.
    for col, expected_dtype in expected_dtypes.items():
        # Check if the column exists in the DataFrame to avoid KeyErrors.
        if col in etf_holdings_df.columns:
            # Get the actual data type of the column.
            actual_dtype = etf_holdings_df[col].dtype
            # Compare the actual dtype string representation with the expected one.
            if str(actual_dtype) != expected_dtype:
                # If they don't match, format a detailed error message.
                errors.append(
                    f"Column '{col}' has incorrect dtype. Expected "
                    f"'{expected_dtype}', but found '{actual_dtype}'."
                )

    # Return the list of all accumulated data type errors.
    return errors


def _is_valid_cusip(cusip: str) -> bool:
    """
    Validates a single CUSIP identifier using its check digit algorithm.

    The CUSIP check digit is calculated using a variant of the Luhn algorithm.
    This function implements that algorithm precisely to verify the integrity
    of a given CUSIP string.

    The algorithm is as follows:
    1.  Convert all letters to their numeric value (A=10, B=11, ...).
    2.  Process the first 8 characters. For each character's numeric value:
        a. Multiply the value by 2 if it is in an even position (2nd, 4th, ...).
        b. If the result of the multiplication is a two-digit number, sum those
           two digits.
    3.  Sum the results of all 8 processed character values.
    4.  The check digit is `(10 - (total_sum % 10)) % 10`.
    5.  Compare this calculated digit to the 9th digit of the CUSIP.

    Args:
        cusip (str): The 9-character CUSIP string to validate.

    Returns:
        bool: True if the CUSIP's check digit is valid, False otherwise.
    """
    # Pre-computation check: CUSIP must be a 9-character string.
    if not isinstance(cusip, str) or len(cusip) != 9:
        return False

    # CUSIP character value mapping.
    # ord(c) - ord('A') + 10 maps A->10, B->11, etc.
    # Special characters '*' -> 36, '@' -> 37, '#' -> 38.
    def get_value(char: str) -> int:
        if '0' <= char <= '9':
            return int(char)
        if 'A' <= char <= 'Z':
            return ord(char) - ord('A') + 10
        if char == '*':
            return 36
        if char == '@':
            return 37
        if char == '#':
            return 38
        return -1 # Invalid character

    total_sum = 0
    # Iterate through the first 8 characters of the CUSIP.
    for i, char in enumerate(cusip[:8]):
        # Get the numeric value of the character.
        val = get_value(char)
        if val == -1:
            return False # Invalid character found.

        # Multiply the value by 2 for even positions (1-indexed, so 0-indexed i=1,3,5,7).
        if (i + 1) % 2 == 0:
            val *= 2

        # Sum the digits of the resulting value (e.g., 14 becomes 1 + 4 = 5).
        # This is equivalent to val // 10 + val % 10.
        total_sum += (val // 10) + (val % 10)

    # Calculate the expected check digit.
    # The formula is (10 - (sum mod 10)) mod 10.
    expected_check_digit = (10 - (total_sum % 10)) % 10

    # Get the actual check digit from the CUSIP.
    actual_check_digit = get_value(cusip[8])

    # Compare the calculated check digit with the actual one.
    return expected_check_digit == actual_check_digit


def _validate_holdings_quality(
    etf_holdings_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates data quality and business rules in the ETF holdings DataFrame.

    This remediated function performs deep, content-level validation, now
    including the critical CUSIP check digit validation. It checks:
    1.  Identifier formats (length and check digit integrity).
    2.  Numerical constraints (e.g., weights are in [0, 1]).
    3.  Portfolio-level aggregations (e.g., weights for each ETF sum to 1.0).
    4.  Temporal consistency (all holdings share the same valuation date).

    Args:
        etf_holdings_df (pd.DataFrame): The DataFrame containing ETF holdings data.
        config (Dict[str, Any]): The validated research configuration dictionary.

    Returns:
        List[str]: A list of detailed error messages for data quality violations.
    """
    # Initialize a list to store validation error messages.
    errors: List[str] = []

    # --- Identifier Length and Null Checks ---
    id_lengths = {'cusip': 9, 'isin': 12, 'sedol': 7}
    for col, length in id_lengths.items():
        if etf_holdings_df[col].isnull().any():
            errors.append(f"Column '{col}' contains missing (null) values.")
        incorrect_length_mask = (etf_holdings_df[col].str.len() != length) & (etf_holdings_df[col].notna())
        if incorrect_length_mask.any():
            count = incorrect_length_mask.sum()
            errors.append(f"Found {count} records in column '{col}' with incorrect length (expected {length}).")

    # --- CUSIP Check Digit Validation ---
    # Apply the check digit validation function to all non-null CUSIPs.
    valid_cusip_mask = etf_holdings_df['cusip'].notna()
    # The .apply() method iterates through the Series, applying the function to each element.
    is_valid_series = etf_holdings_df.loc[valid_cusip_mask, 'cusip'].apply(_is_valid_cusip)
    # Identify the CUSIPs that failed the validation.
    invalid_cusips = etf_holdings_df.loc[valid_cusip_mask, 'cusip'][~is_valid_series]
    if not invalid_cusips.empty:
        count = len(invalid_cusips)
        errors.append(
            f"Found {count} CUSIPs with invalid check digits. "
            f"Examples: {invalid_cusips.unique().tolist()[:5]}"
        )

    # --- Numerical Column Constraints ---
    if not etf_holdings_df['weight'].between(0.0, 1.0).all():
        errors.append("Column 'weight' contains values outside the [0.0, 1.0] range.")
    if (etf_holdings_df['market_value_usd'] < 0).any():
        errors.append("Column 'market_value_usd' contains negative values.")
    if (etf_holdings_df['shares_held'] <= 0).any():
        errors.append("Column 'shares_held' contains non-positive values.")

    # --- Group-level Validation: Per-ETF Weight Summation ---
    weight_sums = etf_holdings_df.groupby('etf_id')['weight'].sum()
    tolerance = 1e-6
    failing_etfs = weight_sums[~np.isclose(weight_sums, 1.0, atol=tolerance)]
    if not failing_etfs.empty:
        for etf_id, total_weight in failing_etfs.items():
            errors.append(f"ETF '{etf_id}' weights do not sum to 1.0 (sum: {total_weight:.8f}).")

    # --- Temporal Consistency Check ---
    if etf_holdings_df['as_of_date'].nunique() != 1:
        errors.append("Column 'as_of_date' contains multiple different dates.")
    else:
        actual_date = etf_holdings_df['as_of_date'].iloc[0]
        expected_date = pd.to_datetime(config['data_setup']['valuation_date'])
        if actual_date != expected_date:
            errors.append(f"'as_of_date' ({actual_date.date()}) does not match the configured 'valuation_date' ({expected_date.date()}).")

    # Return the list of all accumulated data quality errors.
    return errors


def validate_etf_holdings_dataframe(
    etf_holdings_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[bool, List[str]]:
    """
    Orchestrates the complete validation of the ETF holdings DataFrame.

    This master function provides a single entry point to rigorously validate
    the ETF holdings data against the study's specifications. It executes a
    sequence of validation steps, ensuring that the data is structurally sound,
    has the correct data types, and adheres to all financial and logical
    business rules before being used in the research pipeline.

    The validation proceeds in three phases:
    1.  **Structure Validation**: Checks column names, DataFrame shape, and ETF count.
    2.  **Data Type Validation**: Verifies that each column has the expected dtype.
    3.  **Data Quality Validation**: Performs deep content checks on identifiers,
        numerical ranges, and portfolio-level constraints.

    Args:
        etf_holdings_df (pd.DataFrame): The DataFrame containing ETF portfolio
                                        holdings data to be validated.
        config (Dict[str, Any]): The validated global configuration dictionary,
                                 which provides ground-truth parameters for
                                 validation (e.g., number of ETFs).

    Returns:
        Tuple[bool, List[str]]: A tuple containing:
        - bool: True if the DataFrame is valid, False otherwise.
        - List[str]: A comprehensive list of all validation errors found. An
                     empty list indicates that the DataFrame passed all checks.
    """
    # Perform initial input type check.
    if not isinstance(etf_holdings_df, pd.DataFrame):
        return False, ["Input 'etf_holdings_df' must be a pandas DataFrame."]
    if not isinstance(config, dict):
        return False, ["Input 'config' must be a dictionary."]

    # --- Step 1: Validate the DataFrame's structure ---
    # This is a prerequisite for subsequent checks. If the structure is wrong,
    # other checks might fail with unexpected errors.
    structure_errors = _validate_holdings_structure(etf_holdings_df, config)
    if structure_errors:
        # If critical structural errors are found, return immediately.
        return False, structure_errors

    # --- Step 2: Validate the data types of all columns ---
    # This ensures that numerical operations and date logic will execute correctly.
    dtype_errors = _validate_holdings_dtypes(etf_holdings_df)

    # --- Step 3: Perform deep data quality and business rule checks ---
    # This is the most intensive step, verifying the content of the data.
    quality_errors = _validate_holdings_quality(etf_holdings_df, config)

    # --- Aggregation of all errors ---
    # Combine errors from all validation steps into a single report.
    all_errors = dtype_errors + quality_errors

    # --- Final Verdict ---
    # The DataFrame is considered valid if and only if no errors were found.
    is_valid = not all_errors

    # Return the final validation status and the comprehensive list of errors.
    return is_valid, all_errors


In [None]:
# Task 3: Bond Features DataFrame Validation

def _validate_features_structure(
    bond_features_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the structural integrity of the bond features DataFrame.

    This function ensures the DataFrame has the correct fundamental structure:
    1.  It is not empty.
    2.  It contains the exact set of required feature columns.
    3.  The number of unique securities (bonds) matches the configuration.

    Args:
        bond_features_df (pd.DataFrame): The DataFrame with bond feature data.
        config (Dict[str, Any]): The validated research configuration dictionary.

    Returns:
        List[str]: A list of error messages for structural violations.
    """
    # Initialize a list to hold validation errors.
    errors: List[str] = []

    # Check 1: Ensure the DataFrame is not empty.
    if bond_features_df.empty:
        errors.append("Bond features DataFrame is empty.")
        return errors

    # Check 2: Verify the exact set of columns.
    # This ensures all features needed for the model are present and no extra
    # columns exist that could interfere with processing.
    expected_columns: Set[str] = {
        'cusip', 'issuer', 'industry', 'days_to_maturity', 'age_days',
        'bond_rating_composite', 'country_of_risk', 'coupon_rate',
        'coupon_frequency', 'amount_issued_usd', 'rule_144a_flag',
        'oas_bp', 'yield_to_maturity'
    }
    actual_columns: Set[str] = set(bond_features_df.columns)

    if actual_columns != expected_columns:
        missing_cols = expected_columns - actual_columns
        extra_cols = actual_columns - expected_columns
        error_msg = "Bond features DataFrame column mismatch."
        if missing_cols:
            error_msg += f" Missing columns: {sorted(list(missing_cols))}."
        if extra_cols:
            error_msg += f" Extra columns: {sorted(list(extra_cols))}."
        errors.append(error_msg)

    # Check 3: Verify the number of unique bonds.
    # This confirms the dataset scope aligns with the study's specification.
    expected_bond_count = config['data_setup']['total_unique_bonds']
    actual_bond_count = bond_features_df['cusip'].nunique()

    if actual_bond_count != expected_bond_count:
        errors.append(
            f"Expected {expected_bond_count} unique bonds based on config, but "
            f"found {actual_bond_count} unique CUSIPs."
        )

    # Check 4: Verify CUSIP column is unique (primary key constraint).
    if bond_features_df['cusip'].duplicated().any():
        duplicate_count = bond_features_df['cusip'].duplicated().sum()
        errors.append(f"Primary key 'cusip' is not unique. Found {duplicate_count} duplicates.")

    return errors


def _validate_features_dtypes(
    bond_features_df: pd.DataFrame
) -> List[str]:
    """
    Validates the data types of columns in the bond features DataFrame.

    This function performs rigorous data type validation, including special
    handling for ordered categorical data, which is essential for models that
    can leverage ordinal information (like Random Forests).

    Args:
        bond_features_df (pd.DataFrame): The DataFrame with bond feature data.

    Returns:
        List[str]: A list of error messages for data type violations.
    """
    # Initialize a list to hold validation errors.
    errors: List[str] = []

    # Define expected dtypes for all columns.
    expected_dtypes: Dict[str, str] = {
        'cusip': 'object',
        'issuer': 'object',
        'industry': 'category',
        'days_to_maturity': 'int32',
        'age_days': 'int32',
        'bond_rating_composite': 'category',
        'country_of_risk': 'category',
        'coupon_rate': 'float64',
        'coupon_frequency': 'int8',
        'amount_issued_usd': 'float64',
        'rule_144a_flag': 'bool',
        'oas_bp': 'float64',
        'yield_to_maturity': 'float64'
    }

    # Define the required order for the bond rating categories.
    # This is critical for ensuring the model correctly interprets credit risk.
    rating_order: List[str] = [
        'AAA', 'AA+', 'AA', 'AA-', 'A+', 'A', 'A-', 'BBB+', 'BBB', 'BBB-',
        'BB+', 'BB', 'BB-', 'B+', 'B', 'B-', 'CCC+', 'CCC', 'CCC-', 'CC', 'C', 'D'
    ]

    # Iterate through columns and their expected dtypes.
    for col, expected_dtype in expected_dtypes.items():
        if col in bond_features_df.columns:
            actual_dtype = bond_features_df[col].dtype

            # Special validation for the ordered categorical 'bond_rating_composite'.
            if col == 'bond_rating_composite':
                if not pd.api.types.is_categorical_dtype(actual_dtype):
                    errors.append(f"Column '{col}' is not a categorical dtype.")
                # Check if the categories are correctly ordered.
                elif not actual_dtype.ordered:
                    errors.append(f"Categorical column '{col}' is not ordered.")
                # Check if the set of categories matches the expected order exactly.
                elif list(actual_dtype.categories) != rating_order:
                    errors.append(
                        f"Column '{col}' has incorrect category order or items."
                    )
            # General dtype check for all other columns.
            elif str(actual_dtype) != expected_dtype:
                errors.append(
                    f"Column '{col}' has incorrect dtype. Expected "
                    f"'{expected_dtype}', but found '{actual_dtype}'."
                )

    return errors


def _validate_features_quality(
    bond_features_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates data quality and business rules for the bond features DataFrame.

    This function executes a suite of content-level checks to ensure the data
    is financially and logically sound. It validates numerical ranges, set
    membership for categorical codes, and the absence of missing values as
    per the study's strict requirements.

    Args:
        bond_features_df (pd.DataFrame): The DataFrame with bond feature data.
        config (Dict[str, Any]): The validated research configuration dictionary.

    Returns:
        List[str]: A list of error messages for data quality violations.
    """
    # Initialize a list to hold validation errors.
    errors: List[str] = []

    # --- Numerical Range Validations ---
    # Each check uses a vectorized boolean mask for efficiency.
    range_checks = {
        'days_to_maturity': (1, 10950),
        'age_days': (1, 7300),
        'coupon_rate': (0.0, 8.5),
        'amount_issued_usd': (100_000_000, 50_000_000_000),
        'oas_bp': (-10, 3000),
        'yield_to_maturity': (0.125, 12.875)
    }
    for col, (min_val, max_val) in range_checks.items():
        if not bond_features_df[col].between(min_val, max_val).all():
            # Find the number of violating entries for a more informative error.
            violations = bond_features_df[~bond_features_df[col].between(min_val, max_val)]
            errors.append(
                f"Column '{col}' has {len(violations)} values outside the expected "
                f"range [{min_val}, {max_val}]. Example violation: "
                f"{violations[col].iloc[0]}"
            )

    # --- Set Membership Validation ---
    # Check 'coupon_frequency' against a set of valid integer codes.
    valid_frequencies = {1, 2, 4, 12}
    if not bond_features_df['coupon_frequency'].isin(valid_frequencies).all():
        violations = bond_features_df[~bond_features_df['coupon_frequency'].isin(valid_frequencies)]
        errors.append(
            f"Column 'coupon_frequency' contains invalid values. "
            f"Expected one of {valid_frequencies}. Found {len(violations)} violations. "
            f"Example violation: {violations['coupon_frequency'].iloc[0]}"
        )

    # --- Missing Value Validation ---
    # This check is conditional on the configuration flag.
    if config['feature_engineering']['validate_no_missing_values']:
        # Calculate the total number of missing values in the entire DataFrame.
        missing_count = bond_features_df.isnull().sum().sum()
        if missing_count > 0:
            # If missing values exist, report the count and columns affected.
            missing_per_col = bond_features_df.isnull().sum()
            affected_cols = missing_per_col[missing_per_col > 0].to_dict()
            errors.append(
                f"Found {missing_count} total missing values, but config "
                f"requires none. Affected columns: {affected_cols}."
            )

    return errors


def validate_bond_features_dataframe(
    bond_features_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[bool, List[str]]:
    """
    Orchestrates the complete validation of the bond features DataFrame.

    This master function provides a single, reliable entry point to validate
    the bond security master data. It ensures the data used to train the
    constituent similarity model is of the highest quality and conforms
    precisely to the research methodology.

    The validation proceeds in three phases:
    1.  **Structure Validation**: Checks column names, DataFrame shape, and the
        uniqueness and count of the primary key (CUSIP).
    2.  **Data Type Validation**: Verifies dtypes, with special attention to
        the correct implementation of ordered categorical data for credit ratings.
    3.  **Data Quality Validation**: Enforces strict numerical ranges, set
        membership rules, and the critical no-missing-values constraint.

    Args:
        bond_features_df (pd.DataFrame): The DataFrame containing bond feature
                                          data to be validated.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        Tuple[bool, List[str]]: A tuple containing:
        - bool: True if the DataFrame is valid, False otherwise.
        - List[str]: A comprehensive list of all validation errors found.
    """
    # Perform initial input type checks.
    if not isinstance(bond_features_df, pd.DataFrame):
        return False, ["Input 'bond_features_df' must be a pandas DataFrame."]
    if not isinstance(config, dict):
        return False, ["Input 'config' must be a dictionary."]

    # --- Step 1: Validate the DataFrame's structure ---
    structure_errors = _validate_features_structure(bond_features_df, config)
    if structure_errors:
        # Halt validation if the basic structure is incorrect.
        return False, structure_errors

    # --- Step 2: Validate the data types of all columns ---
    dtype_errors = _validate_features_dtypes(bond_features_df)

    # --- Step 3: Perform deep data quality and business rule checks ---
    quality_errors = _validate_features_quality(bond_features_df, config)

    # --- Aggregation of all errors ---
    all_errors = dtype_errors + quality_errors

    # --- Final Verdict ---
    is_valid = not all_errors

    return is_valid, all_errors


In [None]:
# Task 4: Monthly Returns DataFrame Validation

def _validate_returns_structure(
    monthly_returns_df: pd.DataFrame,
    etf_holdings_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the structure and dimensions of the monthly returns DataFrame.

    This function ensures the returns DataFrame has the correct shape, columns,
    and temporal length, and that its ETF identifiers are perfectly consistent
    with the holdings data. It verifies:
    1.  The DataFrame is not empty and contains a 'date' column.
    2.  The number of ETF columns matches the configuration.
    3.  The set of ETF columns exactly matches the ETF identifiers in holdings.
    4.  The number of rows (observations) matches the expected time series length.

    Args:
        monthly_returns_df (pd.DataFrame): DataFrame of monthly ETF returns.
        etf_holdings_df (pd.DataFrame): Validated DataFrame of ETF holdings.
        config (Dict[str, Any]): The validated research configuration dictionary.

    Returns:
        List[str]: A list of error messages for structural violations.
    """
    # Initialize a list to hold validation errors.
    errors: List[str] = []

    # Check 1: Verify the DataFrame is not empty and has a 'date' column.
    if monthly_returns_df.empty:
        errors.append("Monthly returns DataFrame is empty.")
        return errors
    if 'date' not in monthly_returns_df.columns:
        errors.append("Monthly returns DataFrame is missing the 'date' column.")
        return errors

    # Isolate the ETF return columns (all columns except 'date').
    etf_return_columns = sorted([c for c in monthly_returns_df.columns if c != 'date'])

    # Check 2: Verify the number of ETF columns.
    expected_etf_count = config['data_setup']['total_corporate_bond_etfs']
    if len(etf_return_columns) != expected_etf_count:
        errors.append(
            f"Expected {expected_etf_count} ETF return columns, but found "
            f"{len(etf_return_columns)}."
        )
        # Return early if column count is wrong, as consistency check will fail.
        return errors

    # Check 3: Verify consistency of ETF identifiers with holdings data.
    etf_ids_in_holdings = sorted(list(etf_holdings_df['etf_id'].unique()))
    if etf_return_columns != etf_ids_in_holdings:
        # Use set operations for a detailed comparison.
        set_returns = set(etf_return_columns)
        set_holdings = set(etf_ids_in_holdings)
        missing_in_returns = set_holdings - set_returns
        extra_in_returns = set_returns - set_holdings
        error_msg = "ETF identifiers in returns do not match holdings."
        if missing_in_returns:
            error_msg += f" Missing from returns: {sorted(list(missing_in_returns))}."
        if extra_in_returns:
            error_msg += f" Extra in returns: {sorted(list(extra_in_returns))}."
        errors.append(error_msg)

    # Check 4: Verify the number of monthly observations.
    # Generate the expected date range to determine the number of observations.
    expected_dates = pd.date_range(
        start=config['data_setup']['return_series_start_date'],
        end=config['data_setup']['return_series_end_date'],
        freq='M'
    )
    expected_obs_count = len(expected_dates)
    actual_obs_count = len(monthly_returns_df)
    if actual_obs_count != expected_obs_count:
        errors.append(
            f"Expected {expected_obs_count} monthly observations, but found "
            f"{actual_obs_count} rows."
        )

    return errors


def _validate_returns_temporal_integrity(
    monthly_returns_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the temporal integrity of the monthly returns DataFrame.

    This function focuses on the 'date' column, ensuring it forms a valid,
    clean, and consistent time axis for the analysis. It verifies:
    1.  The 'date' column is a proper datetime type.
    2.  The dates are unique and sorted chronologically.
    3.  Each date is the last business day of its month.
    4.  The start and end dates match the configuration exactly.

    Args:
        monthly_returns_df (pd.DataFrame): DataFrame of monthly ETF returns.
        config (Dict[str, Any]): The validated research configuration dictionary.

    Returns:
        List[str]: A list of error messages for temporal violations.
    """
    # Initialize a list to hold validation errors.
    errors: List[str] = []

    # Make a copy to avoid modifying the original DataFrame.
    df = monthly_returns_df.copy()

    # Check 1: Ensure 'date' column can be parsed as datetime.
    try:
        df['date'] = pd.to_datetime(df['date'])
    except Exception as e:
        errors.append(f"Could not parse 'date' column as datetime: {e}")
        return errors # Halt if dates are unparsable.

    # Check 2: Verify dates are unique and sorted.
    if not df['date'].is_unique:
        errors.append("Dates in 'date' column are not unique.")
    if not df['date'].is_monotonic_increasing:
        errors.append("Dates in 'date' column are not sorted chronologically.")

    # Check 3: Verify each date is a month-end business day.
    # This is a strict check for financial time-series data.
    month_end_checker = pd.tseries.offsets.MonthEnd(0)
    non_monthend_dates = [
        d.date() for d in df['date'] if not month_end_checker.is_on_offset(d)
    ]
    if non_monthend_dates:
        errors.append(
            f"Found {len(non_monthend_dates)} dates that are not the last "
            f"business day of the month. Examples: {non_monthend_dates[:3]}"
        )

    # Check 4: Verify start and end dates match the configuration.
    expected_start = pd.to_datetime(config['data_setup']['return_series_start_date'])
    expected_end = pd.to_datetime(config['data_setup']['return_series_end_date'])
    if df['date'].iloc[0] != expected_start:
        errors.append(
            f"First date '{df['date'].iloc[0].date()}' does not match expected "
            f"start date '{expected_start.date()}'."
        )
    if df['date'].iloc[-1] != expected_end:
        errors.append(
            f"Last date '{df['date'].iloc[-1].date()}' does not match expected "
            f"end date '{expected_end.date()}'."
        )

    return errors


def _validate_returns_quality(
    monthly_returns_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the quality of the numerical return values.

    This remediated function inspects the return data itself, ensuring it is
    clean, complete, and correctly formatted with the required precision. It verifies:
    1.  There are absolutely no missing (NaN) values.
    2.  The data types of all return columns are float64.
    3.  The return values are in decimal format (e.g., 0.01), not percentage.
    4.  **[NEW]** The return values adhere to the specified 4-decimal-place precision.

    Args:
        monthly_returns_df (pd.DataFrame): DataFrame of monthly ETF returns.
        config (Dict[str, Any]): The research configuration dictionary.

    Returns:
        List[str]: A list of error messages for data quality violations.
    """
    # Initialize a list to hold validation errors.
    errors: List[str] = []

    # Isolate the ETF return columns for quality checks.
    etf_return_columns = [c for c in monthly_returns_df.columns if c != 'date']
    returns_data = monthly_returns_df[etf_return_columns]

    # Check 1: Verify no missing values exist anywhere in the return data.
    # This is a strict requirement of the paper's methodology.
    if returns_data.isnull().values.any():
        missing_counts = returns_data.isnull().sum()
        affected_cols = missing_counts[missing_counts > 0].to_dict()
        errors.append(f"Missing values found in return data. Columns: {affected_cols}.")

    # Check 2: Verify data types of all return columns are float.
    for col in etf_return_columns:
        if not pd.api.types.is_float_dtype(returns_data[col]):
            errors.append(
                f"Return column '{col}' is not a float dtype "
                f"(found {returns_data[col].dtype})."
            )

    # If there are non-float columns, subsequent checks may fail. Return early.
    if errors:
        return errors

    # Check 3: Validate that returns are in decimal format.
    # A robust heuristic is to check if the absolute maximum return is within a
    # plausible range for decimal returns (e.g., < 50% or 0.5).
    max_abs_return = returns_data.abs().max().max()
    if max_abs_return > 0.5:
        errors.append(
            f"Maximum absolute return is {max_abs_return:.2%}, which suggests "
            "data may be in percentage format instead of decimal."
        )

    # --- Check 4: Validate 4-decimal-place precision ---
    # This check ensures the data conforms to the specified numerical precision.
    # Get the required precision for correlation calculations from the config.
    precision = config['computational_config']['correlation_precision']
    if precision != 4:
        errors.append(f"Configuration expects correlation_precision of 4, but is {precision}. Precision check may be invalid.")

    # Compare the original data to the data rounded to the specified precision.
    # Using numpy.allclose is essential for robust floating-point comparison,
    # avoiding issues with representation errors. A very small tolerance
    # effectively checks for equality for numbers with limited precision.
    returns_rounded = returns_data.round(precision)
    if not np.allclose(returns_data, returns_rounded, atol=1e-9, equal_nan=True):
        # Find the first cell that violates the precision for an actionable error message.
        diff_mask = ~np.isclose(returns_data, returns_rounded, atol=1e-9, equal_nan=True)
        first_violation_col = diff_mask.any(axis=0).idxmax()
        first_violation_row = diff_mask[first_violation_col].idxmax()
        original_val = returns_data.loc[first_violation_row, first_violation_col]

        errors.append(
            f"Return values do not adhere to the required {precision}-decimal-place "
            f"precision. Example violation in column '{first_violation_col}' "
            f"at index '{first_violation_row}' with value {original_val}."
        )

    return errors


def validate_monthly_returns_dataframe(
    monthly_returns_df: pd.DataFrame,
    etf_holdings_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[bool, List[str]]:
    """
    Orchestrates the complete validation of the monthly returns DataFrame.

    This master function provides a single entry point to rigorously validate
    the time-series return data, which serves as the ground truth for the
    study's final evaluation. A failure in this data's integrity would
    invalidate all results.

    The validation proceeds in three phases:
    1.  **Structure Validation**: Checks column names, ETF consistency with
        holdings, and the exact number of time-series observations.
    2.  **Temporal Integrity Validation**: Ensures the date index is clean,
        chronological, and adheres to the month-end business day convention.
    3.  **Data Quality Validation**: Verifies the absence of missing values and
        the correct decimal format of the return figures.

    Args:
        monthly_returns_df (pd.DataFrame): The DataFrame containing monthly
                                           return data to be validated.
        etf_holdings_df (pd.DataFrame): The validated DataFrame of ETF holdings,
                                        used for cross-referencing ETF identifiers.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        Tuple[bool, List[str]]: A tuple containing:
        - bool: True if the DataFrame is valid, False otherwise.
        - List[str]: A comprehensive list of all validation errors found.
    """
    # Perform initial input type checks.
    if not isinstance(monthly_returns_df, pd.DataFrame):
        return False, ["Input 'monthly_returns_df' must be a pandas DataFrame."]
    if not isinstance(etf_holdings_df, pd.DataFrame):
        return False, ["Input 'etf_holdings_df' must be a pandas DataFrame."]
    if not isinstance(config, dict):
        return False, ["Input 'config' must be a dictionary."]

    # --- Step 1: Validate the DataFrame's structure and consistency ---
    structure_errors = _validate_returns_structure(
        monthly_returns_df, etf_holdings_df, config
    )
    if structure_errors:
        # Halt if the basic structure is incorrect.
        return False, structure_errors

    # --- Step 2: Validate the temporal integrity of the date index ---
    temporal_errors = _validate_returns_temporal_integrity(
        monthly_returns_df, config
    )

    # --- Step 3: Perform deep data quality checks on return values ---
    quality_errors = _validate_returns_quality(monthly_returns_df)

    # --- Aggregation of all errors ---
    all_errors = temporal_errors + quality_errors

    # --- Final Verdict ---
    is_valid = not all_errors

    return is_valid, all_errors


In [None]:
# Task 5: Cross-Dataset Consistency Validation

def _validate_cusip_consistency(
    etf_holdings_df: pd.DataFrame,
    bond_features_df: pd.DataFrame
) -> List[str]:
    """
    Validates the consistency of the CUSIP universe across datasets.

    This function ensures that the universe of securities is coherent between the
    portfolio holdings and the security master feature data. It enforces the
    critical rule that every bond listed in a portfolio must have a corresponding
    entry in the bond features DataFrame.

    Args:
        etf_holdings_df (pd.DataFrame): The validated DataFrame of ETF holdings.
        bond_features_df (pd.DataFrame): The validated DataFrame of bond features.

    Returns:
        List[str]: A list of error messages related to CUSIP universe mismatches.
    """
    # Initialize a list to hold validation errors.
    errors: List[str] = []

    # Extract unique CUSIPs from both DataFrames into sets for efficient comparison.
    # Sets provide highly optimized operations for membership and difference checks.
    cusips_in_holdings: Set[str] = set(etf_holdings_df['cusip'].unique())
    cusips_in_features: Set[str] = set(bond_features_df['cusip'].unique())

    # Check 1: Every CUSIP in holdings must exist in the features master file.
    # This is a non-negotiable referential integrity constraint.
    # We check if the holdings set is a subset of the features set.
    if not cusips_in_holdings.issubset(cusips_in_features):
        # If not, identify the specific CUSIPs that are missing from the features data.
        missing_cusips = cusips_in_holdings - cusips_in_features
        # Format a detailed and actionable error message.
        errors.append(
            f"Found {len(missing_cusips)} CUSIPs in the holdings data that are "
            f"missing from the bond features data. Examples: "
            f"{sorted(list(missing_cusips))[:5]}"
        )

    return errors


def _validate_temporal_alignment(
    etf_holdings_df: pd.DataFrame,
    monthly_returns_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the temporal (date) alignment across all datasets.

    This function ensures that the portfolio snapshot (holdings) is perfectly
    aligned in time with the final observation of the performance data (returns),
    a cornerstone of point-in-time financial analysis.

    Args:
        etf_holdings_df (pd.DataFrame): The validated DataFrame of ETF holdings.
        monthly_returns_df (pd.DataFrame): The validated DataFrame of returns.
        config (Dict[str, Any]): The validated research configuration dictionary.

    Returns:
        List[str]: A list of error messages for temporal misalignments.
    """
    # Initialize a list to hold validation errors.
    errors: List[str] = []

    # Check 1: Verify the holdings data corresponds to a single valuation date.
    # The unique 'as_of_date' from holdings is the anchor point for our analysis.
    holdings_date = etf_holdings_df['as_of_date'].unique()
    if len(holdings_date) != 1:
        # This was already checked in Task 2, but is re-verified here for completeness.
        errors.append("Holdings data must contain only a single 'as_of_date'.")
        return errors # Halt if this fundamental assumption is violated.

    # Extract the single holdings date and the expected valuation date from config.
    holdings_date = pd.to_datetime(holdings_date[0])
    expected_valuation_date = pd.to_datetime(config['data_setup']['valuation_date'])

    # Check 2: Ensure the holdings date matches the configuration.
    if holdings_date != expected_valuation_date:
        errors.append(
            f"Holdings 'as_of_date' ({holdings_date.date()}) does not match "
            f"configured 'valuation_date' ({expected_valuation_date.date()})."
        )

    # Check 3: Ensure the final date in the returns series aligns with the holdings date.
    # The ground truth evaluation period must end at the same time as the portfolio snapshot.
    last_return_date = pd.to_datetime(monthly_returns_df['date'].iloc[-1])
    if last_return_date != holdings_date:
        errors.append(
            f"The last date in the monthly returns series ({last_return_date.date()}) "
            f"does not match the holdings 'as_of_date' ({holdings_date.date()})."
        )

    return errors


def _validate_etf_id_consistency(
    etf_holdings_df: pd.DataFrame,
    monthly_returns_df: pd.DataFrame
) -> List[str]:
    """
    Validates the consistency of the ETF identifier universe across datasets.

    This function ensures a perfect one-to-one correspondence between the ETFs
    defined in the holdings data and the ETFs for which return series are
    provided. This prevents misalignments where an ETF might have holdings but
    no returns, or vice-versa.

    Args:
        etf_holdings_df (pd.DataFrame): The validated DataFrame of ETF holdings.
        monthly_returns_df (pd.DataFrame): The validated DataFrame of returns.

    Returns:
        List[str]: A list of error messages for ETF universe inconsistencies.
    """
    # Initialize a list to hold validation errors.
    errors: List[str] = []

    # Extract the universe of ETF identifiers from the holdings data.
    etf_ids_in_holdings: Set[str] = set(etf_holdings_df['etf_id'].unique())

    # Extract the universe of ETF identifiers from the returns data columns.
    etf_ids_in_returns: Set[str] = set(
        [c for c in monthly_returns_df.columns if c != 'date']
    )

    # Check 1: The two sets of ETF identifiers must be identical.
    # This is a strict equality check, ensuring no discrepancies.
    if etf_ids_in_holdings != etf_ids_in_returns:
        # If they are not identical, perform a detailed diagnosis.
        missing_from_returns = etf_ids_in_holdings - etf_ids_in_returns
        missing_from_holdings = etf_ids_in_returns - etf_ids_in_holdings
        error_msg = "ETF identifier universe is inconsistent between holdings and returns."
        if missing_from_returns:
            error_msg += (
                f" ETFs in holdings but not in returns: {sorted(list(missing_from_returns))}."
            )
        if missing_from_holdings:
            error_msg += (
                f" ETFs in returns but not in holdings: {sorted(list(missing_from_holdings))}."
            )
        errors.append(error_msg)

    return errors


def validate_cross_dataset_consistency(
    etf_holdings_df: pd.DataFrame,
    bond_features_df: pd.DataFrame,
    monthly_returns_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[bool, List[str]]:
    """
    Orchestrates the validation of referential integrity across all datasets.

    This master function serves as the final gate in the data validation phase.
    It ensures that the three core datasets (holdings, features, returns) are
    not only internally consistent but also perfectly aligned with each other.
    This cross-validation is essential for the integrity of the entire
    research pipeline.

    The validation proceeds in three phases:
    1.  **CUSIP Consistency**: Verifies that all securities in portfolios have
        corresponding feature data.
    2.  **Temporal Alignment**: Ensures the portfolio snapshot date aligns with
        the returns data and configuration.
    3.  **ETF ID Consistency**: Confirms a perfect one-to-one mapping of ETFs
        across the holdings and returns datasets.

    Args:
        etf_holdings_df (pd.DataFrame): The validated ETF holdings DataFrame.
        bond_features_df (pd.DataFrame): The validated bond features DataFrame.
        monthly_returns_df (pd.DataFrame): The validated monthly returns DataFrame.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        Tuple[bool, List[str]]: A tuple containing:
        - bool: True if all datasets are consistent with each other, False otherwise.
        - List[str]: A comprehensive list of all consistency errors found.
    """
    # Perform initial input type checks to ensure all inputs are DataFrames.
    if not all(isinstance(df, pd.DataFrame) for df in [etf_holdings_df, bond_features_df, monthly_returns_df]):
        return False, ["All data inputs must be pandas DataFrames."]
    if not isinstance(config, dict):
        return False, ["Input 'config' must be a dictionary."]

    # --- Step 1: Validate the CUSIP universe consistency ---
    cusip_errors = _validate_cusip_consistency(etf_holdings_df, bond_features_df)

    # --- Step 2: Validate the temporal (date) alignment ---
    temporal_errors = _validate_temporal_alignment(
        etf_holdings_df, monthly_returns_df, config
    )

    # --- Step 3: Validate the ETF identifier universe consistency ---
    etf_id_errors = _validate_etf_id_consistency(
        etf_holdings_df, monthly_returns_df
    )

    # --- Aggregation of all errors ---
    all_errors = cusip_errors + temporal_errors + etf_id_errors

    # --- Final Verdict ---
    is_valid = not all_errors

    return is_valid, all_errors


In [None]:
# Task 6: ETF Holdings Data Cleansing

def _cleanse_holdings_identifiers(
    etf_holdings_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Standardizes string identifier fields in the ETF holdings DataFrame.

    This function performs essential cleaning on security and fund identifiers
    to ensure they are in a canonical format for reliable matching and joining.
    It applies two main operations:
    1.  Removes leading and trailing whitespace.
    2.  Converts identifiers to uppercase for case-insensitive consistency.

    Args:
        etf_holdings_df (pd.DataFrame): The ETF holdings DataFrame with raw
                                        identifier strings.

    Returns:
        pd.DataFrame: The DataFrame with cleansed and standardized identifiers.
    """
    # Create a copy to avoid modifying the original DataFrame in place.
    df = etf_holdings_df.copy()

    # Define the list of identifier columns that require standardization.
    identifier_columns: List[str] = ['cusip', 'isin', 'sedol', 'etf_id']

    # Iterate through each identifier column to apply cleansing operations.
    for col in identifier_columns:
        # Ensure the column is of string type before applying string operations.
        # This prevents errors if a column was unexpectedly numeric.
        df[col] = df[col].astype(str)

        # Chain vectorized string operations for maximum efficiency.
        # .str.strip() removes leading/trailing whitespace.
        # .str.upper() converts all characters to uppercase.
        df[col] = df[col].str.strip().str.upper()

    # Cleanse the 'etf_name' column separately, preserving title case.
    # Remove leading/trailing whitespace.
    # Replace multiple spaces with a single space for consistency.
    df['etf_name'] = df['etf_name'].str.strip().str.replace(r'\s+', ' ', regex=True)

    return df


def _normalize_holdings_weights(
    etf_holdings_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Normalizes portfolio weights for each ETF to ensure they sum exactly to 1.0.

    This function corrects for minor floating-point discrepancies in source data
    by recalculating each constituent's weight as a fraction of the portfolio's
    total weight. This is a critical step for ensuring portfolios represent
    valid probability distributions.

    Args:
        etf_holdings_df (pd.DataFrame): The ETF holdings DataFrame.
        config (Dict[str, Any]): The research configuration dictionary, used for
                                 numerical precision parameters.

    Returns:
        pd.DataFrame: The DataFrame with normalized and rounded portfolio weights.
    """
    # Create a copy to avoid modifying the original DataFrame.
    df = etf_holdings_df.copy()

    # Calculate the sum of weights for each ETF group.
    # .groupby('etf_id')['weight'] selects the weight column for each ETF.
    # .transform('sum') computes the sum for each group and broadcasts the result
    # back to a Series with the same index as the original DataFrame.
    weight_sums = df.groupby('etf_id')['weight'].transform('sum')

    # To prevent division by zero, replace any zero sums with NaN, which will
    # result in NaN weights that can be handled or flagged.
    weight_sums = weight_sums.replace(0, np.nan)

    # Normalize the weights by dividing each weight by its group's sum.
    df['weight'] = df['weight'] / weight_sums

    # Round the normalized weights to the specified precision from the config.
    # This ensures consistent numerical representation.
    precision = config['computational_config']['weight_precision']
    df['weight'] = df['weight'].round(precision)

    return df


def _aggregate_duplicate_holdings(
    etf_holdings_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Aggregates duplicate security holdings within each ETF portfolio.

    If a security (CUSIP) appears multiple times for the same ETF, this function
    consolidates these entries into a single record. It sums numerical fields
    like weights and market values, and preserves the first observed value for
    descriptive fields.

    Args:
        etf_holdings_df (pd.DataFrame): The ETF holdings DataFrame which may
                                        contain duplicate entries.

    Returns:
        pd.DataFrame: A DataFrame with unique holdings per ETF.
    """
    # Check if there are any duplicates to aggregate.
    # The composite key for a unique holding is the ETF and the security CUSIP.
    if not etf_holdings_df.duplicated(subset=['etf_id', 'cusip']).any():
        # If no duplicates exist, return the original DataFrame.
        return etf_holdings_df

    # Define the aggregation logic for each column.
    # Numerical values are summed.
    # Descriptive/identifier values take the first observed instance.
    aggregation_rules: Dict[str, str] = {
        'weight': 'sum',
        'market_value_usd': 'sum',
        'shares_held': 'sum',
        'etf_name': 'first',
        'isin': 'first',
        'sedol': 'first',
        'as_of_date': 'first'
    }

    # Group by the composite key and apply the aggregation rules.
    # .reset_index() converts the grouped keys back into columns.
    df_aggregated = etf_holdings_df.groupby(['etf_id', 'cusip'], as_index=False).agg(
        aggregation_rules
    )

    return df_aggregated


def cleanse_etf_holdings_dataframe(
    etf_holdings_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the complete cleansing of the ETF holdings DataFrame.

    This master function executes a sequential cleansing pipeline to transform
    raw (but validated) holdings data into a standardized, analytically pure
    format. The pipeline ensures that all identifiers are canonical, all
    portfolios are numerically sound, and all entries are unique.

    The cleansing pipeline proceeds in four steps:
    1.  **Identifier Standardization**: Cleans and standardizes all string-based
        security and fund identifiers (e.g., CUSIP, ISIN, ETF ID).
    2.  **Duplicate Aggregation**: Consolidates any duplicate security entries
        within each ETF portfolio, summing numerical values.
    3.  **Weight Normalization (Post-Aggregation)**: Re-normalizes portfolio
        weights after aggregation to ensure they sum precisely to 1.0. This
        step is crucial as the summation in the previous step can alter totals.
    4.  **Final Validation**: A final check ensures the cleansed data still
        adheres to the most critical constraints (e.g., weight sums).

    Args:
        etf_holdings_df (pd.DataFrame): The raw (but validated) ETF holdings
                                        DataFrame to be cleansed.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        pd.DataFrame: A cleansed, standardized, and analytically ready
                      DataFrame of ETF holdings.
    """
    # Input validation for the function itself.
    if not isinstance(etf_holdings_df, pd.DataFrame):
        raise TypeError("Input 'etf_holdings_df' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Standardize all string identifiers ---
    # This ensures that CUSIPs, ISINs, etc., are in a consistent format.
    df_cleaned = _cleanse_holdings_identifiers(etf_holdings_df)

    # --- Step 2: Aggregate any duplicate holdings ---
    # This ensures each security appears only once per ETF.
    df_cleaned = _aggregate_duplicate_holdings(df_cleaned)

    # --- Step 3: Normalize portfolio weights ---
    # This is performed *after* aggregation because summing weights can
    # change the portfolio total. This step ensures all portfolios sum to 1.0.
    df_cleaned = _normalize_holdings_weights(df_cleaned, config)

    # --- Step 4: Final Sanity Check ---
    # After all transformations, perform a final check on the weight sums.
    weight_sums = df_cleaned.groupby('etf_id')['weight'].sum()
    if not np.allclose(weight_sums, 1.0, atol=1e-6):
        raise ValueError(
            "Weights do not sum to 1.0 for all ETFs after cleansing. "
            "Please check the aggregation and normalization logic."
        )

    return df_cleaned


In [None]:
# Task 7: Bond Features Data Cleansing

def _cleanse_features_categorical(
    bond_features_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Standardizes categorical and identifier text fields in the bond features DataFrame.

    This function applies consistent formatting rules to all categorical and
    string-based identifier columns to ensure they are in a canonical format.
    This is essential for accurate grouping, joining, and feature encoding.

    - CUSIPs and Ratings: Converted to uppercase.
    - Issuer, Industry, Country: Converted to title case after stripping whitespace.

    Args:
        bond_features_df (pd.DataFrame): The bond features DataFrame with raw
                                          categorical text data.

    Returns:
        pd.DataFrame: The DataFrame with cleansed and standardized text features.
    """
    # Create a copy to avoid modifying the original DataFrame in place.
    df = bond_features_df.copy()

    # Define columns to be converted to uppercase.
    # These are typically codes or identifiers.
    upper_case_cols: List[str] = ['cusip', 'bond_rating_composite']
    for col in upper_case_cols:
        if col in df.columns:
            # Chain vectorized string operations for efficiency.
            df[col] = df[col].str.strip().str.upper()

    # Define columns to be converted to title case.
    # This is suitable for proper nouns like company or country names.
    title_case_cols: List[str] = ['issuer', 'industry', 'country_of_risk']
    for col in title_case_cols:
        if col in df.columns:
            # Chain operations: strip whitespace, replace multiple spaces, convert to title case.
            df[col] = (
                df[col].str.strip()
                .str.replace(r'\s+', ' ', regex=True)
                .str.title()
            )

    return df


def _correct_features_numerical(
    bond_features_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Validates and corrects numerical features based on business logic.

    This function applies specific, rule-based corrections to numerical columns
    to enforce logical constraints and handle invalid data points as per the
    task specification.

    - `age_days`: Enforces a minimum value of 1.
    - `coupon_frequency`: Imputes invalid values with the column's mode.
    - `coupon_rate`, `amount_issued_usd`: Rounds to specified precision.

    Args:
        bond_features_df (pd.DataFrame): The bond features DataFrame.

    Returns:
        pd.DataFrame: The DataFrame with corrected numerical features.
    """
    # Create a copy to avoid modifying the original DataFrame.
    df = bond_features_df.copy()

    # Correction 1: Enforce minimum age for 'age_days'.
    # A bond's age cannot be zero or negative. We enforce a minimum of 1 day.
    # numpy.where is a highly efficient vectorized conditional operation.
    if 'age_days' in df.columns:
        df['age_days'] = np.where(df['age_days'] <= 0, 1, df['age_days'])

    # Correction 2: Impute invalid 'coupon_frequency' values.
    if 'coupon_frequency' in df.columns:
        # Define the set of valid frequencies.
        valid_frequencies = {1, 2, 4, 12}
        # Create a boolean mask to identify rows with invalid frequencies.
        invalid_mask = ~df['coupon_frequency'].isin(valid_frequencies)

        # Proceed only if there are invalid values to correct.
        if invalid_mask.any():
            # Calculate the mode of the *valid* frequencies.
            # This is the most common payment frequency (typically 2 for semi-annual).
            valid_mode = df.loc[~invalid_mask, 'coupon_frequency'].mode()[0]

            # Use the mask with .loc to replace only the invalid values.
            df.loc[invalid_mask, 'coupon_frequency'] = valid_mode

    # Correction 3: Round numerical columns to their specified precision.
    if 'coupon_rate' in df.columns:
        df['coupon_rate'] = df['coupon_rate'].round(3)
    if 'amount_issued_usd' in df.columns:
        df['amount_issued_usd'] = df['amount_issued_usd'].round(2)

    return df


def _validate_features_targets(
    bond_features_df: pd.DataFrame,
    config: Dict[str, Any]
) -> None:
    """
    Validates the integrity of the target variable columns.

    This function enforces the critical assumption from the paper that the
    dataset contains no missing values for the target variables (OAS and Yield).
    If the corresponding configuration flag is set, it will check for NaNs and
    raise a ValueError if any are found, halting the pipeline.

    Args:
        bond_features_df (pd.DataFrame): The bond features DataFrame.
        config (Dict[str, Any]): The research configuration dictionary.

    Raises:
        ValueError: If `validate_no_missing_values` is True and missing
                    values are found in the target columns.
    """
    # This check is conditional on the configuration setting.
    if config['feature_engineering']['validate_no_missing_values']:
        # Define the target variable columns.
        target_cols = ['oas_bp', 'yield_to_maturity']

        # Check for missing values in the specified target columns.
        missing_in_targets = bond_features_df[target_cols].isnull().sum()

        # Filter for columns that actually have missing values.
        affected_cols = missing_in_targets[missing_in_targets > 0]

        # If any target columns have missing values, raise a critical error.
        if not affected_cols.empty:
            raise ValueError(
                "Missing values found in target variables, violating a core "
                "assumption of the study. Please clean the source data. "
                f"Affected columns and counts: {affected_cols.to_dict()}"
            )


def cleanse_bond_features_dataframe(
    bond_features_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the complete cleansing of the bond features DataFrame.

    This master function executes a sequential cleansing pipeline to transform
    the raw (but validated) bond features data into a pristine, standardized
    format suitable for machine learning.

    The cleansing pipeline proceeds in three steps:
    1.  **Categorical Standardization**: Cleans and standardizes all text-based
        categorical features and identifiers.
    2.  **Numerical Correction**: Applies specific, rule-based corrections to
        numerical features to enforce business logic (e.g., minimum age).
    3.  **Target Variable Validation**: Performs a final, critical check to
        ensure the target variables for the ML model are complete and contain
        no missing values, as per the paper's methodology.

    Args:
        bond_features_df (pd.DataFrame): The raw (but validated) bond features
                                          DataFrame to be cleansed.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        pd.DataFrame: A cleansed, standardized, and analytically ready
                      DataFrame of bond features.
    """
    # Input validation for the function itself.
    if not isinstance(bond_features_df, pd.DataFrame):
        raise TypeError("Input 'bond_features_df' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Standardize categorical and identifier text fields ---
    df_cleaned = _cleanse_features_categorical(bond_features_df)

    # --- Step 2: Apply rule-based corrections to numerical features ---
    df_cleaned = _correct_features_numerical(df_cleaned)

    # --- Step 3: Validate integrity of target variables (fail-fast) ---
    # This step does not return a value but will raise an error if validation fails.
    _validate_features_targets(df_cleaned, config)

    return df_cleaned


In [None]:
# Task 8: Monthly Returns Data Cleansing

def _standardize_returns_index(
    monthly_returns_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Standardizes the time series structure of the monthly returns DataFrame.

    This function ensures the DataFrame is optimally structured for time-series
    analysis by:
    1.  Converting the 'date' column to a pandas DatetimeIndex.
    2.  Sorting the DataFrame chronologically by the new index.
    3.  Verifying the integrity of the index (uniqueness and monotonicity).

    Args:
        monthly_returns_df (pd.DataFrame): The monthly returns DataFrame with
                                           a 'date' column.

    Returns:
        pd.DataFrame: The DataFrame with a clean, sorted DatetimeIndex.
    """
    # Create a copy to avoid modifying the original DataFrame.
    df = monthly_returns_df.copy()

    # Convert the 'date' column to datetime objects, if not already.
    # This is idempotent and ensures the correct dtype for indexing.
    df['date'] = pd.to_datetime(df['date'])

    # Sort the DataFrame by date to ensure chronological order.
    df = df.sort_values(by='date')

    # Set the 'date' column as the DataFrame's index.
    df = df.set_index('date')

    # Perform final integrity checks on the newly created index.
    if not df.index.is_unique:
        raise ValueError("The 'date' index contains duplicate values after sorting.")
    if not df.index.is_monotonic_increasing:
        raise ValueError("The 'date' index is not monotonically increasing.")

    return df


def _handle_missing_returns(
    monthly_returns_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Enforces the no-missing-values policy for the returns data.

    The source paper's methodology strictly requires a complete return series for
    all ETFs. This function enforces that constraint. By default, it will raise
    a ValueError if any missing values (NaNs) are found. A conditional imputation
    path is included for flexibility but is not the default behavior.

    Args:
        monthly_returns_df (pd.DataFrame): The returns DataFrame, indexed by date.

    Returns:
        pd.DataFrame: The validated DataFrame, guaranteed to have no missing values.

    Raises:
        ValueError: If any missing values are found in the return columns.
    """
    # Check for any missing values across all return columns.
    # .values.any() is a highly optimized way to check the entire DataFrame.
    if monthly_returns_df.isnull().values.any():
        # If missing values are found, raise a critical error.
        # This enforces the strict methodology of the paper.
        missing_counts = monthly_returns_df.isnull().sum()
        affected_cols = missing_counts[missing_counts > 0].to_dict()
        raise ValueError(
            "Missing values detected in the monthly returns data, which violates "
            f"the study's core requirement. Affected columns: {affected_cols}"
        )

    # If no missing values are found, return the DataFrame as is.
    return monthly_returns_df


def _format_return_values(
    monthly_returns_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Standardizes the numerical format and precision of return values.

    This function ensures all return data is numerically consistent by:
    1.  Rounding all return values to a specified number of decimal places.
    2.  Performing a final validation of the value ranges to confirm the
        decimal format.

    Args:
        monthly_returns_df (pd.DataFrame): The returns DataFrame, indexed by date.
        config (Dict[str, Any]): The research configuration dictionary.

    Returns:
        pd.DataFrame: The DataFrame with formatted return values.
    """
    # Create a copy to avoid modifying the original DataFrame.
    df = monthly_returns_df.copy()

    # Get the required precision for correlation calculations from the config.
    precision = config['computational_config']['correlation_precision']

    # Apply the rounding operation to all numerical columns.
    # The index is not affected.
    df = df.round(precision)

    # Perform a final sanity check on the magnitude of the returns.
    # This confirms that the data is in decimal format (e.g., 0.0234) and not
    # percentage format (e.g., 2.34).
    max_abs_return = df.abs().max().max()
    if max_abs_return >= 1.0:
        # A return of 100% or more in a month for a bond ETF is highly
        # improbable and likely indicates a data formatting error.
        raise ValueError(
            f"Maximum absolute return is {max_abs_return:.2%}. Data appears to be "
            "in percentage format or contains extreme outliers. Please verify."
        )

    return df


def cleanse_monthly_returns_dataframe(
    monthly_returns_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the complete cleansing of the monthly returns DataFrame.

    This master function prepares the ground-truth returns data for econometric
    analysis. It transforms the validated raw data into a pristine,
    analysis-ready time-series format.

    The cleansing pipeline proceeds in three steps:
    1.  **Index Standardization**: Converts the 'date' column into a clean,
        sorted, and unique DatetimeIndex, which is the standard for
        time-series operations in pandas.
    2.  **Missing Value Enforcement**: Strictly enforces the paper's requirement
        of no missing data by raising an error if any NaNs are present.
    3.  **Value Formatting**: Standardizes the numerical precision of all return
        values by rounding them to the specified number of decimal places.

    Args:
        monthly_returns_df (pd.DataFrame): The raw (but validated) monthly
                                           returns DataFrame.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        pd.DataFrame: A cleansed, indexed, and formatted DataFrame ready for
                      correlation analysis.
    """
    # Input validation for the function itself.
    if not isinstance(monthly_returns_df, pd.DataFrame):
        raise TypeError("Input 'monthly_returns_df' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Standardize the date index ---
    # This creates a proper time-series structure.
    df_cleaned = _standardize_returns_index(monthly_returns_df)

    # --- Step 2: Enforce the no-missing-values policy ---
    # This is a critical, non-negotiable check based on the paper's methodology.
    df_cleaned = _handle_missing_returns(df_cleaned)

    # --- Step 3: Standardize the numerical format of the return values ---
    # This ensures consistent precision for all subsequent calculations.
    df_cleaned = _format_return_values(df_cleaned, config)

    return df_cleaned


In [None]:
# Task 9: Prepare Categorical Features for One-Hot Encoding

def _get_categorical_features_to_encode(
    bond_features_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[List[str], Dict[str, str]]:
    """
    Identifies categorical features for encoding based on the configuration.

    This function maps the conceptual feature names from the configuration to the
    actual column names in the DataFrame. It handles potential discrepancies and
    gracefully ignores configured features that are not present in the data,
    logging a warning for transparency.

    Args:
        bond_features_df (pd.DataFrame): The cleansed bond features DataFrame.
        config (Dict[str, Any]): The validated research configuration dictionary.

    Returns:
        Tuple[List[str], Dict[str, str]]: A tuple containing:
        - A list of actual column names in the DataFrame to be encoded.
        - The mapping dictionary used for the translation.
    """
    # Define the explicit mapping from conceptual config names to actual column names.
    # This provides a robust layer of abstraction.
    column_mapping: Dict[str, str] = {
        'issuer': 'issuer',
        'industry': 'industry',
        'bond_rating': 'bond_rating_composite',
        'country': 'country_of_risk',
        'flag_144a': 'rule_144a_flag',
        # The following are in the config but not in the provided data schema.
        'market': 'market',
        'currency': 'currency'
    }

    # Retrieve the list of conceptual feature names from the config.
    features_from_config = config['feature_engineering']['categorical_features_to_one_hot_encode']

    # Initialize the list to hold the actual column names found in the DataFrame.
    columns_to_encode: List[str] = []

    # Iterate through the configured features to find their corresponding columns.
    for feature_name in features_from_config:
        # Get the actual column name from the mapping.
        actual_col_name = column_mapping.get(feature_name)

        # Check if the mapped column name exists in the DataFrame.
        if actual_col_name and actual_col_name in bond_features_df.columns:
            columns_to_encode.append(actual_col_name)
        else:
            # If a configured feature is not found, log a warning.
            # This makes the pipeline robust to minor schema changes.
            logging.warning(
                f"Categorical feature '{feature_name}' from config not found "
                f"in the bond features DataFrame. It will be skipped."
            )

    return columns_to_encode, column_mapping


def _analyze_categorical_cardinality(
    bond_features_df: pd.DataFrame,
    columns_to_encode: List[str]
) -> Dict[str, int]:
    """
    Analyzes and reports the cardinality of categorical features.

    This function calculates the number of unique values for each categorical
    feature designated for encoding. This information is crucial for anticipating
    the dimensionality of the resulting feature matrix and potential memory usage.

    Args:
        bond_features_df (pd.DataFrame): The cleansed bond features DataFrame.
        columns_to_encode (List[str]): The list of column names to analyze.

    Returns:
        Dict[str, int]: A dictionary mapping each column name to its cardinality.
    """
    # Use a dictionary comprehension for a concise and efficient calculation.
    # .nunique() is an optimized pandas method for counting unique values.
    cardinality_report = {
        col: bond_features_df[col].nunique() for col in columns_to_encode
    }

    # Log a summary of the analysis for the user.
    logging.info("Categorical feature cardinality analysis:")
    for col, count in cardinality_report.items():
        logging.info(f"  - Feature '{col}': {count} unique values.")

    total_new_features = sum(cardinality_report.values())
    logging.info(
        f"Total new features to be created by one-hot encoding: {total_new_features}"
    )

    return cardinality_report


def _apply_one_hot_encoding(
    bond_features_df: pd.DataFrame,
    columns_to_encode: List[str]
) -> pd.DataFrame:
    """
    Applies one-hot encoding to the specified categorical columns.

    This function transforms categorical data into a numerical format suitable
    for machine learning models using the pandas `get_dummies` function. It is
    optimized for memory efficiency by using a uint8 data type.

    Args:
        bond_features_df (pd.DataFrame): The cleansed bond features DataFrame.
        columns_to_encode (List[str]): The list of column names to encode.

    Returns:
        pd.DataFrame: A DataFrame containing only the new one-hot encoded columns.
                      The index is preserved for later alignment.
    """
    # Apply one-hot encoding using pandas.get_dummies.
    # This function is highly optimized for this task.
    # - columns: Specifies which columns to encode.
    # - prefix/prefix_sep: Creates clear column names (e.g., 'industry_Technology').
    # - dtype=np.uint8: A critical memory optimization. Uses 1 byte per value
    #   instead of the default 8 (for int64), reducing memory usage significantly.
    encoded_df = pd.get_dummies(
        bond_features_df[columns_to_encode],
        columns=columns_to_encode,
        prefix=columns_to_encode,
        prefix_sep='_',
        dtype=np.uint8
    )

    return encoded_df


def prepare_categorical_features(
    bond_features_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the preparation of categorical features for ML model training.

    This master function executes a complete pipeline to transform raw categorical
    data into a one-hot encoded numerical format. It ensures that the process is
    transparent, robust to minor schema changes, and memory-efficient.

    The pipeline proceeds in three steps:
    1.  **Feature Identification**: Maps conceptual features from the config to
        actual DataFrame columns, handling any discrepancies.
    2.  **Cardinality Analysis**: Reports the number of unique values per feature
        to provide insight into the resulting dimensionality.
    3.  **One-Hot Encoding**: Performs the transformation into a numerical format,
        optimized for memory usage.

    Args:
        bond_features_df (pd.DataFrame): The cleansed bond features DataFrame.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        pd.DataFrame: A DataFrame containing the one-hot encoded features, with
                      the original index preserved for alignment with other
                      feature sets.
    """
    # Input validation for the function itself.
    if not isinstance(bond_features_df, pd.DataFrame):
        raise TypeError("Input 'bond_features_df' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Identify the actual columns to be encoded based on config ---
    columns_to_encode, _ = _get_categorical_features_to_encode(
        bond_features_df, config
    )

    if not columns_to_encode:
        logging.warning("No categorical features found to encode. Returning an empty DataFrame.")
        return pd.DataFrame(index=bond_features_df.index)

    # --- Step 2: Analyze the cardinality of these features ---
    # This provides a crucial checkpoint before the memory-intensive encoding step.
    _ = _analyze_categorical_cardinality(bond_features_df, columns_to_encode)

    # --- Step 3: Apply the one-hot encoding transformation ---
    # This is the core transformation step.
    categorical_features_encoded = _apply_one_hot_encoding(
        bond_features_df, columns_to_encode
    )

    logging.info(
        f"Successfully created {categorical_features_encoded.shape[1]} "
        "one-hot encoded features."
    )

    return categorical_features_encoded


In [None]:
# Task 10: Prepare Numerical Features for Normalization

def _get_numerical_features_to_scale(
    bond_features_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Identifies numerical features for scaling based on the configuration.

    This function maps conceptual feature names from the config to actual
    DataFrame columns and verifies that these columns are of a numeric data type,
    making them suitable for normalization.

    Args:
        bond_features_df (pd.DataFrame): The cleansed bond features DataFrame.
        config (Dict[str, Any]): The validated research configuration dictionary.

    Returns:
        List[str]: A list of actual, validated numerical column names.
    """
    # Define the explicit mapping from conceptual config names to actual column names.
    column_mapping: Dict[str, str] = {
        'days_to_maturity': 'days_to_maturity',
        'age': 'age_days',
        'coupon': 'coupon_rate',
        'amount_issued': 'amount_issued_usd',
        'coupon_frequency': 'coupon_frequency'
    }

    # Retrieve the list of conceptual feature names from the config.
    features_from_config = config['feature_engineering']['numerical_features']

    # Initialize lists for columns to scale and those that fail validation.
    columns_to_scale: List[str] = []
    invalid_columns: List[str] = []

    # Iterate through configured features to find and validate their columns.
    for feature_name in features_from_config:
        actual_col_name = column_mapping.get(feature_name)
        if actual_col_name and actual_col_name in bond_features_df.columns:
            # Verify that the column is numeric before adding it to the list.
            if pd.api.types.is_numeric_dtype(bond_features_df[actual_col_name]):
                columns_to_scale.append(actual_col_name)
            else:
                invalid_columns.append(actual_col_name)
        else:
            logging.warning(
                f"Numerical feature '{feature_name}' from config not found "
                f"in the bond features DataFrame. It will be skipped."
            )

    # If any configured numerical columns were found to be non-numeric, raise an error.
    if invalid_columns:
        raise TypeError(
            f"The following columns configured as numerical are not of a numeric "
            f"dtype: {invalid_columns}"
        )

    return columns_to_scale


def _compute_scaling_parameters(
    numerical_features_df: pd.DataFrame
) -> pd.Series:
    """
    Computes the maximum value for each numerical feature.

    These maximum values serve as the scaling parameters for normalization. This
    function should ONLY be called on the training dataset to prevent data

    leakage from the test set.

    Args:
        numerical_features_df (pd.DataFrame): A DataFrame containing only the
                                              numerical features from the
                                              TRAINING data.

    Returns:
        pd.Series: A Series where the index is the feature name and the value
                   is the maximum value of that feature.
    """
    # Calculate the maximum value for each column. .max() is highly optimized.
    scaling_params = numerical_features_df.max()

    # Handle the edge case where a maximum value is zero to prevent division by zero.
    # Replace any zero with a small epsilon for numerical stability.
    epsilon = 1e-9
    zero_max_cols = scaling_params[scaling_params == 0].index.tolist()
    if zero_max_cols:
        logging.warning(
            f"Features {zero_max_cols} have a maximum value of 0. Replacing with "
            f"{epsilon} for stable division."
        )
        scaling_params[zero_max_cols] = epsilon

    return scaling_params


def _apply_max_scaling(
    features_df: pd.DataFrame,
    scaling_params: pd.Series
) -> pd.DataFrame:
    """
    Applies max normalization to a DataFrame of numerical features.

    This function scales each feature by dividing it by its corresponding
    maximum value, as provided by the scaling_params Series. This transforms
    the feature values to a [0, 1] range.

    Args:
        features_df (pd.DataFrame): The DataFrame of numerical features to scale
                                    (can be train or test data).
        scaling_params (pd.Series): The scaling parameters (max values) learned
                                    from the training data.

    Returns:
        pd.DataFrame: The DataFrame with scaled numerical features.
    """
    # Verify that the columns to be scaled exist in the scaling parameters.
    if not set(features_df.columns).issubset(set(scaling_params.index)):
        missing_params = set(features_df.columns) - set(scaling_params.index)
        raise ValueError(
            f"Scaling parameters are missing for the following columns: {missing_params}"
        )

    # Apply the vectorized division. Pandas aligns the operation by the
    # Series index (column names), ensuring correctness.
    # The formula being implemented is: scaled_feature = feature / max(feature)
    scaled_df = features_df / scaling_params

    # Validate that all scaled values are within the [0, 1] range.
    # Use a small tolerance to account for potential floating-point inaccuracies.
    if not (scaled_df.min().min() >= -1e-9 and scaled_df.max().max() <= 1.0 + 1e-9):
        raise ValueError("Scaled values fall outside the expected [0, 1] range.")

    return scaled_df


def prepare_numerical_features(
    bond_features_df: pd.DataFrame,
    config: Dict[str, Any],
    scaling_params: pd.Series = None
) -> Tuple[pd.DataFrame, pd.Series]:
    """
    Orchestrates the preparation of numerical features for ML model training.

    This master function executes a pipeline to identify, scale, and validate
    numerical features. It operates in two modes:
    1.  **Training Mode** (`scaling_params` is None): It computes the scaling
        parameters (max values) from the provided DataFrame and then applies
        the scaling. It returns both the scaled features and the learned parameters.
    2.  **Inference Mode** (`scaling_params` is provided): It uses the pre-computed
        scaling parameters to scale the provided DataFrame. This prevents data
        leakage and ensures consistent scaling between training and test sets.

    Args:
        bond_features_df (pd.DataFrame): The cleansed bond features DataFrame.
        config (Dict[str, Any]): The validated research configuration dictionary.
        scaling_params (pd.Series, optional): Pre-computed scaling parameters.
                                              If None, they will be computed from
                                              the data. Defaults to None.

    Returns:
        Tuple[pd.DataFrame, pd.Series]: A tuple containing:
        - The DataFrame with scaled numerical features.
        - The Series of scaling parameters used (either computed or passed in).
    """
    # Input validation for the function itself.
    if not isinstance(bond_features_df, pd.DataFrame):
        raise TypeError("Input 'bond_features_df' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Identify the numerical columns to be scaled ---
    columns_to_scale = _get_numerical_features_to_scale(bond_features_df, config)
    numerical_features_df = bond_features_df[columns_to_scale]

    # --- Step 2: Compute or validate scaling parameters ---
    if scaling_params is None:
        # Training Mode: Compute scaling parameters from the current data.
        logging.info("Computing scaling parameters from the provided data (Training Mode).")
        computed_scaling_params = _compute_scaling_parameters(numerical_features_df)
    else:
        # Inference Mode: Use the provided scaling parameters.
        logging.info("Using pre-computed scaling parameters (Inference Mode).")
        if not isinstance(scaling_params, pd.Series):
            raise TypeError("Input 'scaling_params' must be a pandas Series.")
        computed_scaling_params = scaling_params

    # --- Step 3: Apply the max normalization ---
    numerical_features_scaled = _apply_max_scaling(
        numerical_features_df, computed_scaling_params
    )

    logging.info(
        f"Successfully scaled {len(columns_to_scale)} numerical features."
    )

    return numerical_features_scaled, computed_scaling_params


In [None]:
# Task 11: Construct Complete Feature Matrix

def _merge_feature_sets(
    categorical_features: pd.DataFrame,
    numerical_features: pd.DataFrame
) -> pd.DataFrame:
    """
    Merges one-hot encoded categorical and scaled numerical feature sets.

    This function combines the two separately processed feature DataFrames into a
    single, wide feature matrix (X). It relies on the pandas index (which should
    be the CUSIP) to ensure perfect row-wise alignment of the features for each bond.

    Args:
        categorical_features (pd.DataFrame): DataFrame of one-hot encoded features.
        numerical_features (pd.DataFrame): DataFrame of scaled numerical features.

    Returns:
        pd.DataFrame: The complete, merged feature matrix (X).

    Raises:
        ValueError: If the indices of the input DataFrames do not match.
    """
    # Pre-computation check: Verify that the indices are identical.
    # This is a critical guardrail to prevent misalignment.
    if not categorical_features.index.equals(numerical_features.index):
        raise ValueError(
            "Indices of categorical and numerical feature DataFrames do not match. "
            "Cannot merge."
        )

    # Concatenate the two DataFrames horizontally (axis=1).
    # pandas automatically aligns the data based on the shared index.
    complete_feature_matrix = pd.concat(
        [categorical_features, numerical_features], axis=1
    )

    # Final validation of the merged matrix shape.
    expected_rows = len(categorical_features)
    expected_cols = len(categorical_features.columns) + len(numerical_features.columns)
    if complete_feature_matrix.shape != (expected_rows, expected_cols):
        raise RuntimeError("Merged feature matrix has an unexpected shape.")

    logging.info(
        f"Successfully merged feature sets into a complete matrix with shape: "
        f"{complete_feature_matrix.shape}"
    )

    return complete_feature_matrix


def _extract_and_align_targets(
    bond_features_df: pd.DataFrame,
    complete_feature_matrix: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Extracts the target variables and aligns them with the feature matrix.

    This function selects the target columns (OAS and Yield) from the original
    features DataFrame and ensures their row order perfectly matches the final,
    assembled feature matrix (X). This alignment is critical to ensure the model
    learns the correct feature-target relationships.

    Args:
        bond_features_df (pd.DataFrame): The cleansed, original bond features DataFrame.
        complete_feature_matrix (pd.DataFrame): The fully assembled feature matrix (X).
        config (Dict[str, Any]): The validated research configuration dictionary.

    Returns:
        pd.DataFrame: The target matrix (Y), perfectly aligned with X.
    """
    # Define the mapping from conceptual target names to actual column names.
    target_mapping: Dict[str, str] = {
        'oas': 'oas_bp',
        'yield': 'yield_to_maturity'
    }

    # Get the list of actual target column names.
    target_cols = [
        target_mapping[t] for t in config['feature_engineering']['target_variables']
    ]

    # Extract the target columns from the original features DataFrame.
    target_matrix = bond_features_df[target_cols]

    # CRITICAL STEP: Align the target matrix (Y) to the feature matrix (X) index.
    # .loc[complete_feature_matrix.index] reorders the rows of the target matrix
    # to match the exact order of the feature matrix, guaranteeing perfect alignment.
    aligned_target_matrix = target_matrix.loc[complete_feature_matrix.index]

    logging.info(
        f"Successfully extracted and aligned target matrix (Y) with shape: "
        f"{aligned_target_matrix.shape}"
    )

    return aligned_target_matrix


def _perform_data_split(
    feature_matrix: pd.DataFrame,
    target_matrix: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Splits the feature and target matrices into training and testing sets.

    This function uses scikit-learn's `train_test_split` to partition the data
    according to the ratios and random state specified in the configuration.
    This ensures a reproducible split, which is fundamental for the integrity
    of the model evaluation process.

    Args:
        feature_matrix (pd.DataFrame): The complete feature matrix (X).
        target_matrix (pd.DataFrame): The complete target matrix (Y).
        config (Dict[str, Any]): The validated research configuration dictionary.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: A tuple
        containing the four resulting DataFrames in the standard order:
        (X_train, X_test, y_train, y_test).
    """
    # Retrieve splitting parameters from the configuration.
    test_size = config['feature_engineering']['testing_set_fraction']
    random_state = config['feature_engineering']['random_state_for_shuffle']
    should_shuffle = config['feature_engineering']['shuffle_before_split']

    # Perform the split using scikit-learn's train_test_split.
    # Providing both X and Y ensures the split is applied consistently to both.
    # The function returns pandas DataFrames because the inputs are DataFrames,
    # which preserves the CUSIP index for traceability.
    X_train, X_test, y_train, y_test = train_test_split(
        feature_matrix,
        target_matrix,
        test_size=test_size,
        random_state=random_state,
        shuffle=should_shuffle
    )

    logging.info("Successfully split data into training and testing sets.")
    logging.info(f"  - X_train shape: {X_train.shape}")
    logging.info(f"  - X_test shape:  {X_test.shape}")
    logging.info(f"  - y_train shape: {y_train.shape}")
    logging.info(f"  - y_test shape:  {y_test.shape}")

    return X_train, X_test, y_train, y_test


def construct_and_split_data(
    bond_features_df: pd.DataFrame,
    categorical_features_encoded: pd.DataFrame,
    numerical_features_scaled: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Orchestrates the construction and splitting of the final ML datasets.

    This master function brings together all prepared feature sets, assembles
    the final feature and target matrices, and partitions them into training
    and testing sets suitable for model training and evaluation.

    The pipeline proceeds in three steps:
    1.  **Feature Merging**: Combines the one-hot encoded categorical features
        and the scaled numerical features into a single feature matrix (X).
    2.  **Target Extraction and Alignment**: Selects the target variables (OAS,
        Yield) and ensures their row order perfectly matches the feature matrix.
    3.  **Data Splitting**: Partitions the complete X and Y matrices into
        reproducible training and testing sets based on the configuration.

    Args:
        bond_features_df (pd.DataFrame): The original, cleansed bond features DataFrame.
        categorical_features_encoded (pd.DataFrame): DataFrame of one-hot encoded features.
        numerical_features_scaled (pd.DataFrame): DataFrame of scaled numerical features.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: A tuple
        containing the four final DataFrames for the ML pipeline:
        (X_train, X_test, y_train, y_test).
    """
    # Input validation for the function itself.
    if not all(isinstance(df, pd.DataFrame) for df in [bond_features_df, categorical_features_encoded, numerical_features_scaled]):
        raise TypeError("All data inputs must be pandas DataFrames.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Merge the prepared feature sets into a complete matrix (X) ---
    # The CUSIP should be the index for alignment.
    bond_features_df = bond_features_df.set_index('cusip')

    complete_feature_matrix = _merge_feature_sets(
        categorical_features_encoded, numerical_features_scaled
    )

    # --- Step 2: Extract the target variables (Y) and align them with X ---
    target_matrix = _extract_and_align_targets(
        bond_features_df, complete_feature_matrix, config
    )

    # --- Step 3: Split the complete X and Y matrices into train/test sets ---
    X_train, X_test, y_train, y_test = _perform_data_split(
        complete_feature_matrix, target_matrix, config
    )

    return X_train, X_test, y_train, y_test


In [None]:
# Task 12: Random Forest Hyperparameter Optimization

def optimize_random_forest_hyperparameters(
    X_train: pd.DataFrame,
    y_train: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Optimizes Random Forest Regressor hyperparameters using grid search.

    This function performs a systematic search for the optimal `n_estimators` and
    `max_depth` for a multi-output Random Forest model, strictly following the
    methodology outlined in the paper. It uses k-fold cross-validation on the
    training data to evaluate each combination of hyperparameters, selecting the
    combination that minimizes the Root Mean Squared Error (RMSE).

    The process is as follows:
    1.  Instantiate a base `RandomForestRegressor` with a fixed random state for
        reproducibility.
    2.  Define the parameter grid to search over, as specified in the config.
    3.  Configure and execute `GridSearchCV` with 5-fold cross-validation,
        using all available CPU cores for efficiency.
    4.  The search optimizes for the 'neg_root_mean_squared_error' scoring
        metric, which is equivalent to minimizing RMSE.
    5.  The function extracts, logs, and returns the best-performing parameters.

    Args:
        X_train (pd.DataFrame): The training feature matrix.
        y_train (pd.DataFrame): The training target matrix.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        Dict[str, Any]: A dictionary containing the optimal hyperparameters
                        (e.g., {'n_estimators': 200, 'max_depth': 15}).
    """
    # --- Input Validation ---
    if not isinstance(X_train, pd.DataFrame) or not isinstance(y_train, pd.DataFrame):
        raise TypeError("X_train and y_train must be pandas DataFrames.")
    if X_train.shape[0] != y_train.shape[0]:
        raise ValueError("X_train and y_train must have the same number of rows.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    logging.info("Starting Random Forest hyperparameter optimization...")
    start_time = time.time()

    # --- Step 1: Configure the Grid Search ---
    # Retrieve the Random Forest configuration details.
    rf_config = config['random_forest_config']

    # Define the parameter grid to search. This is taken directly from the config
    # to ensure adherence to the study's methodology.
    param_grid: Dict[str, Any] = rf_config['hyperparameters_to_tune']
    logging.info(f"Parameter grid for search: {param_grid}")

    # Instantiate the base model. A fixed random_state is crucial for reproducibility
    # of the model's internal mechanisms (e.g., bootstrapping).
    estimator = RandomForestRegressor(random_state=rf_config['random_state'])

    # Instantiate the GridSearchCV object.
    # - estimator: The model to tune.
    # - param_grid: The hyperparameter space to search.
    # - cv: The number of cross-validation folds (5, as per the paper).
    # - scoring: The metric to optimize. 'neg_root_mean_squared_error' will be
    #   maximized, which is equivalent to minimizing RMSE. Scikit-learn's
    #   scorers for multi-output regressors average the scores by default.
    # - n_jobs: -1 uses all available CPU cores for parallel execution.
    # - verbose: Provides progress updates during the search.
    grid_search = GridSearchCV(
        estimator=estimator,
        param_grid=param_grid,
        cv=rf_config['cross_validation_folds'],
        scoring=rf_config['cv_scoring_metric'],
        n_jobs=rf_config['n_jobs'],
        verbose=2  # Provide more detailed output during the fit.
    )

    # --- Step 2: Execute the Grid Search on the Training Data ---
    logging.info(f"Executing {rf_config['cross_validation_folds']}-fold cross-validation grid search...")
    # The .fit() method runs the entire search process. It will train
    # (num_param_combinations * cv_folds) models in total.
    grid_search.fit(X_train, y_train)

    # --- Step 3: Extract and Report the Optimal Parameters ---
    # The best parameters are stored in the .best_params_ attribute.
    best_params: Dict[str, Any] = grid_search.best_params_
    # The best score is the average cross-validated score for the best parameters.
    # It will be negative, so we take its absolute value to get the RMSE.
    best_score_rmse = abs(grid_search.best_score_)

    end_time = time.time()
    duration = end_time - start_time

    logging.info("Hyperparameter optimization completed.")
    logging.info(f"  - Duration: {duration:.2f} seconds.")
    logging.info(f"  - Best cross-validated RMSE: {best_score_rmse:.4f}")
    logging.info(f"  - Best parameters found: {best_params}")

    # Final validation to ensure the result is a dictionary.
    if not isinstance(best_params, dict):
        raise TypeError("Grid search did not return a dictionary of best parameters.")

    return best_params


In [None]:
# Task 13: Train Final Random Forest Model

def _calculate_multi_output_metrics(
    y_true: pd.DataFrame,
    y_pred: np.ndarray,
    target_names: List[str]
) -> Tuple[float, float]:
    """
    Calculates average RMSE and MAPE for a multi-output regression model.

    Args:
        y_true (pd.DataFrame): The ground truth target values.
        y_pred (np.ndarray): The predicted target values from the model.
        target_names (List[str]): The names of the target columns.

    Returns:
        Tuple[float, float]: A tuple containing the average RMSE and average MAPE.
    """
    # Create a DataFrame for predictions to align with y_true's columns.
    y_pred_df = pd.DataFrame(y_pred, index=y_true.index, columns=y_true.columns)

    rmses, mapes = [], []

    # Calculate metrics for each target variable individually.
    for i, target in enumerate(target_names):
        # Calculate RMSE for the current target.
        # RMSE = sqrt(MSE)
        rmse = np.sqrt(mean_squared_error(y_true[target], y_pred_df[target]))
        rmses.append(rmse)

        # Calculate MAPE for the current target.
        # MAPE = mean(|(y_true - y_pred) / y_true|) * 100
        # Add a small epsilon to the denominator to avoid division by zero.
        epsilon = 1e-9
        mape = np.mean(np.abs((y_true[target] - y_pred_df[target]) / (y_true[target] + epsilon)))
        mapes.append(mape)

    # Return the average of the metrics across all target outputs.
    return np.mean(rmses), np.mean(mapes)


def train_final_model_and_evaluate(
    X_train: pd.DataFrame,
    y_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_test: pd.DataFrame,
    optimal_params: Dict[str, Any],
    config: Dict[str, Any]
) -> RandomForestRegressor:
    """
    Trains the final Random Forest model and evaluates its performance.

    This function takes the optimal hyperparameters, trains a new Random Forest
    model on the entire training dataset, and then evaluates its performance on
    both the training and testing sets. It validates the performance against the
    benchmarks reported in the source paper to ensure the model's fidelity.

    The process is as follows:
    1.  Instantiate `RandomForestRegressor` with the optimal parameters.
    2.  Train the model on the full `X_train` and `y_train` datasets.
    3.  Generate predictions for both training and testing sets.
    4.  Calculate multi-output RMSE and MAPE for both sets.
    5.  Compare the calculated metrics against the expected values from the
        configuration, logging warnings if deviations exceed tolerance.

    Args:
        X_train (pd.DataFrame): The training feature matrix.
        y_train (pd.DataFrame): The training target matrix.
        X_test (pd.DataFrame): The testing feature matrix.
        y_test (pd.DataFrame): The testing target matrix.
        optimal_params (Dict[str, Any]): The optimal hyperparameters from grid search.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        RandomForestRegressor: The final, trained scikit-learn model object.
    """
    # --- Input Validation ---
    if not all(isinstance(df, pd.DataFrame) for df in [X_train, y_train, X_test, y_test]):
        raise TypeError("All data inputs must be pandas DataFrames.")
    if not isinstance(optimal_params, dict):
        raise TypeError("Input 'optimal_params' must be a dictionary.")

    logging.info("Training final Random Forest model with optimal parameters...")
    start_time = time.time()

    # --- Step 1: Instantiate the final model ---
    rf_config = config['random_forest_config']
    final_model = RandomForestRegressor(
        n_estimators=optimal_params['n_estimators'],
        max_depth=optimal_params['max_depth'],
        random_state=rf_config['random_state'],
        n_jobs=rf_config['n_jobs']
    )

    # --- Step 2: Train the model on the full training set ---
    final_model.fit(X_train, y_train)

    end_time = time.time()
    logging.info(f"Model training completed in {end_time - start_time:.2f} seconds.")

    # --- Step 3: Evaluate model performance ---
    logging.info("Evaluating model performance on training and testing sets...")
    # Generate predictions for both sets.
    y_train_pred = final_model.predict(X_train)
    y_test_pred = final_model.predict(X_test)

    # Get target names for metric calculation.
    target_names = y_train.columns.tolist()

    # Calculate performance metrics for the training set.
    train_rmse, train_mape = _calculate_multi_output_metrics(y_train, y_train_pred, target_names)
    logging.info(f"  - Training Set   | RMSE: {train_rmse:.4f}, MAPE: {train_mape:.4f}")

    # Calculate performance metrics for the testing set.
    test_rmse, test_mape = _calculate_multi_output_metrics(y_test, y_test_pred, target_names)
    logging.info(f"  - Testing Set    | RMSE: {test_rmse:.4f}, MAPE: {test_mape:.4f}")

    # --- Step 4: Validate performance against paper benchmarks ---
    expected_train_rmse = rf_config['expected_training_rmse']
    expected_test_rmse = rf_config['expected_testing_rmse']

    # Check training RMSE against the expected value from the paper.
    if not np.isclose(train_rmse, expected_train_rmse, atol=0.05):
        logging.warning(
            f"Training RMSE ({train_rmse:.4f}) deviates significantly from "
            f"the paper's reported value ({expected_train_rmse:.4f})."
        )

    # Check testing RMSE against the expected value from the paper.
    if not np.isclose(test_rmse, expected_test_rmse, atol=0.05):
        logging.warning(
            f"Testing RMSE ({test_rmse:.4f}) deviates significantly from "
            f"the paper's reported value ({expected_test_rmse:.4f})."
        )

    logging.info("Performance evaluation and validation complete.")

    return final_model


In [None]:
# Task 14: Generate Proximity Matrix from Trained Random Forest

@njit(parallel=True)
def _accelerated_proximity_loop(
    leaf_nodes: np.ndarray,
    n_samples: int,
    n_trees: int
) -> np.ndarray:
    """
    Calculates the proximity matrix using a Numba-accelerated loop.

    This function iterates through each tree's leaf assignments and efficiently
    updates a co-occurrence matrix. It is designed to be memory-efficient and
    fast, leveraging Numba for just-in-time compilation and parallel execution.

    Args:
        leaf_nodes (np.ndarray): Integer matrix of shape (n_samples, n_trees)
                                 containing terminal node indices.
        n_samples (int): The number of samples (bonds).
        n_trees (int): The number of trees in the forest.

    Returns:
        np.ndarray: A square matrix of shape (n_samples, n_samples) containing
                    the count of times each pair of samples ended in the same leaf.
    """
    # Initialize the proximity count matrix with zeros.
    proximity_counts = np.zeros((n_samples, n_samples), dtype=np.int32)

    # Iterate through each tree in the forest.
    for i in range(n_trees):
        # Iterate through each pair of samples (bonds).
        for j in range(n_samples):
            for k in range(j, n_samples):
                # If two samples land in the same leaf node for the current tree...
                if leaf_nodes[j, i] == leaf_nodes[k, i]:
                    # ...increment the counter for that pair.
                    proximity_counts[j, k] += 1

    return proximity_counts


def generate_proximity_matrix(
    final_model: RandomForestRegressor,
    X_complete: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Generates the proximity matrix from the trained Random Forest model.

    This function operationalizes Equation (2) from the paper, calculating the
    similarity between every pair of bonds based on how frequently they land in
    the same terminal node across all trees in the forest.

    The process is as follows:
    1.  **Leaf Node Extraction**: Use the model's `.apply()` method to get the
        terminal node index for each bond in each tree.
    2.  **Proximity Calculation**: Employ a memory-efficient, Numba-accelerated
        algorithm to count the co-occurrences of sample pairs in the same leaves.
        This avoids the extreme memory consumption of a fully vectorized approach.
    3.  **Normalization and Optimization**: Divide the co-occurrence counts by the
        total number of trees to get the final proximity scores. The resulting
        matrix is made perfectly symmetric and memory-optimized.
    4.  **Validation**: The final matrix is validated for its essential properties
        (symmetry, diagonal of ones, value range).

    Args:
        final_model (RandomForestRegressor): The final, trained scikit-learn model.
        X_complete (pd.DataFrame): The complete feature matrix for ALL bonds
                                   (both train and test).
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        pd.DataFrame: The final, validated proximity matrix as a DataFrame with
                      CUSIPs as the index and columns.
    """
    # --- Input Validation ---
    if not isinstance(final_model, RandomForestRegressor):
        raise TypeError("Input 'final_model' must be a RandomForestRegressor.")
    if not isinstance(X_complete, pd.DataFrame):
        raise TypeError("Input 'X_complete' must be a pandas DataFrame.")

    logging.info("Generating proximity matrix from the trained Random Forest...")
    start_time = time.time()

    # --- Step 1: Extract Leaf Node Assignments ---
    # The .apply() method returns an integer array of shape (n_samples, n_estimators)
    # where each element is the index of the leaf that a sample ends up in.
    logging.info("Step 1/4: Extracting terminal leaf node assignments...")
    leaf_nodes = final_model.apply(X_complete)
    n_samples, n_trees = leaf_nodes.shape
    logging.info(f"Extracted leaf nodes for {n_samples} samples across {n_trees} trees.")

    # --- Step 2: Calculate Proximity Counts using Accelerated Loop ---
    # This is a memory-efficient alternative to full vectorization.
    logging.info("Step 2/4: Calculating pairwise proximity counts (this may take time)...")
    # Call the Numba-jitted function for high-performance computation.
    proximity_counts = _accelerated_proximity_loop(leaf_nodes, n_samples, n_trees)

    # --- Step 3: Normalize and Optimize the Matrix ---
    logging.info("Step 3/4: Normalizing and optimizing the proximity matrix...")
    # The proximity is the count of co-occurrences divided by the number of trees.
    # Convert to float32 for memory efficiency (halves memory vs. float64).
    proximity_matrix = (proximity_counts / n_trees).astype(np.float32)

    # Exploit symmetry: The loop only computes the upper triangle, so we copy it
    # to the lower triangle to make the matrix fully symmetric.
    # This is faster and more robust than re-calculating.
    proximity_matrix = proximity_matrix + proximity_matrix.T - np.diag(proximity_matrix.diagonal())

    # --- Step 4: Validate the Final Matrix ---
    logging.info("Step 4/4: Validating final proximity matrix properties...")
    # Check 1: Symmetry
    if not np.allclose(proximity_matrix, proximity_matrix.T, atol=1e-6):
        raise ValueError("Proximity matrix is not symmetric.")
    # Check 2: Diagonal must be all ones.
    if not np.allclose(np.diag(proximity_matrix), 1.0, atol=1e-6):
        raise ValueError("Diagonal of the proximity matrix is not all ones.")
    # Check 3: All values must be in the [0, 1] range.
    if not (proximity_matrix.min() >= -1e-6 and proximity_matrix.max() <= 1.0 + 1e-6):
        raise ValueError("Proximity matrix contains values outside the [0, 1] range.")

    # Convert the final numpy array to a pandas DataFrame, using the CUSIPs
    # from the input DataFrame's index for both the new index and columns.
    proximity_df = pd.DataFrame(
        proximity_matrix,
        index=X_complete.index,
        columns=X_complete.index
    )

    end_time = time.time()
    logging.info(f"Proximity matrix generation completed in {end_time - start_time:.2f} seconds.")
    logging.info(f"Final matrix shape: {proximity_df.shape}")

    return proximity_df


In [None]:
# Task 15: Extract Portfolio-Level Data for Similarity Analysis

# Define a type hint for the complex nested dictionary structure for clarity.
PortfolioData = Dict[str, Dict[str, Union[List[str], np.ndarray]]]


def extract_portfolio_level_data(
    etf_holdings_df: pd.DataFrame,
    proximity_matrix: pd.DataFrame
) -> PortfolioData:
    """
    Extracts and structures portfolio data for similarity analysis.

    This function transforms the flat holdings DataFrame into a nested dictionary,
    which serves as a highly efficient, analysis-ready data structure for the
    subsequent similarity algorithms. For each ETF, it compiles its constituent
    CUSIPs, their corresponding weights, and their integer indices within the
    proximity matrix.

    The process is as follows:
    1.  **Create CUSIP-to-Index Map**: A mapping is built from the CUSIPs in the
        proximity matrix's index to their integer positions (0, 1, 2, ...).
        This allows for O(1) lookups.
    2.  **Group by ETF**: The holdings DataFrame is grouped by 'etf_id' to
        process each portfolio individually.
    3.  **Extract and Translate**: For each ETF, the function extracts its list
        of CUSIPs and weights. It then uses the map to translate the CUSIPs
        into their corresponding integer indices.
    4.  **Assemble Structure**: The extracted CUSIPs, weights (as a NumPy array),
        and indices (as a NumPy array) are stored in a nested dictionary keyed
        by the ETF identifier.
    5.  **Validation**: A final check ensures the weights for each extracted
        portfolio sum to approximately 1.0.

    Args:
        etf_holdings_df (pd.DataFrame): The cleansed and validated DataFrame of
                                        ETF holdings.
        proximity_matrix (pd.DataFrame): The final, validated proximity matrix
                                         with CUSIPs as its index and columns.

    Returns:
        PortfolioData: A nested dictionary where each key is an ETF ID and each
                       value is a dictionary containing the portfolio's 'cusips',
                       'weights', and 'indices'.
    """
    # --- Input Validation ---
    if not isinstance(etf_holdings_df, pd.DataFrame):
        raise TypeError("Input 'etf_holdings_df' must be a pandas DataFrame.")
    if not isinstance(proximity_matrix, pd.DataFrame):
        raise TypeError("Input 'proximity_matrix' must be a pandas DataFrame.")
    if not proximity_matrix.index.equals(proximity_matrix.columns):
        raise ValueError("Proximity matrix must be square with identical index and columns.")

    logging.info("Extracting and structuring portfolio-level data...")

    # --- Step 1: Create the CUSIP-to-Index Mapping ---
    # This dictionary provides an O(1) lookup for converting a CUSIP string
    # to its integer row/column index in the proximity matrix.
    cusip_to_idx: Dict[str, int] = {
        cusip: i for i, cusip in enumerate(proximity_matrix.index)
    }

    # Initialize the main dictionary to store the structured portfolio data.
    portfolios: PortfolioData = {}

    # --- Step 2: Group by ETF and Process Each Portfolio ---
    # .groupby() is a highly efficient way to iterate over each ETF's holdings.
    for etf_id, group in etf_holdings_df.groupby('etf_id'):
        # Ensure the group is sorted by CUSIP for deterministic ordering, although
        # the algorithms do not strictly require it.
        group = group.sort_values(by='cusip').reset_index(drop=True)

        # --- Step 3: Extract CUSIPs, Weights, and Translate to Indices ---
        # Extract the list of CUSIPs for the current ETF.
        cusips: List[str] = group['cusip'].tolist()
        # Extract the corresponding weights as a NumPy array for efficient math.
        weights: np.ndarray = group['weight'].to_numpy(dtype=np.float64)

        try:
            # Translate the list of CUSIP strings into a NumPy array of integers.
            # This pre-computation is a critical performance optimization.
            indices: np.ndarray = np.array(
                [cusip_to_idx[c] for c in cusips], dtype=np.int32
            )
        except KeyError as e:
            # This error should not be reached if cross-dataset validation passed,
            # but it serves as a final, robust guardrail.
            raise KeyError(
                f"CUSIP {e} from portfolio '{etf_id}' not found in the "
                "proximity matrix. Data is inconsistent."
            ) from e

        # --- Step 4: Final Validation for the current portfolio ---
        # Verify that the weights for this portfolio sum to 1.0.
        if not np.isclose(weights.sum(), 1.0, atol=1e-6):
            logging.warning(
                f"Weights for ETF '{etf_id}' sum to {weights.sum():.8f}, which is "
                "not 1.0. This may indicate an issue in the cleansing process."
            )

        # --- Step 5: Assemble the final data structure for this ETF ---
        portfolios[etf_id] = {
            'cusips': cusips,
            'weights': weights,
            'indices': indices
        }

    logging.info(f"Successfully processed {len(portfolios)} portfolios.")

    # Final validation of the output structure.
    if len(portfolios) != etf_holdings_df['etf_id'].nunique():
        raise RuntimeError("The number of processed portfolios does not match the number of unique ETFs.")

    return portfolios


In [None]:
# Task 16: Implement STRAPSim Algorithm

# Define type hints for clarity
Portfolio = Dict[str, Union[List[str], np.ndarray]]
STRAPSimResult = Tuple[float, float, float]


def compute_strapsim_between_pair(
    portfolio_x: Portfolio,
    portfolio_y: Portfolio,
    proximity_matrix: np.ndarray,
    config: Dict[str, Any]
) -> STRAPSimResult:
    """
    Computes the STRAPSim score between a single pair of portfolios.

    This function is the core implementation of the STRAPSim algorithm as
    described in Section 2.1 and Equations (1a) and (1b) of the paper. It
    quantifies the similarity between two weighted sets (portfolios) through an
    iterative, greedy matching process that is both weight- and residual-aware.

    The algorithm proceeds as follows:
    1.  **Initialization**: Copies of the portfolio weight vectors are created to
        track residual (unmatched) weights. The total similarity score is set to 0.
    2.  **Candidate Generation**: All possible pairwise similarity scores between
        constituents of the two portfolios are extracted from the global
        proximity matrix.
    3.  **Prioritization**: These candidate matches are sorted in descending order
        of their similarity score. This pre-sorting step is a crucial
        performance optimization that implements the argmax logic of the greedy
        approach efficiently.
    4.  **Greedy Matching Loop**: The function iterates through the prioritized list
        of matches. For each potential match:
        a. It determines the amount of weight that can be transferred, which is
           the minimum of the two constituents' current residual weights.
        b. The similarity score is incremented by this transfer weight multiplied
           by the pair's similarity.
        c. The residual weights of both constituents are decremented by the
           transfer weight, fulfilling the "residual-aware" property.
    5.  **Finalization**: After iterating through all possible matches, the final
        similarity score and the remaining residual weights for each portfolio
        are calculated and returned.

    Args:
        portfolio_x (Portfolio): The structured data for the first portfolio.
        portfolio_y (Portfolio): The structured data for the second portfolio.
        proximity_matrix (np.ndarray): The global (N x N) matrix of constituent
                                       similarities.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        STRAPSimResult: A tuple containing:
        - The final STRAPSim similarity score.
        - The total residual (unmatched) weight from portfolio X.
        - The total residual (unmatched) weight from portfolio Y.
    """
    # --- Step 1: Initialization ---
    # Retrieve algorithm parameters from the configuration.
    strapsim_config = config['strapsim_algorithm']
    weight_tol = strapsim_config['weight_precision_tolerance']

    # Create copies of the weight vectors to serve as residual weights.
    # This is crucial to avoid modifying the original portfolio data.
    residual_weights_x = portfolio_x['weights'].copy()
    residual_weights_y = portfolio_y['weights'].copy()

    # Initialize the cumulative similarity score.
    similarity_score = 0.0

    # Extract the integer indices for the constituents of each portfolio.
    indices_x = portfolio_x['indices']
    indices_y = portfolio_y['indices']

    # --- Step 2: Candidate Generation and Prioritization ---
    # Create a grid of all pairwise index combinations between the two portfolios.
    # This is a highly efficient way to get all pairs for slicing.
    idx_pairs = np.ix_(indices_x, indices_y)

    # Slice the global proximity matrix to get the sub-matrix of relevant similarities.
    sub_matrix = proximity_matrix[idx_pairs]

    # Create a list of all candidate matches with their similarity scores and
    # their *local* indices within the sub-matrix.
    candidate_matches = [
        (sub_matrix[i, j], i, j)
        for i in range(len(indices_x))
        for j in range(len(indices_y))
    ]

    # Sort the candidates in descending order of similarity. This is the core
    # of the greedy approach, ensuring we always process the best match first.
    candidate_matches.sort(key=lambda item: item[0], reverse=True)

    # --- Step 3: Greedy Matching Loop ---
    # Iterate through the prioritized list of all possible matches.
    for sim, local_idx_x, local_idx_y in candidate_matches:
        # Get the current residual weights for the constituents in this pair.
        weight_x = residual_weights_x[local_idx_x]
        weight_y = residual_weights_y[local_idx_y]

        # If either constituent has already been fully matched, skip this pair.
        # We use a tolerance for robust floating-point comparison.
        if weight_x < weight_tol or weight_y < weight_tol:
            continue

        # Determine the amount of weight to transfer: the minimum available.
        # This is the core of Equation (1a): min(w_x(i), w_y(j))
        transfer_weight = min(weight_x, weight_y)

        # Increment the total similarity score.
        # This is the core of Equation (1a): score += S_ij * min(...)
        similarity_score += sim * transfer_weight

        # Decrement the residual weights of both constituents.
        # This is the "residual-aware" update from Equation (1b).
        residual_weights_x[local_idx_x] -= transfer_weight
        residual_weights_y[local_idx_y] -= transfer_weight

    # --- Step 4: Finalization ---
    # Calculate the total remaining (unmatched) weight for each portfolio.
    residual_x = np.sum(residual_weights_x)
    residual_y = np.sum(residual_weights_y)

    # Optional: Perform a weight conservation check for debugging and validation.
    if strapsim_config['weight_conservation_validation']:
        total_initial_weight = np.sum(portfolio_x['weights']) + np.sum(portfolio_y['weights'])
        total_final_value = similarity_score + residual_x + residual_y
        # Note: The sum is not 2.0 because similarity_score is not a weight.
        # The correct check is that the sum of residual weights equals the sum of initial weights
        # minus the total weight that was "consumed" and converted into the score.
        # A simpler check is not readily available, but we can ensure residuals are non-negative.
        if residual_x < -weight_tol or residual_y < -weight_tol:
            raise ValueError("Weight conservation violated: residual weights are negative.")

    # Round the final score to the specified precision.
    precision = strapsim_config['precision_digits']

    return round(similarity_score, precision), residual_x, residual_y


In [None]:
# Task 17: Compute STRAPSim for All Portfolio Pairs

# Define type hints from previous tasks for clarity.
Portfolio = Dict[str, Union[List[str], np.ndarray]]
PortfolioData = Dict[str, Portfolio]
STRAPSimResult = Tuple[float, float, float]

def compute_all_strapsim_pairs(
    portfolios: PortfolioData,
    proximity_matrix: np.ndarray,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Orchestrates the computation of STRAPSim scores for all unique portfolio pairs.

    This function systematically calculates the similarity between every pair of
    ETFs in the provided portfolio data structure. It is designed for efficiency
    by avoiding redundant calculations and provides clear progress monitoring.

    The process is as follows:
    1.  **Initialization**: Creates empty matrices for similarity scores and
        residuals. It also establishes a deterministic mapping from ETF IDs to
        matrix indices.
    2.  **Diagonal Population**: Sets the diagonal of the similarity matrix to 1.0
        (self-similarity) and the residual matrix to 0.0.
    3.  **Pairwise Computation**: Iterates through all unique pairs of ETFs using
        `itertools.combinations`. For each pair, it calls the core
        `compute_strapsim_between_pair` function.
    4.  **Symmetric Storage**: Stores the result for each pair `(i, j)` in both
        `matrix[i, j]` and `matrix[j, i]` to ensure the final matrices are symmetric.
    5.  **Finalization**: Converts the completed NumPy arrays into well-labeled
        pandas DataFrames, with ETF IDs as both the index and columns.

    Args:
        portfolios (PortfolioData): The nested dictionary of structured portfolio data.
        proximity_matrix (np.ndarray): The global (N x N) matrix of constituent
                                       similarities.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: A tuple containing:
        - A square DataFrame of pairwise STRAPSim similarity scores.
        - A square DataFrame of pairwise total residual weights.
    """
    # --- Input Validation ---
    if not isinstance(portfolios, dict):
        raise TypeError("Input 'portfolios' must be a dictionary.")
    if not isinstance(proximity_matrix, np.ndarray):
        raise TypeError("Input 'proximity_matrix' must be a numpy array.")

    logging.info("Starting computation of STRAPSim scores for all portfolio pairs...")
    start_time = time.time()

    # --- Step 1: Initialization ---
    # Get a sorted list of ETF IDs to ensure deterministic matrix order.
    etf_ids: List[str] = sorted(portfolios.keys())
    n_etfs = len(etf_ids)

    # Create a mapping from ETF ID to its integer index in the matrices.
    etf_to_matrix_idx: Dict[str, int] = {etf_id: i for i, etf_id in enumerate(etf_ids)}

    # Initialize empty NumPy arrays to store the results.
    # Using np.nan helps identify any pairs that were missed by the loop.
    similarity_matrix = np.full((n_etfs, n_etfs), np.nan, dtype=np.float64)
    residual_matrix = np.full((n_etfs, n_etfs), np.nan, dtype=np.float64)

    # --- Step 2: Diagonal Population ---
    # A portfolio is perfectly similar to itself with zero residual.
    np.fill_diagonal(similarity_matrix, 1.0)
    np.fill_diagonal(residual_matrix, 0.0)

    # --- Step 3: Pairwise Computation ---
    # Generate all unique pairs of ETF IDs to compute.
    etf_pairs = list(itertools.combinations(etf_ids, 2))

    logging.info(f"Calculating STRAPSim for {len(etf_pairs)} unique ETF pairs...")

    # Use tqdm to create a progress bar for the loop.
    for etf_id_x, etf_id_y in tqdm(etf_pairs, desc="Computing STRAPSim Pairs"):
        # Retrieve the structured data for the two portfolios.
        portfolio_x = portfolios[etf_id_x]
        portfolio_y = portfolios[etf_id_y]

        # Call the core algorithm to compute the similarity.
        score, residual_x, residual_y = compute_strapsim_between_pair(
            portfolio_x, portfolio_y, proximity_matrix, config
        )

        # Get the integer indices for the matrix.
        idx_x = etf_to_matrix_idx[etf_id_x]
        idx_y = etf_to_matrix_idx[etf_id_y]

        # --- Step 4: Symmetric Storage ---
        # Store the score in both (i, j) and (j, i) positions.
        similarity_matrix[idx_x, idx_y] = score
        similarity_matrix[idx_y, idx_x] = score

        # Store the total residual weight.
        total_residual = residual_x + residual_y
        residual_matrix[idx_x, idx_y] = total_residual
        residual_matrix[idx_y, idx_x] = total_residual

    # --- Final Validation ---
    if np.isnan(similarity_matrix).any() or np.isnan(residual_matrix).any():
        raise RuntimeError("Calculation failed: Not all pairs were computed.")

    # --- Step 5: Finalization ---
    # Convert the NumPy arrays to labeled pandas DataFrames.
    similarity_df = pd.DataFrame(
        similarity_matrix, index=etf_ids, columns=etf_ids
    )
    residual_df = pd.DataFrame(
        residual_matrix, index=etf_ids, columns=etf_ids
    )

    end_time = time.time()
    logging.info(f"All-pairs STRAPSim computation completed in {end_time - start_time:.2f} seconds.")

    return similarity_df, residual_df


In [None]:
# Task 18: Compute Jaccard Index Similarity

# Define type hints from previous tasks for clarity.
Portfolio = Dict[str, Union[List[str], np.ndarray]]
PortfolioData = Dict[str, Portfolio]


def compute_jaccard_matrix(
    portfolios: PortfolioData,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Computes the Jaccard Index similarity for all unique portfolio pairs.

    This function implements the standard Jaccard Index, a baseline similarity
    metric defined in Equation (3) of the paper. It measures similarity as the
    ratio of the size of the intersection to the size of the union of the two
    sets of constituents. This metric ignores both portfolio weights and any
    notion of similarity between non-identical constituents.

    The process is as follows:
    1.  **Initialization**: Creates empty matrices for Jaccard scores and their
        residuals. Establishes a deterministic mapping from ETF IDs to matrix indices.
    2.  **Pairwise Computation**: Iterates through all unique pairs of ETFs. For
        each pair:
        a. Converts the lists of constituent CUSIPs into sets.
        b. Uses efficient set operations to find the intersection and union.
        c. Calculates the Jaccard Index: |Intersection| / |Union|.
    3.  **Symmetric Storage**: Stores the result for each pair `(i, j)` in both
        `matrix[i, j]` and `matrix[j, i]`.
    4.  **Finalization**: Converts the completed NumPy arrays into well-labeled
        pandas DataFrames.

    Args:
        portfolios (PortfolioData): The nested dictionary of structured portfolio data.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: A tuple containing:
        - A square DataFrame of pairwise Jaccard Index similarity scores.
        - A square DataFrame of the corresponding Jaccard residuals (1 - score).
    """
    # --- Input Validation ---
    if not isinstance(portfolios, dict):
        raise TypeError("Input 'portfolios' must be a dictionary.")

    logging.info("Starting computation of Jaccard Index for all portfolio pairs...")
    start_time = time.time()

    # --- Step 1: Initialization ---
    # Get a sorted list of ETF IDs for deterministic matrix order.
    etf_ids: List[str] = sorted(portfolios.keys())
    n_etfs = len(etf_ids)

    # Create a mapping from ETF ID to its integer index in the matrices.
    etf_to_matrix_idx: Dict[str, int] = {etf_id: i for i, etf_id in enumerate(etf_ids)}

    # Initialize empty NumPy arrays for the results.
    jaccard_matrix = np.full((n_etfs, n_etfs), np.nan, dtype=np.float64)
    residual_matrix = np.full((n_etfs, n_etfs), np.nan, dtype=np.float64)

    # Set the diagonal to 1.0 for similarity and 0.0 for residual.
    np.fill_diagonal(jaccard_matrix, 1.0)
    np.fill_diagonal(residual_matrix, 0.0)

    # --- Step 2: Pairwise Computation ---
    # Generate all unique pairs of ETF IDs.
    etf_pairs = list(itertools.combinations(etf_ids, 2))

    logging.info(f"Calculating Jaccard Index for {len(etf_pairs)} unique ETF pairs...")

    # Use tqdm for a progress bar.
    for etf_id_x, etf_id_y in tqdm(etf_pairs, desc="Computing Jaccard Pairs"):
        # Retrieve the constituent CUSIP lists for the two portfolios.
        cusips_x: Set[str] = set(portfolios[etf_id_x]['cusips'])
        cusips_y: Set[str] = set(portfolios[etf_id_y]['cusips'])

        # Use Python's highly optimized set operations.
        # Intersection: elements common to both sets.
        intersection_size = len(cusips_x.intersection(cusips_y))
        # Union: all unique elements from both sets.
        union_size = len(cusips_x.union(cusips_y))

        # Calculate the Jaccard Index.
        # J(X, Y) = |X intersect Y| / |X union Y|
        if union_size == 0:
            # Edge case: If both sets are empty, their union is empty.
            # They are identical, so their similarity is 1.0.
            jaccard_score = 1.0
        else:
            jaccard_score = intersection_size / union_size

        # The residual is simply 1 minus the score.
        residual = 1.0 - jaccard_score

        # Get the integer indices for matrix storage.
        idx_x = etf_to_matrix_idx[etf_id_x]
        idx_y = etf_to_matrix_idx[etf_id_y]

        # --- Step 3: Symmetric Storage ---
        # Populate the matrices symmetrically.
        jaccard_matrix[idx_x, idx_y] = jaccard_score
        jaccard_matrix[idx_y, idx_x] = jaccard_score
        residual_matrix[idx_x, idx_y] = residual
        residual_matrix[idx_y, idx_x] = residual

    # --- Final Validation ---
    if np.isnan(jaccard_matrix).any():
        raise RuntimeError("Jaccard matrix calculation failed: Not all pairs were computed.")

    # --- Step 4: Finalization ---
    # Convert the NumPy arrays to labeled pandas DataFrames.
    jaccard_df = pd.DataFrame(
        jaccard_matrix, index=etf_ids, columns=etf_ids
    )
    residual_df = pd.DataFrame(
        residual_matrix, index=etf_ids, columns=etf_ids
    )

    end_time = time.time()
    logging.info(f"All-pairs Jaccard Index computation completed in {end_time - start_time:.2f} seconds.")

    return jaccard_df, residual_df


In [None]:
# Task 19: Compute Weighted Jaccard Similarity

# Define type hints from previous tasks for clarity.
Portfolio = Dict[str, Union[List[str], np.ndarray]]
PortfolioData = Dict[str, Portfolio]


def compute_weighted_jaccard_matrix(
    portfolios: PortfolioData,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Computes the Weighted Jaccard similarity for all unique portfolio pairs.

    This function implements the Weighted Jaccard Index, a baseline similarity
    metric defined in Equation (4) of the paper. It extends the standard Jaccard
    Index by incorporating the weights of the constituents.

    The formula is:
    J_w(X, Y) = sum(min(w_x(k), w_y(k))) for k in intersection(X, Y) /
                sum(max(w_x(k), w_y(k))) for k in union(X, Y)

    The process is as follows:
    1.  **Initialization**: Creates empty matrices for scores and residuals.
    2.  **Pairwise Computation**: For each unique pair of ETFs:
        a. Creates efficient CUSIP-to-weight lookup dictionaries.
        b. Determines the intersection and union of the constituent sets.
        c. Calculates the weighted intersection (numerator) by summing the
           minimum weight for each common constituent.
        d. Calculates the weighted union (denominator) by summing the
           maximum weight for each constituent present in either portfolio.
        e. Computes the final score as the ratio of the two sums.
    3.  **Symmetric Storage and Finalization**: Stores results symmetrically and
        converts the final matrices into labeled pandas DataFrames.

    Args:
        portfolios (PortfolioData): The nested dictionary of structured portfolio data.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: A tuple containing:
        - A square DataFrame of pairwise Weighted Jaccard similarity scores.
        - A square DataFrame of the corresponding residuals (1 - score).
    """
    # --- Input Validation ---
    if not isinstance(portfolios, dict):
        raise TypeError("Input 'portfolios' must be a dictionary.")

    logging.info("Starting computation of Weighted Jaccard Index for all portfolio pairs...")
    start_time = time.time()

    # --- Step 1: Initialization ---
    etf_ids: List[str] = sorted(portfolios.keys())
    n_etfs = len(etf_ids)
    etf_to_matrix_idx: Dict[str, int] = {etf_id: i for i, etf_id in enumerate(etf_ids)}

    wj_matrix = np.full((n_etfs, n_etfs), np.nan, dtype=np.float64)
    residual_matrix = np.full((n_etfs, n_etfs), np.nan, dtype=np.float64)

    np.fill_diagonal(wj_matrix, 1.0)
    np.fill_diagonal(residual_matrix, 0.0)

    # --- Step 2: Pairwise Computation ---
    etf_pairs = list(itertools.combinations(etf_ids, 2))

    logging.info(f"Calculating Weighted Jaccard for {len(etf_pairs)} unique ETF pairs...")

    for etf_id_x, etf_id_y in tqdm(etf_pairs, desc="Computing Weighted Jaccard Pairs"):
        # Create efficient CUSIP-to-weight lookup dictionaries for the pair.
        weights_x: Dict[str, float] = dict(zip(portfolios[etf_id_x]['cusips'], portfolios[etf_id_x]['weights']))
        weights_y: Dict[str, float] = dict(zip(portfolios[etf_id_y]['cusips'], portfolios[etf_id_y]['weights']))

        # Determine the intersection and union of the constituent CUSIPs.
        cusips_x: Set[str] = set(weights_x.keys())
        cusips_y: Set[str] = set(weights_y.keys())
        intersection_cusips = cusips_x.intersection(cusips_y)
        union_cusips = cusips_x.union(cusips_y)

        # Calculate the numerator: the sum of minimum weights over the intersection.
        # This is the weighted size of the intersection.
        weighted_intersection = sum(min(weights_x[c], weights_y[c]) for c in intersection_cusips)

        # Calculate the denominator: the sum of maximum weights over the union.
        # This is the weighted size of the union. The .get(c, 0.0) method
        # elegantly handles cases where a CUSIP exists in only one portfolio.
        weighted_union = sum(max(weights_x.get(c, 0.0), weights_y.get(c, 0.0)) for c in union_cusips)

        # Calculate the Weighted Jaccard score.
        if weighted_union == 0:
            # Edge case: If both portfolios are empty, their similarity is 1.0.
            wj_score = 1.0
        else:
            wj_score = weighted_intersection / weighted_union

        residual = 1.0 - wj_score

        # --- Step 3: Symmetric Storage ---
        idx_x = etf_to_matrix_idx[etf_id_x]
        idx_y = etf_to_matrix_idx[etf_id_y]
        wj_matrix[idx_x, idx_y] = wj_score
        wj_matrix[idx_y, idx_x] = wj_score
        residual_matrix[idx_x, idx_y] = residual
        residual_matrix[idx_y, idx_x] = residual

    # --- Final Validation ---
    if np.isnan(wj_matrix).any():
        raise RuntimeError("Weighted Jaccard matrix calculation failed: Not all pairs were computed.")

    # --- Step 4: Finalization ---
    wj_df = pd.DataFrame(
        wj_matrix, index=etf_ids, columns=etf_ids
    )
    residual_df = pd.DataFrame(
        residual_matrix, index=etf_ids, columns=etf_ids
    )

    end_time = time.time()
    logging.info(f"All-pairs Weighted Jaccard computation completed in {end_time - start_time:.2f} seconds.")

    return wj_df, residual_df


In [None]:
# Task 20: Compute Adapted BERTScore Similarity

# Define type hints from previous tasks for clarity.
Portfolio = Dict[str, Union[List[str], np.ndarray]]
PortfolioData = Dict[str, Portfolio]
BERTScoreFullResult = Tuple[float, float, float, float, float, float]


def _compute_bertscore_for_pair(
    portfolio_x: Portfolio,
    portfolio_y: Portfolio,
    proximity_matrix: np.ndarray
) -> BERTScoreFullResult:
    """
    Computes the adapted BERTScore and its residuals for a single pair.

    This remediated function provides a complete implementation of the adapted
    BERTScore metric as described in Section 2.4 of the paper, including both
    the primary scores (Equations 6a, 6b, 6c) and the residual scores
    (Equations 7a, 7b, 7c).

    Primary Scores (Vectorized Calculation):
    - Recall (X -> Y): Weighted average of the best-match similarity for each item in X.
    - Precision (Y -> X): Weighted average of the best-match similarity for each item in Y.
    - F1 Score: The harmonic mean of Precision and Recall.

    Residual Scores (Iterative Calculation):
    - Recall Residual: Calculates the total weight remaining in X after each of its
      constituents is matched once to its best partner in Y.
    - Precision Residual: The symmetric calculation for portfolio Y.
    - F1 Residual: The harmonic mean of the two residual scores.

    Args:
        portfolio_x (Portfolio): The structured data for the reference portfolio (X).
        portfolio_y (Portfolio): The structured data for the candidate portfolio (Y).
        proximity_matrix (np.ndarray): The global (N x N) matrix of similarities.

    Returns:
        BERTScoreFullResult: A tuple containing six scores:
        (Recall, Precision, F1, Recall_Residual, Precision_Residual, F1_Residual).
    """
    # --- Initialization ---
    indices_x = portfolio_x['indices']
    indices_y = portfolio_y['indices']
    weights_x = portfolio_x['weights']
    weights_y = portfolio_y['weights']

    # Handle edge case of empty portfolios.
    if len(indices_x) == 0 or len(indices_y) == 0:
        is_identical = len(indices_x) == len(indices_y)
        score = 1.0 if is_identical else 0.0
        residual = 0.0 if is_identical else 1.0
        return (score, score, score, residual, residual, residual)

    # Slice the proximity matrix to get the relevant sub-matrix.
    sub_matrix = proximity_matrix[np.ix_(indices_x, indices_y)]

    # --- Primary Score Calculation (Vectorized) ---
    # Recall (Equation 6a): Weighted average of max similarities for each item in X.
    max_sim_for_x = sub_matrix.max(axis=1)
    recall = np.dot(weights_x, max_sim_for_x)

    # Precision (Equation 6b): Weighted average of max similarities for each item in Y.
    max_sim_for_y = sub_matrix.max(axis=0)
    precision = np.dot(weights_y, max_sim_for_y)

    # F1 Score (Equation 6c): Harmonic mean of Precision and Recall.
    if (precision + recall) == 0:
        f1_score = 0.0
    else:
        f1_score = 2 * (precision * recall) / (precision + recall)

    # --- Residual Score Calculation (Iterative) ---
    # Recall Residual (Equation 7a)
    residual_weights_x = weights_x.copy()
    best_match_indices_for_x = np.argmax(sub_matrix, axis=1)
    for i in range(len(indices_x)):
        best_match_j = best_match_indices_for_x[i]
        # Per the paper's formula, the weight consumed is min(w_x, w_y_match).
        transfer_weight = min(weights_x[i], weights_y[best_match_j])
        residual_weights_x[i] -= transfer_weight
    # The residual score is 1 - (total remaining weight / total initial weight).
    # Since total initial weight is 1.0, this simplifies to 1 - sum(residuals).
    recall_residual = 1.0 - np.sum(np.maximum(0, residual_weights_x))

    # Precision Residual (Equation 7b)
    residual_weights_y = weights_y.copy()
    best_match_indices_for_y = np.argmax(sub_matrix, axis=0)
    for j in range(len(indices_y)):
        best_match_i = best_match_indices_for_y[j]
        transfer_weight = min(weights_y[j], weights_x[best_match_i])
        residual_weights_y[j] -= transfer_weight
    precision_residual = 1.0 - np.sum(np.maximum(0, residual_weights_y))

    # F1 Residual (Equation 7c)
    if (precision_residual + recall_residual) == 0:
        f1_residual = 0.0
    else:
        f1_residual = 2 * (precision_residual * recall_residual) / (precision_residual + recall_residual)

    return recall, precision, f1_score, recall_residual, precision_residual, f1_residual



def compute_bertscore_matrix(
    portfolios: PortfolioData,
    proximity_matrix: np.ndarray,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Orchestrates the computation of adapted BERTScore and residuals for all pairs.

    This remediated function systematically calculates the full suite of six
    BERTScore-related metrics (Recall, Precision, F1, and their corresponding
    residuals) for every unique pair of ETFs. It serves as the main entry point
    for generating all BERTScore baseline similarity and residual matrices.

    Args:
        portfolios (PortfolioData): The nested dictionary of structured portfolio data.
        proximity_matrix (np.ndarray): The global (N x N) matrix of similarities.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        Tuple[pd.DataFrame, ...]: A tuple containing the six square DataFrames:
        (Recall_Scores, Precision_Scores, F1_Scores, Recall_Residuals,
         Precision_Residuals, F1_Residuals).
    """
    # --- Input Validation ---
    if not isinstance(portfolios, dict):
        raise TypeError("Input 'portfolios' must be a dictionary.")
    if not isinstance(proximity_matrix, np.ndarray):
        raise TypeError("Input 'proximity_matrix' must be a numpy array.")

    logging.info("Starting computation of adapted BERTScore and residuals for all portfolio pairs...")
    start_time = time.time()

    # --- Initialization ---
    etf_ids: List[str] = sorted(portfolios.keys())
    n_etfs = len(etf_ids)
    etf_to_matrix_idx: Dict[str, int] = {etf_id: i for i, etf_id in enumerate(etf_ids)}

    # Initialize six empty matrices for all scores and residuals.
    matrices = {
        "recall": np.full((n_etfs, n_etfs), np.nan, dtype=np.float64),
        "precision": np.full((n_etfs, n_etfs), np.nan, dtype=np.float64),
        "f1": np.full((n_etfs, n_etfs), np.nan, dtype=np.float64),
        "recall_residual": np.full((n_etfs, n_etfs), np.nan, dtype=np.float64),
        "precision_residual": np.full((n_etfs, n_etfs), np.nan, dtype=np.float64),
        "f1_residual": np.full((n_etfs, n_etfs), np.nan, dtype=np.float64),
    }

    # Set diagonals: 1.0 for scores, 0.0 for residuals.
    for name, matrix in matrices.items():
        fill_value = 1.0 if "residual" not in name else 0.0
        np.fill_diagonal(matrix, fill_value)

    # --- Pairwise Computation ---
    etf_pairs = list(itertools.combinations(etf_ids, 2))
    logging.info(f"Calculating BERTScore suite for {len(etf_pairs)} unique ETF pairs...")

    for etf_id_x, etf_id_y in tqdm(etf_pairs, desc="Computing BERTScore Suite"):
        portfolio_x = portfolios[etf_id_x]
        portfolio_y = portfolios[etf_id_y]

        # Unpack all six results from the remediated helper function.
        r_xy, p_xy, f1_xy, rr_xy, pr_xy, fr_xy = _compute_bertscore_for_pair(
            portfolio_x, portfolio_y, proximity_matrix
        )

        # Compute in the reverse direction to get asymmetric scores.
        r_yx, p_yx, f1_yx, rr_yx, pr_yx, fr_yx = _compute_bertscore_for_pair(
            portfolio_y, portfolio_x, proximity_matrix
        )

        # Get matrix indices.
        idx_x = etf_to_matrix_idx[etf_id_x]
        idx_y = etf_to_matrix_idx[etf_id_y]

        # Store directional scores (Recall and Precision + their residuals).
        matrices["recall"][idx_x, idx_y], matrices["recall"][idx_y, idx_x] = r_xy, r_yx
        matrices["precision"][idx_x, idx_y], matrices["precision"][idx_y, idx_x] = p_xy, p_yx
        matrices["recall_residual"][idx_x, idx_y], matrices["recall_residual"][idx_y, idx_x] = rr_xy, rr_yx
        matrices["precision_residual"][idx_x, idx_y], matrices["precision_residual"][idx_y, idx_x] = pr_xy, pr_yx

        # Store symmetric scores (F1 and its residual). Average for robustness.
        f1_avg = (f1_xy + f1_yx) / 2.0
        matrices["f1"][idx_x, idx_y] = matrices["f1"][idx_y, idx_x] = f1_avg
        fr_avg = (fr_xy + fr_yx) / 2.0
        matrices["f1_residual"][idx_x, idx_y] = matrices["f1_residual"][idx_y, idx_x] = fr_avg

    # --- Finalization ---
    # Convert all matrices to labeled DataFrames.
    dataframes = {
        name: pd.DataFrame(matrix, index=etf_ids, columns=etf_ids)
        for name, matrix in matrices.items()
    }

    end_time = time.time()
    logging.info(f"All-pairs BERTScore suite computation completed in {end_time - start_time:.2f} seconds.")

    return (
        dataframes["recall"],
        dataframes["precision"],
        dataframes["f1"],
        dataframes["recall_residual"],
        dataframes["precision_residual"],
        dataframes["f1_residual"],
    )


In [None]:
# Task 21: Compute Return Correlation Matrix

def compute_return_correlation_matrix(
    monthly_returns_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Computes the pairwise return correlation matrix for all ETFs.

    This function calculates the Pearson correlation coefficient for every pair of
    ETF return series in the input DataFrame. This resulting matrix serves as the
    "ground truth" or "calibration benchmark" for economic similarity, against
    which all model-based similarity metrics will be evaluated, as described in
    Section 4.3 of the paper.

    The process is as follows:
    1.  **Input Validation**: Ensures the input is a properly indexed time-series
        DataFrame with no missing values.
    2.  **Correlation Calculation**: Leverages the highly optimized `.corr()` method
        of pandas DataFrames to compute the entire pairwise correlation matrix in a
        single operation.
    3.  **Output Validation**: Performs a series of rigorous checks on the resulting
        matrix to ensure its mathematical properties are correct (e.g., symmetry,
        diagonal of ones, values in [-1, 1]).

    Args:
        monthly_returns_df (pd.DataFrame): The cleansed, date-indexed DataFrame
                                           of monthly ETF returns.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        pd.DataFrame: A square, symmetric DataFrame containing the pairwise
                      Pearson correlation coefficients.

    Raises:
        TypeError: If the input is not a pandas DataFrame.
        ValueError: If the input DataFrame has issues like a non-datetime index,
                    missing values, or if the resulting correlation matrix fails
                    validation checks.
    """
    # --- Step 1: Input Validation ---
    if not isinstance(monthly_returns_df, pd.DataFrame):
        raise TypeError("Input 'monthly_returns_df' must be a pandas DataFrame.")
    if not isinstance(monthly_returns_df.index, pd.DatetimeIndex):
        raise ValueError("Input DataFrame must have a DatetimeIndex.")
    if monthly_returns_df.isnull().values.any():
        # This check is critical as correlation is sensitive to missing data.
        raise ValueError("Input DataFrame contains missing values.")

    logging.info("Computing pairwise return correlation matrix...")
    start_time = time.time()

    # --- Step 2: Correlation Calculation ---
    # Retrieve the specified correlation method from the configuration.
    method = config['evaluation_config']['return_correlation_method']
    if method != 'pearson':
        logging.warning(f"Configuration specifies correlation method '{method}', not 'pearson'.")

    # Use the built-in .corr() method, which is highly optimized and numerically stable.
    # It computes the correlation between all pairs of columns.
    correlation_matrix = monthly_returns_df.corr(method=method)

    # --- Step 3: Output Validation ---
    # Check for any NaN values, which can occur if a column has zero variance.
    if correlation_matrix.isnull().values.any():
        nan_cols = correlation_matrix.columns[correlation_matrix.isnull().any()].tolist()
        raise ValueError(
            f"Correlation matrix contains NaN values. This may be caused by "
            f"zero-variance return series in columns: {nan_cols}"
        )

    # Check 1: The matrix must be square.
    if correlation_matrix.shape[0] != correlation_matrix.shape[1]:
        raise ValueError("Resulting correlation matrix is not square.")

    # Check 2: The matrix must be symmetric.
    if not np.allclose(correlation_matrix.values, correlation_matrix.values.T, atol=1e-9):
        raise ValueError("Resulting correlation matrix is not symmetric.")

    # Check 3: The diagonal elements must all be 1.0.
    if not np.allclose(np.diag(correlation_matrix.values), 1.0, atol=1e-9):
        raise ValueError("The diagonal of the correlation matrix is not all ones.")

    # Check 4: All values must be within the valid range [-1, 1].
    if not ((correlation_matrix.values >= -1.0 - 1e-9) & (correlation_matrix.values <= 1.0 + 1e-9)).all():
        raise ValueError("Correlation matrix contains values outside the [-1, 1] range.")

    end_time = time.time()
    logging.info(f"Return correlation matrix computation completed in {end_time - start_time:.2f} seconds.")
    logging.info(f"Final matrix shape: {correlation_matrix.shape}")

    return correlation_matrix


In [None]:
# Task 22: Prepare Data for Spearman Correlation Analysis

def prepare_data_for_spearman_analysis(
    similarity_matrices: Dict[str, pd.DataFrame],
    correlation_matrix: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Prepares and ranks data for Spearman correlation analysis.

    This function transforms the wide-format similarity and correlation matrices
    into a single, long-format ("tidy") DataFrame. It then calculates the ranks
    of similarity scores and correlation values for each reference ETF, which is
    the necessary input for the Spearman rank correlation test.

    The process is as follows:
    1.  **Unpivot Matrices**: Each similarity matrix and the correlation matrix are
        transformed from a square (N x N) format into a long format with columns
        for the reference ETF, comparison ETF, and the corresponding value.
        Self-comparisons are excluded.
    2.  **Combine Data**: The unpivoted data from all similarity methods are
        concatenated into a single DataFrame.
    3.  **Merge Ground Truth**: This combined DataFrame is merged with the unpivoted
        correlation data, aligning the ground-truth correlation value with each
        pair's similarity score.
    4.  **Calculate Ranks**: For each reference ETF and each similarity method, the
        function calculates the descending rank of both the similarity scores and
        the correlation values. Ties are handled by assigning the average rank.

    Args:
        similarity_matrices (Dict[str, pd.DataFrame]): A dictionary where keys are
            the names of similarity methods (e.g., 'STRAPSim') and values are
            the corresponding N x N similarity matrices.
        correlation_matrix (pd.DataFrame): The N x N matrix of pairwise return
                                            correlations (the ground truth).
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        pd.DataFrame: A tidy DataFrame containing the original values and the
                      calculated ranks, ready for statistical testing.
    """
    # --- Input Validation ---
    if not isinstance(similarity_matrices, dict) or not all(isinstance(df, pd.DataFrame) for df in similarity_matrices.values()):
        raise TypeError("Input 'similarity_matrices' must be a dictionary of pandas DataFrames.")
    if not isinstance(correlation_matrix, pd.DataFrame):
        raise TypeError("Input 'correlation_matrix' must be a pandas DataFrame.")

    logging.info("Preparing and ranking data for Spearman correlation analysis...")

    # --- Step 1: Unpivot All Matrices ---
    all_long_dfs: List[pd.DataFrame] = []

    # Unpivot the ground-truth correlation matrix first.
    # .stack() is an efficient way to unpivot, creating a Series.
    corr_long = correlation_matrix.stack().reset_index()
    corr_long.columns = ['reference_etf', 'comparison_etf', 'correlation_value']

    # Exclude self-comparisons.
    corr_long = corr_long[corr_long['reference_etf'] != corr_long['comparison_etf']]

    # Unpivot each similarity matrix.
    for method_name, sim_matrix in similarity_matrices.items():
        sim_long = sim_matrix.stack().reset_index()
        sim_long.columns = ['reference_etf', 'comparison_etf', 'similarity_score']
        sim_long['method'] = method_name
        all_long_dfs.append(sim_long)

    # --- Step 2: Combine Similarity Data ---
    # Concatenate all the long-format similarity DataFrames.
    combined_sim_df = pd.concat(all_long_dfs, ignore_index=True)

    # Exclude self-comparisons from the similarity data as well.
    combined_sim_df = combined_sim_df[combined_sim_df['reference_etf'] != combined_sim_df['comparison_etf']]

    # --- Step 3: Merge Ground Truth ---
    # Merge the similarity data with the correlation data on the ETF pair keys.
    # This aligns the similarity score from each method with the ground-truth correlation.
    analysis_df = pd.merge(
        combined_sim_df,
        corr_long,
        on=['reference_etf', 'comparison_etf']
    )

    # --- Step 4: Calculate Ranks ---
    # Retrieve ranking parameters from the configuration.
    ranking_method = config['evaluation_config']['handle_ranking_ties']
    is_descending = config['evaluation_config']['ranking_method'] == 'descending'

    # Calculate ranks within each group of (reference_etf, method).
    # .groupby() partitions the data for each reference ETF's ranking task.
    # .rank() is then applied to each partition.
    # 'ascending=False' means a higher score/correlation gets a better (lower) rank.
    analysis_df['similarity_rank'] = analysis_df.groupby(['reference_etf', 'method'])['similarity_score'].rank(
        method=ranking_method, ascending=not is_descending
    )

    # The correlation ranks need to be calculated for each reference ETF as well.
    # We group only by 'reference_etf' here, as the correlation is the same for all methods.
    analysis_df['correlation_rank'] = analysis_df.groupby('reference_etf')['correlation_value'].rank(
        method=ranking_method, ascending=not is_descending
    )

    logging.info("Data preparation and ranking complete.")
    logging.info(f"Created tidy analysis DataFrame with shape: {analysis_df.shape}")

    # Final validation of the output.
    expected_rows = len(similarity_matrices) * (len(correlation_matrix) * (len(correlation_matrix) - 1))
    if analysis_df.shape[0] != expected_rows:
        raise ValueError("The final analysis DataFrame has an unexpected number of rows.")
    if analysis_df.isnull().values.any():
        raise ValueError("The final analysis DataFrame contains unexpected null values.")

    return analysis_df


In [None]:
# Task 23: Compute Spearman Rank Correlation

def _calculate_spearman_for_group(
    group: pd.DataFrame
) -> pd.Series:
    """
    Helper function to calculate Spearman correlation for a single group.

    This function is designed to be used with `groupby().apply()`. It takes a
    sub-DataFrame corresponding to one reference ETF and one similarity method,
    and computes the Spearman rank correlation and its p-value between the
    similarity ranks and the ground-truth correlation ranks.

    Args:
        group (pd.DataFrame): A sub-DataFrame containing 'similarity_rank' and
                              'correlation_rank' columns for a single test case.

    Returns:
        pd.Series: A Series containing the calculated 'correlation' and 'p_value'.
    """
    # Use scipy.stats.spearmanr to compute the correlation and p-value.
    # This is the industry-standard function for this statistical test.
    # It correctly handles the calculation based on the provided ranks.
    correlation, p_value = spearmanr(
        group['similarity_rank'],
        group['correlation_rank']
    )

    # Return the results as a pandas Series with named items.
    # This allows .apply() to elegantly construct a results DataFrame.
    return pd.Series({'correlation': correlation, 'p_value': p_value})


def compute_spearman_rank_correlation(
    analysis_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Computes the Spearman rank correlation for each method and reference ETF.

    This function serves as the primary evaluation engine of the study. It takes
    the tidy, ranked data and systematically performs a Spearman rank correlation
    test for each similarity method against the ground-truth return correlations,
    from the perspective of each individual reference ETF.

    The process is as follows:
    1.  **Group Data**: The input DataFrame is grouped by `['reference_etf', 'method']`,
        creating a distinct experimental unit for each test.
    2.  **Apply Statistical Test**: The `scipy.stats.spearmanr` function is applied
        to the rank columns of each group to compute the correlation coefficient
        (rho) and the two-tailed p-value.
    3.  **Determine Significance**: Based on the computed p-values, boolean flags
        are added to indicate statistical significance at the alpha levels
        specified in the configuration (e.g., 5% and 10%).
    4.  **Format Output**: The results are compiled into a clean, multi-indexed
        DataFrame for subsequent aggregation and analysis.

    Args:
        analysis_df (pd.DataFrame): The tidy DataFrame from the ranking preparation step.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        pd.DataFrame: A DataFrame summarizing the results of the Spearman tests,
                      with one row per reference_etf-method combination.
    """
    # --- Input Validation ---
    if not isinstance(analysis_df, pd.DataFrame):
        raise TypeError("Input 'analysis_df' must be a pandas DataFrame.")
    required_cols = {'reference_etf', 'method', 'similarity_rank', 'correlation_rank'}
    if not required_cols.issubset(analysis_df.columns):
        raise ValueError(f"Input DataFrame is missing required columns. Expected: {required_cols}")

    logging.info("Computing Spearman rank correlations for all methods...")

    # --- Step 1 & 2: Group Data and Apply Statistical Test ---
    # Group by the experimental unit: each reference ETF and each method.
    # .apply() passes each resulting sub-DataFrame to our helper function.
    # The results are automatically combined into a new DataFrame.
    results_df = analysis_df.groupby(['reference_etf', 'method']).apply(
        _calculate_spearman_for_group
    )

    # --- Step 3: Determine Statistical Significance ---
    # Retrieve the significance levels from the configuration.
    alpha_levels = config['evaluation_config']['significance_levels']

    # For each specified alpha level, create a boolean flag column.
    # The column name is dynamically created (e.g., 'significant_at_5pct').
    for alpha in alpha_levels:
        col_name = f"significant_at_{int(alpha*100)}pct"
        results_df[col_name] = results_df['p_value'] < alpha

    # Reset the index to turn the multi-index ('reference_etf', 'method')
    # back into columns for a standard flat DataFrame structure.
    results_df = results_df.reset_index()

    logging.info("Spearman rank correlation computation complete.")
    logging.info(f"Generated results for {len(results_df)} test cases.")

    # --- Final Validation ---
    expected_rows = analysis_df['reference_etf'].nunique() * analysis_df['method'].nunique()
    if len(results_df) != expected_rows:
        raise ValueError("The results DataFrame has an unexpected number of rows.")
    if results_df['correlation'].isnull().any() or results_df['p_value'].isnull().any():
        raise ValueError("The results DataFrame contains unexpected null values.")

    return results_df


In [None]:
# Task 24: Aggregate Statistics Across ETFs

def aggregate_spearman_results(
    spearman_results_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Aggregates Spearman correlation results to produce the final summary table.

    This function takes the detailed, per-ETF test results and computes summary
    statistics for each similarity method, replicating Table 4 from the paper.
    It provides the ultimate quantitative comparison of STRAPSim against the
    baseline methods.

    The process is as follows:
    1.  **Group by Method**: The detailed results are grouped by the 'method' column.
    2.  **Aggregate Statistics**: For each method, the function calculates:
        - The average Spearman correlation coefficient.
        - The average p-value.
        - The percentage of tests that were statistically significant at each
          alpha level specified in the configuration (e.g., 5% and 10%).
    3.  **Format Table**: The resulting summary data is formatted into a
        presentation-quality DataFrame with column names and ordering that
        exactly match Table 4 from the paper.

    Args:
        spearman_results_df (pd.DataFrame): The DataFrame of detailed Spearman
                                            test results from the previous task.
        config (Dict[str, Any]): The validated global configuration dictionary.

    Returns:
        pd.DataFrame: A summary DataFrame presenting the aggregated performance
                      of each similarity method.
    """
    # --- Input Validation ---
    if not isinstance(spearman_results_df, pd.DataFrame):
        raise TypeError("Input 'spearman_results_df' must be a pandas DataFrame.")
    required_cols = {'method', 'correlation', 'p_value'}
    if not required_cols.issubset(spearman_results_df.columns):
        raise ValueError(f"Input DataFrame is missing required columns. Expected at least: {required_cols}")

    logging.info("Aggregating Spearman correlation results across all ETFs...")

    # --- Step 1 & 2: Group by Method and Aggregate Statistics ---
    # Define the aggregation operations in a dictionary.
    # This is a clean and efficient way to compute multiple statistics at once.
    agg_functions = {
        'correlation': 'mean',
        'p_value': 'mean'
    }

    # Add aggregation rules for the significance percentage columns.
    # The mean of a boolean column gives the proportion of True values.
    for alpha in config['evaluation_config']['significance_levels']:
        col_name = f"significant_at_{int(alpha*100)}pct"
        if col_name in spearman_results_df.columns:
            agg_functions[col_name] = 'mean'

    # Perform the groupby and aggregation.
    summary_df = spearman_results_df.groupby('method').agg(agg_functions)

    # --- Step 3: Format the Output Table ---
    # Convert the significance proportions to percentages.
    for alpha in config['evaluation_config']['significance_levels']:
        col_name = f"significant_at_{int(alpha*100)}pct"
        if col_name in summary_df.columns:
            summary_df[col_name] = summary_df[col_name] * 100

    # Rename columns to exactly match Table 4 from the paper.
    rename_map = {
        'correlation': 'Average Coefficient',
        'p_value': 'Average P-value',
        'significant_at_5pct': '% of significant samples (α=5%)',
        'significant_at_10pct': '% of significant samples (α=10%)'
    }
    summary_df = summary_df.rename(columns=rename_map)

    # Define the desired order of methods (rows) for the final table.
    # Assuming the 'method' column in the input contains these exact names.
    method_order = ['Jaccard', 'weighted Jaccard', 'BertScore', 'STRAPSim']
    # Filter and reorder the summary DataFrame to match the paper's presentation.
    # .reindex() is a robust way to enforce a specific order.
    summary_df = summary_df.reindex(method_order)

    # Apply final formatting for presentation.
    # Define precision for each column.
    formatting_map = {
        'Average Coefficient': '{:.4f}',
        'Average P-value': '{:.4f}',
        '% of significant samples (α=5%)': '{:.0f}',
        '% of significant samples (α=10%)': '{:.0f}'
    }
    formatted_summary_df = summary_df.style.format(formatting_map)

    logging.info("Successfully aggregated results into summary table.")
    # Display the formatted table in the log for immediate review.
    logging.info(f"\n--- Final Results Summary (Table 4 Replication) ---\n{formatted_summary_df.to_string()}")

    # Return the raw (unformatted) DataFrame for potential downstream use.
    return summary_df


In [None]:
# Task 25: Design End-to-End Pipeline Orchestrator

def run_strapsim_pipeline(
    etf_holdings_df: pd.DataFrame,
    bond_features_df: pd.DataFrame,
    monthly_returns_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the end-to-end STRAPSim research pipeline.

    This master function executes the entire research workflow as described in the
    paper, from initial data validation to the final results aggregation. It serves
    as a single entry point to reproduce the study's findings, managing the
    sequential execution of dozens of modular, single-responsibility functions.

    The pipeline is organized into the following major phases:
    1.  **Validation**: Rigorously validates the configuration and all raw input
        DataFrames. The pipeline halts immediately if any validation fails.
    2.  **Cleansing**: Standardizes and cleans the validated data to prepare it
        for analysis.
    3.  **Feature Engineering**: Constructs the feature and target matrices for the
        Random Forest model, including one-hot encoding and normalization.
    4.  **Model Training**: Optimizes hyperparameters via cross-validation, trains
        the final model, and validates its performance against paper benchmarks.
    5.  **Similarity Computation**: Generates the core proximity matrix and then
        computes all similarity matrices (STRAPSim and baselines).
    6.  **Evaluation**: Computes the ground-truth return correlation matrix and
        evaluates the performance of each similarity metric by calculating the
        Spearman rank correlation, producing a final summary table.

    Args:
        etf_holdings_df (pd.DataFrame): The raw DataFrame of ETF holdings.
        bond_features_df (pd.DataFrame): The raw DataFrame of bond features.
        monthly_returns_df (pd.DataFrame): The raw DataFrame of monthly returns.
        config (Dict[str, Any]): The global configuration dictionary for the study.

    Returns:
        Dict[str, Any]: A dictionary containing the key artifacts of the study,
                        including the final results summary table, all computed
                        similarity matrices, and the trained model.
    """
    logging.info("--- Starting STRAPSim End-to-End Research Pipeline ---")

    # A dictionary to store all major artifacts produced by the pipeline.
    results_artifacts: Dict[str, Any] = {}

    try:
        # === PHASE 1: INPUT VALIDATION ===
        logging.info("[PHASE 1/6] Validating all inputs...")
        is_valid, errors = validate_configuration(config)
        if not is_valid: raise ValueError(f"Configuration is invalid: {errors}")

        is_valid, errors = validate_etf_holdings_dataframe(etf_holdings_df, config)
        if not is_valid: raise ValueError(f"ETF holdings DataFrame is invalid: {errors}")

        # NOTE: Assuming bond_features_df has been prepared with an ordered categorical for ratings.
        # In a real pipeline, this would be another pre-processing step.
        is_valid, errors = validate_bond_features_dataframe(bond_features_df, config)
        if not is_valid: raise ValueError(f"Bond features DataFrame is invalid: {errors}")

        is_valid, errors = validate_monthly_returns_dataframe(monthly_returns_df, etf_holdings_df, config)
        if not is_valid: raise ValueError(f"Monthly returns DataFrame is invalid: {errors}")

        is_valid, errors = validate_cross_dataset_consistency(etf_holdings_df, bond_features_df, monthly_returns_df, config)
        if not is_valid: raise ValueError(f"Cross-dataset consistency check failed: {errors}")
        logging.info("All inputs validated successfully.")

        # === PHASE 2: DATA CLEANSING ===
        logging.info("[PHASE 2/6] Cleansing and standardizing data...")
        holdings_clean = cleanse_etf_holdings_dataframe(etf_holdings_df, config)
        features_clean = cleanse_bond_features_dataframe(bond_features_df, config)
        returns_clean = cleanse_monthly_returns_dataframe(monthly_returns_df, config)
        logging.info("All data cleansed successfully.")

        # === PHASE 3: FEATURE ENGINEERING ===
        logging.info("[PHASE 3/6] Engineering features for Random Forest model...")
        categorical_encoded = prepare_categorical_features(features_clean.set_index('cusip'), config)
        numerical_scaled, scaling_params = prepare_numerical_features(features_clean.set_index('cusip'), config)

        X_train, X_test, y_train, y_test = construct_and_split_data(
            features_clean, categorical_encoded, numerical_scaled, config
        )
        # The complete feature matrix for proximity calculation
        X_complete = pd.concat([X_train, X_test]).sort_index()
        logging.info("Feature engineering and data splitting complete.")

        # === PHASE 4: MODEL TRAINING & PROXIMITY MATRIX GENERATION ===
        logging.info("[PHASE 4/6] Training model and generating proximity matrix...")
        optimal_params = optimize_random_forest_hyperparameters(X_train, y_train, config)

        final_model = train_final_model_and_evaluate(
            X_train, y_train, X_test, y_test, optimal_params, config
        )
        results_artifacts['trained_model'] = final_model

        proximity_matrix_df = generate_proximity_matrix(final_model, X_complete, config)
        results_artifacts['proximity_matrix'] = proximity_matrix_df
        logging.info("Model training and proximity matrix generation complete.")

        # === PHASE 5: SIMILARITY & GROUND-TRUTH COMPUTATION ===
        logging.info("[PHASE 5/6] Computing all similarity matrices...")
        portfolio_data = extract_portfolio_level_data(holdings_clean, proximity_matrix_df)

        # Store proximity matrix as a numpy array for performance
        proximity_matrix_np = proximity_matrix_df.to_numpy()

        # Compute STRAPSim
        strapsim_df, _ = compute_all_strapsim_pairs(portfolio_data, proximity_matrix_np, config)

        # Compute Baselines
        jaccard_df, _ = compute_jaccard_matrix(portfolio_data, config)
        wj_df, _ = compute_weighted_jaccard_matrix(portfolio_data, config)
        _, _, f1_df, _, _, _ = compute_bertscore_matrix(portfolio_data, proximity_matrix_np, config)

        # Compute Ground Truth
        correlation_df = compute_return_correlation_matrix(returns_clean, config)

        similarity_matrices = {
            'STRAPSim': strapsim_df,
            'Jaccard': jaccard_df,
            'weighted Jaccard': wj_df,
            'BertScore': f1_df # Using F1 as the primary BertScore metric
        }
        results_artifacts['similarity_matrices'] = similarity_matrices
        results_artifacts['correlation_matrix'] = correlation_df
        logging.info("All similarity matrices computed successfully.")

        # === PHASE 6: FINAL EVALUATION ===
        logging.info("[PHASE 6/6] Performing final evaluation...")
        analysis_df = prepare_data_for_spearman_analysis(similarity_matrices, correlation_df, config)

        spearman_results_df = compute_spearman_rank_correlation(analysis_df, config)

        final_summary_table = aggregate_spearman_results(spearman_results_df, config)
        results_artifacts['final_summary_table'] = final_summary_table
        logging.info("Evaluation complete.")

    except (ValueError, TypeError, KeyError, RuntimeError) as e:
        # Catch any errors during the pipeline, log them, and re-raise.
        logging.error(f"--- PIPELINE FAILED ---")
        logging.error(f"An error occurred during the pipeline execution: {e}")
        # Re-raise the exception to halt execution and provide a traceback.
        raise

    logging.info("--- STRAPSim End-to-End Research Pipeline Completed Successfully ---")

    # Return all the key results and artifacts.
    return results_artifacts


### Extended Pipeline

In [None]:
# Task 26: Random Forest Hyperparameter Sensitivity Analysis

def _run_single_sensitivity_scenario(
    scenario_params: Dict[str, Any],
    base_config: Dict[str, Any],
    X_train: pd.DataFrame,
    y_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_test: pd.DataFrame,
    X_complete: pd.DataFrame,
    portfolio_data: Dict, # Using generic Dict for brevity, should be PortfolioData
    correlation_df: pd.DataFrame
) -> Tuple[float, float]:
    """
    Executes a single run of the core analysis pipeline for a given scenario.

    This helper function takes a specific set of hyperparameters, trains a model,
    generates the proximity and STRAPSim matrices, and evaluates the final
    Spearman correlation. It is a streamlined version of the main pipeline,
    bypassing the hyperparameter search.

    Args:
        scenario_params (Dict[str, Any]): The hyperparameters for this specific run.
        base_config (Dict[str, Any]): The base configuration dictionary.
        (and all necessary data artifacts: X_train, y_train, etc.)

    Returns:
        Tuple[float, float]: A tuple containing the test RMSE of the model and
                             the final average Spearman correlation for STRAPSim.
    """
    # Create a deep copy of the config to avoid modifying the original.
    scenario_config = copy.deepcopy(base_config)

    # --- Train Model with Scenario Parameters ---
    # Instantiate and train the model directly with the scenario's hyperparameters.
    model = RandomForestRegressor(
        n_estimators=scenario_params['n_estimators'],
        max_depth=scenario_params.get('max_depth'), # .get() handles None for max_depth
        random_state=scenario_config['random_forest_config']['random_state'],
        n_jobs=scenario_config['random_forest_config']['n_jobs']
    )
    model.fit(X_train, y_train)

    # --- Evaluate Model Performance (RMSE) ---
    y_test_pred = model.predict(X_test)
    test_rmse, _ = _calculate_multi_output_metrics(y_test, y_test_pred, y_test.columns.tolist())

    # --- Generate Proximity and STRAPSim Matrices ---
    proximity_matrix_df = generate_proximity_matrix(model, X_complete, scenario_config)
    proximity_matrix_np = proximity_matrix_df.to_numpy()

    strapsim_df, _ = compute_all_strapsim_pairs(
        portfolio_data, proximity_matrix_np, scenario_config
    )

    # --- Evaluate Final Spearman Correlation ---
    # We only need to evaluate the STRAPSim method for this analysis.
    analysis_df = prepare_data_for_spearman_analysis(
        {'STRAPSim': strapsim_df}, correlation_df, scenario_config
    )
    spearman_results_df = compute_spearman_rank_correlation(analysis_df, scenario_config)

    # The result is the average correlation for the STRAPSim method.
    avg_spearman_corr = spearman_results_df['correlation'].mean()

    return test_rmse, avg_spearman_corr


def run_hyperparameter_sensitivity_analysis(
    # This function requires all the clean data and artifacts as input.
    # In a class-based implementation, these would be attributes.
    # Here, we pass them as arguments for a functional approach.
    base_config: Dict[str, Any],
    optimal_params: Dict[str, Any],
    X_train: pd.DataFrame,
    y_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_test: pd.DataFrame,
    portfolio_data: Dict, # PortfolioData
    correlation_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Orchestrates a sensitivity analysis on Random Forest hyperparameters.

    This function systematically evaluates the robustness of the study's main
    conclusion (the performance of STRAPSim) to changes in the underlying
    Random Forest model's hyperparameters (`n_estimators` and `max_depth`).

    The process is as follows:
    1.  **Define Scenarios**: Creates a list of alternative hyperparameter
        configurations, varying one parameter at a time while keeping the other
        at its optimal value.
    2.  **Iterate and Execute**: For each scenario, it runs a streamlined version
        of the analysis pipeline to compute the key performance indicators:
        the model's test RMSE and the final average Spearman correlation for STRAPSim.
    3.  **Aggregate and Report**: Compiles the results from all scenarios into a
        single summary DataFrame, allowing for a clear analysis of how the final
        outcome changes with the model's configuration.

    Args:
        base_config (Dict[str, Any]): The validated global configuration dictionary.
        optimal_params (Dict[str, Any]): The optimal parameters found by GridSearchCV.
        (and all necessary data artifacts from the main pipeline)

    Returns:
        pd.DataFrame: A summary DataFrame showing the test RMSE and average
                      Spearman correlation for each tested hyperparameter scenario.
    """
    logging.info("--- Starting Hyperparameter Sensitivity Analysis ---")

    # --- Step 1: Define Scenarios ---
    scenarios: List[Dict[str, Any]] = []
    # Scenario Group 1: Vary n_estimators, keep max_depth optimal.
    n_estimators_range = [50, 100, 200, 300, 500, 1000]
    for n in n_estimators_range:
        scenarios.append({'n_estimators': n, 'max_depth': optimal_params['max_depth']})

    # Scenario Group 2: Vary max_depth, keep n_estimators optimal.
    max_depth_range = [5, 10, 15, 20, 25, None]
    for d in max_depth_range:
        scenarios.append({'n_estimators': optimal_params['n_estimators'], 'max_depth': d})

    # Ensure the optimal parameters are included for a baseline comparison.
    if optimal_params not in scenarios:
        scenarios.insert(0, optimal_params)

    # Remove duplicate scenarios if any exist.
    unique_scenarios = [dict(t) for t in {tuple(d.items()) for d in scenarios}]
    logging.info(f"Defined {len(unique_scenarios)} unique scenarios for sensitivity analysis.")

    # --- Step 2: Iterate and Execute ---
    results: List[Dict[str, Any]] = []

    # Recreate the complete feature matrix needed for proximity calculation.
    X_complete = pd.concat([X_train, X_test]).sort_index()

    for i, params in enumerate(unique_scenarios):
        logging.info(f"--- Running Scenario {i+1}/{len(unique_scenarios)}: {params} ---")
        try:
            test_rmse, avg_spearman_corr = _run_single_sensitivity_scenario(
                scenario_params=params,
                base_config=base_config,
                X_train=X_train, y_train=y_train,
                X_test=X_test, y_test=y_test,
                X_complete=X_complete,
                portfolio_data=portfolio_data,
                correlation_df=correlation_df
            )

            # Store the results for this scenario.
            results.append({
                'n_estimators': params['n_estimators'],
                'max_depth': params.get('max_depth'),
                'test_rmse': test_rmse,
                'avg_spearman_corr': avg_spearman_corr
            })
        except Exception as e:
            logging.error(f"Scenario {params} failed with error: {e}")
            results.append({
                'n_estimators': params['n_estimators'],
                'max_depth': params.get('max_depth'),
                'test_rmse': np.nan,
                'avg_spearman_corr': np.nan
            })

    # --- Step 3: Aggregate and Report ---
    results_df = pd.DataFrame(results).sort_values(by=['n_estimators', 'max_depth']).reset_index(drop=True)

    logging.info("--- Hyperparameter Sensitivity Analysis Complete ---")
    logging.info(f"\n--- Sensitivity Analysis Results ---\n{results_df.to_string()}")

    return results_df


In [None]:
# Task 27: Data Split Sensitivity Analysis

def _run_single_split_scenario(
    random_seed: int,
    base_config: Dict[str, Any],
    optimal_params: Dict[str, Any],
    X_complete: pd.DataFrame,
    y_complete: pd.DataFrame,
    portfolio_data: Dict, # PortfolioData
    correlation_df: pd.DataFrame
) -> Tuple[float, float]:
    """
    Executes a single run of the analysis pipeline for a given random seed.

    This helper function takes a specific random seed, re-splits the data,
    re-trains the model, and re-evaluates the final Spearman correlation.

    Args:
        random_seed (int): The random state to use for the train-test split.
        base_config (Dict[str, Any]): The base configuration dictionary.
        optimal_params (Dict[str, Any]): The fixed, optimal model hyperparameters.
        (and all necessary data artifacts: X_complete, y_complete, etc.)

    Returns:
        Tuple[float, float]: A tuple containing the test RMSE of the model and
                             the final average Spearman correlation for STRAPSim.
    """
    # Create a deep copy of the config and update the random seed for the split.
    scenario_config = copy.deepcopy(base_config)
    scenario_config['feature_engineering']['random_state_for_shuffle'] = random_seed

    # --- Step 1: Re-split the data with the new random seed ---
    X_train, X_test, y_train, y_test = _perform_data_split(
        X_complete, y_complete, scenario_config
    )

    # --- Step 2: Re-train the model and evaluate its RMSE ---
    # Note: We do not re-run hyperparameter optimization. We use the optimal params.
    model = train_final_model_and_evaluate(
        X_train, y_train, X_test, y_test, optimal_params, scenario_config
    )

    # Extract the test RMSE for this run's model.
    y_test_pred = model.predict(X_test)
    test_rmse, _ = _calculate_multi_output_metrics(y_test, y_test_pred, y_test.columns.tolist())

    # --- Step 3: Generate Proximity and STRAPSim Matrices ---
    proximity_matrix_df = generate_proximity_matrix(model, X_complete, scenario_config)
    proximity_matrix_np = proximity_matrix_df.to_numpy()

    strapsim_df, _ = compute_all_strapsim_pairs(
        portfolio_data, proximity_matrix_np, scenario_config
    )

    # --- Step 4: Evaluate Final Spearman Correlation ---
    analysis_df = prepare_data_for_spearman_analysis(
        {'STRAPSim': strapsim_df}, correlation_df, scenario_config
    )
    spearman_results_df = compute_spearman_rank_correlation(analysis_df, scenario_config)
    avg_spearman_corr = spearman_results_df['correlation'].mean()

    return test_rmse, avg_spearman_corr


def run_data_split_sensitivity_analysis(
    base_config: Dict[str, Any],
    optimal_params: Dict[str, Any],
    # This function requires the complete, unsplit datasets.
    X_complete: pd.DataFrame,
    y_complete: pd.DataFrame,
    portfolio_data: Dict, # PortfolioData
    correlation_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Orchestrates a sensitivity analysis on the train-test split random seed.

    This function assesses the robustness of the study's results to the specific
    random partitioning of data into training and testing sets. It repeatedly
    runs the core analysis pipeline, each time with a different random seed,
    and analyzes the stability of the final performance metrics.

    The process is as follows:
    1.  **Define Scenarios**: A predefined list of random seeds is used.
    2.  **Iterate and Execute**: For each seed, it re-splits the data, re-trains
        the model using the fixed optimal hyperparameters, and re-calculates
        the model's test RMSE and the final STRAPSim Spearman correlation.
    3.  **Aggregate and Report**: Compiles the results from all runs into a
        summary DataFrame and calculates the mean and standard deviation of the
        key metrics to quantify the stability of the results.

    Args:
        base_config (Dict[str, Any]): The validated global configuration dictionary.
        optimal_params (Dict[str, Any]): The optimal parameters found by GridSearchCV.
        X_complete (pd.DataFrame): The complete, unsplit feature matrix.
        y_complete (pd.DataFrame): The complete, unsplit target matrix.
        (and other necessary data artifacts)

    Returns:
        pd.DataFrame: A summary DataFrame showing the test RMSE and average
                      Spearman correlation for each tested random seed.
    """
    logging.info("--- Starting Data Split Sensitivity Analysis ---")

    # --- Step 1: Define Scenarios (Random Seeds) ---
    # A list of seeds to test the stability of the results.
    random_seeds_to_test: List[int] = [0, 1, 7, 21, 42, 99, 123, 456, 789, 999]
    logging.info(f"Testing {len(random_seeds_to_test)} different random seeds for train-test split.")

    # --- Step 2: Iterate and Execute ---
    results: List[Dict[str, Any]] = []

    for i, seed in enumerate(random_seeds_to_test):
        logging.info(f"--- Running Scenario {i+1}/{len(random_seeds_to_test)}: random_seed={seed} ---")
        try:
            test_rmse, avg_spearman_corr = _run_single_split_scenario(
                random_seed=seed,
                base_config=base_config,
                optimal_params=optimal_params,
                X_complete=X_complete,
                y_complete=y_complete,
                portfolio_data=portfolio_data,
                correlation_df=correlation_df
            )

            results.append({
                'random_seed': seed,
                'test_rmse': test_rmse,
                'avg_spearman_corr': avg_spearman_corr
            })
        except Exception as e:
            logging.error(f"Scenario with seed {seed} failed with error: {e}")
            results.append({
                'random_seed': seed,
                'test_rmse': np.nan,
                'avg_spearman_corr': np.nan
            })

    # --- Step 3: Aggregate and Report ---
    results_df = pd.DataFrame(results).set_index('random_seed')

    logging.info("--- Data Split Sensitivity Analysis Complete ---")
    logging.info(f"\n--- Sensitivity Analysis Results ---\n{results_df.to_string()}")

    # Calculate and log summary statistics for the key outcome variable.
    mean_corr = results_df['avg_spearman_corr'].mean()
    std_corr = results_df['avg_spearman_corr'].std()
    logging.info(f"\n--- Stability Summary ---")
    logging.info(f"Average Spearman Correlation across all seeds: {mean_corr:.4f}")
    logging.info(f"Standard Deviation of Spearman Correlation:   {std_corr:.4f}")

    if std_corr > 0.05:
        logging.warning("High standard deviation detected, suggesting results may be sensitive to the data split.")
    else:
        logging.info("Low standard deviation detected, suggesting results are robust to the data split.")

    return results_df


In [None]:
# Task 28: Similarity Metric Component Sensitivity

# Define type hints from previous tasks for clarity.
Portfolio = Dict[str, Union[List[str], np.ndarray]]
PortfolioData = Dict[str, Portfolio]
BERTScoreFullResult = Tuple[float, float, float, float, float, float]


def _run_threshold_sensitivity_analysis(
    base_config: Dict[str, Any],
    portfolio_data: PortfolioData,
    proximity_matrix_np: np.ndarray
) -> pd.DataFrame:
    """
    Analyzes the sensitivity of STRAPSim results to the convergence threshold.

    Args:
        base_config (Dict[str, Any]): The base configuration dictionary.
        portfolio_data (PortfolioData): The structured portfolio data.
        proximity_matrix_np (np.ndarray): The NumPy proximity matrix.

    Returns:
        pd.DataFrame: A summary of results for each tested threshold.
    """
    logging.info("--- Starting STRAPSim Convergence Threshold Sensitivity Analysis ---")

    # Define the scenarios: a range of convergence thresholds to test.
    thresholds_to_test = [1e-4, 1e-5, 1e-6, 1e-7, 1e-8]
    results = []

    for threshold in thresholds_to_test:
        logging.info(f"Running analysis with convergence_threshold = {threshold}...")
        # Create a deep copy of the config and modify the threshold.
        scenario_config = copy.deepcopy(base_config)
        scenario_config['strapsim_algorithm']['weight_precision_tolerance'] = threshold

        start_time = time.time()
        # Re-compute the STRAPSim matrix with the modified config.
        strapsim_df, _ = compute_all_strapsim_pairs(
            portfolio_data, proximity_matrix_np, scenario_config
        )
        duration = time.time() - start_time

        # Calculate the average similarity score (excluding the diagonal).
        np.fill_diagonal(strapsim_df.values, np.nan)
        avg_score = strapsim_df.stack().mean()

        results.append({
            'convergence_threshold': threshold,
            'avg_strapsim_score': avg_score,
            'computation_time_sec': duration
        })

    return pd.DataFrame(results).set_index('convergence_threshold')


def _run_low_weight_holding_sensitivity_analysis(
    base_config: Dict[str, Any],
    holdings_clean_df: pd.DataFrame,
    proximity_matrix_df: pd.DataFrame,
    correlation_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Analyzes the sensitivity of results to the exclusion of low-weight holdings.

    Args:
        base_config (Dict[str, Any]): The base configuration dictionary.
        holdings_clean_df (pd.DataFrame): The cleansed holdings data.
        proximity_matrix_df (pd.DataFrame): The proximity matrix as a DataFrame.
        correlation_df (pd.DataFrame): The ground-truth return correlation matrix.

    Returns:
        pd.DataFrame: A summary of evaluation results for each exclusion threshold.
    """
    logging.info("--- Starting Low-Weight Holding Exclusion Sensitivity Analysis ---")

    # Define scenarios: weight thresholds below which holdings are excluded. 0.0 is the baseline.
    exclusion_thresholds = [0.0, 0.0001, 0.001, 0.005, 0.01] # 0.0%, 0.01%, 0.1%, 0.5%, 1.0%
    results = []
    proximity_matrix_np = proximity_matrix_df.to_numpy()

    for threshold in exclusion_thresholds:
        logging.info(f"Running analysis with exclusion_threshold = {threshold:.4f}...")

        # Filter the holdings DataFrame.
        filtered_holdings = holdings_clean_df[holdings_clean_df['weight'] >= threshold].copy()

        # CRITICAL: Re-normalize the weights of the remaining holdings.
        weight_sums = filtered_holdings.groupby('etf_id')['weight'].transform('sum')
        filtered_holdings['weight'] = filtered_holdings['weight'] / weight_sums

        # Re-run the core analysis pipeline with the filtered & re-normalized data.
        portfolio_data_filtered = extract_portfolio_level_data(filtered_holdings, proximity_matrix_df)

        strapsim_df, _ = compute_all_strapsim_pairs(
            portfolio_data_filtered, proximity_matrix_np, base_config
        )

        analysis_df = prepare_data_for_spearman_analysis(
            {'STRAPSim': strapsim_df}, correlation_df, base_config
        )
        spearman_results_df = compute_spearman_rank_correlation(analysis_df, base_config)
        avg_spearman_corr = spearman_results_df['correlation'].mean()

        results.append({
            'exclusion_threshold': threshold,
            'avg_spearman_corr': avg_spearman_corr,
            'avg_holdings_per_etf': filtered_holdings.groupby('etf_id').size().mean()
        })

    return pd.DataFrame(results).set_index('exclusion_threshold')


def _compute_bertscore_for_pair(
    portfolio_x: Portfolio,
    portfolio_y: Portfolio,
    proximity_matrix: np.ndarray,
    weight_scheme: str = 'min_weight'
) -> BERTScoreFullResult:
    """
    Computes the adapted BERTScore and its residuals with dynamic weight schemes.

    This function provides a complete implementation of the adapted BERTScore
    metric as described in Section 2.4 of the paper, including both the primary
    scores (Equations 6a, 6b, 6c) and the residual scores (Equations 7a, 7b, 7c).
    This version is updated to accept a `weight_scheme` parameter, which
    dynamically alters the logic used in the residual calculation, making it
    suitable for sensitivity analysis.

    Args:
        portfolio_x (Portfolio): The structured data for the reference portfolio (X).
        portfolio_y (Portfolio): The structured data for the candidate portfolio (Y).
        proximity_matrix (np.ndarray): The global (N x N) matrix of similarities.
        weight_scheme (str): The weighting scheme to use for residual calculation.
                             Options: 'min_weight', 'max_weight', 'arithmetic_mean'.
                             Defaults to 'min_weight'.

    Returns:
        BERTScoreFullResult: A tuple containing six scores:
        (Recall, Precision, F1, Recall_Residual, Precision_Residual, F1_Residual).

    Raises:
        ValueError: If an unknown `weight_scheme` is provided.
    """
    # --- Initialization ---
    # Extract integer indices for efficient slicing of the proximity matrix.
    indices_x: np.ndarray = portfolio_x['indices']
    indices_y: np.ndarray = portfolio_y['indices']

    # Extract weight vectors as NumPy arrays.
    weights_x: np.ndarray = portfolio_x['weights']
    weights_y: np.ndarray = portfolio_y['weights']

    # --- Edge Case Handling ---
    # Handle cases where one or both portfolios are empty.
    if len(indices_x) == 0 or len(indices_y) == 0:
        # If both are empty, they are identical (similarity=1, residual=0).
        is_identical = len(indices_x) == len(indices_y)
        score = 1.0 if is_identical else 0.0
        residual = 0.0 if is_identical else 1.0
        return (score, score, score, residual, residual, residual)

    # --- Primary Score Calculation (Vectorized) ---
    # Slice the proximity matrix to get the relevant sub-matrix of similarities.
    sub_matrix = proximity_matrix[np.ix_(indices_x, indices_y)]

    # Equation 6a (Recall): Weighted average of max similarities for each item in X.
    # For each row (item in X), find the maximum similarity score against all items in Y.
    max_sim_for_x = sub_matrix.max(axis=1)
    # The recall is the dot product of X's weights and these best-match scores.
    recall = np.dot(weights_x, max_sim_for_x)

    # Equation 6b (Precision): Weighted average of max similarities for each item in Y.
    # For each column (item in Y), find the maximum similarity score against all items in X.
    max_sim_for_y = sub_matrix.max(axis=0)
    # The precision is the dot product of Y's weights and these best-match scores.
    precision = np.dot(weights_y, max_sim_for_y)

    # Equation 6c (F1 Score): Harmonic mean of Precision and Recall.
    # Check for division by zero.
    if (precision + recall) == 0:
        f1_score = 0.0
    else:
        f1_score = 2 * (precision * recall) / (precision + recall)

    # --- DYNAMIC Residual Score Calculation (Iterative) ---
    # Helper to apply the selected weighting scheme.
    def get_transfer_weight(w1: float, w2: float) -> float:
        if weight_scheme == 'min_weight':
            return min(w1, w2)
        if weight_scheme == 'max_weight':
            return max(w1, w2)
        if weight_scheme == 'arithmetic_mean':
            return (w1 + w2) / 2.0
        # Raise an error for unsupported schemes.
        raise ValueError(f"Unknown weight_scheme: '{weight_scheme}'")

    # Equation 7a (Recall Residual): Calculate remaining weight in X.
    # Create a copy of the weights to track consumption.
    residual_weights_x = weights_x.copy()
    # Find the index of the best match in Y for each item in X.
    best_match_indices_for_x = np.argmax(sub_matrix, axis=1)
    # Iterate through each item in X to calculate its weight consumption.
    for i in range(len(indices_x)):
        # Get the index of the best-matching item in Y.
        best_match_j = best_match_indices_for_x[i]
        # Calculate the weight to be transferred based on the chosen scheme.
        transfer_weight = get_transfer_weight(weights_x[i], weights_y[best_match_j])
        # Decrement the residual weight.
        residual_weights_x[i] -= transfer_weight
    # The residual score is 1 minus the sum of remaining non-negative weights.
    recall_residual = 1.0 - np.sum(np.maximum(0, residual_weights_x))

    # Equation 7b (Precision Residual): Calculate remaining weight in Y.
    # Create a copy of the weights to track consumption.
    residual_weights_y = weights_y.copy()
    # Find the index of the best match in X for each item in Y.
    best_match_indices_for_y = np.argmax(sub_matrix, axis=0)
    # Iterate through each item in Y to calculate its weight consumption.
    for j in range(len(indices_y)):
        # Get the index of the best-matching item in X.
        best_match_i = best_match_indices_for_y[j]
        # Calculate the weight to be transferred.
        transfer_weight = get_transfer_weight(weights_y[j], weights_x[best_match_i])
        # Decrement the residual weight.
        residual_weights_y[j] -= transfer_weight
    # The residual score is 1 minus the sum of remaining non-negative weights.
    precision_residual = 1.0 - np.sum(np.maximum(0, residual_weights_y))

    # Equation 7c (F1 Residual): Harmonic mean of the two residual scores.
    # Check for division by zero.
    if (precision_residual + recall_residual) == 0:
        f1_residual = 0.0
    else:
        f1_residual = 2 * (precision_residual * recall_residual) / (precision_residual + recall_residual)

    # Return all six computed metrics.
    return recall, precision, f1_score, recall_residual, precision_residual, f1_residual


def compute_bertscore_matrix(
    portfolios: PortfolioData,
    proximity_matrix: np.ndarray,
    config: Dict[str, Any],
    weight_scheme: str = 'min_weight'
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Orchestrates the computation of adapted BERTScore and residuals for all pairs.

    This function systematically calculates the full suite of six BERTScore-related
    metrics for every unique pair of ETFs. It is refactored to accept a
    `weight_scheme` parameter, which is passed down to the core calculation
    helper function, enabling sensitivity analysis on this component.

    Args:
        portfolios (PortfolioData): The nested dictionary of structured portfolio data.
        proximity_matrix (np.ndarray): The global (N x N) matrix of similarities.
        config (Dict[str, Any]): The validated global configuration dictionary.
        weight_scheme (str): The weighting scheme to use for residual calculation.
                             Defaults to 'min_weight'.

    Returns:
        Tuple[pd.DataFrame, ...]: A tuple containing the six square DataFrames:
        (Recall_Scores, Precision_Scores, F1_Scores, Recall_Residuals,
         Precision_Residuals, F1_Residuals).
    """
    # --- Input Validation ---
    if not isinstance(portfolios, dict):
        raise TypeError("Input 'portfolios' must be a dictionary.")
    if not isinstance(proximity_matrix, np.ndarray):
        raise TypeError("Input 'proximity_matrix' must be a numpy array.")

    # --- Initialization ---
    # Get a sorted list of ETF IDs for deterministic matrix order.
    etf_ids: List[str] = sorted(portfolios.keys())
    n_etfs = len(etf_ids)
    # Create a mapping from ETF ID to its integer index in the matrices.
    etf_to_matrix_idx: Dict[str, int] = {etf_id: i for i, etf_id in enumerate(etf_ids)}

    # Initialize six empty matrices for all scores and residuals.
    matrices = {
        "recall": np.full((n_etfs, n_etfs), np.nan, dtype=np.float64),
        "precision": np.full((n_etfs, n_etfs), np.nan, dtype=np.float64),
        "f1": np.full((n_etfs, n_etfs), np.nan, dtype=np.float64),
        "recall_residual": np.full((n_etfs, n_etfs), np.nan, dtype=np.float64),
        "precision_residual": np.full((n_etfs, n_etfs), np.nan, dtype=np.float64),
        "f1_residual": np.full((n_etfs, n_etfs), np.nan, dtype=np.float64),
    }

    # Set diagonals: 1.0 for scores, 0.0 for residuals.
    for name, matrix in matrices.items():
        fill_value = 1.0 if "residual" not in name else 0.0
        np.fill_diagonal(matrix, fill_value)

    # --- Pairwise Computation ---
    # Generate all unique pairs of ETF IDs to compute.
    etf_pairs = list(itertools.combinations(etf_ids, 2))

    # Iterate through each unique pair.
    for etf_id_x, etf_id_y in tqdm(etf_pairs, desc=f"Computing BERTScore Suite (scheme: {weight_scheme})"):
        # Retrieve the structured data for the two portfolios.
        portfolio_x = portfolios[etf_id_x]
        portfolio_y = portfolios[etf_id_y]

        # Compute scores for the (X, Y) direction, passing the weight_scheme.
        r_xy, p_xy, f1_xy, rr_xy, pr_xy, fr_xy = _compute_bertscore_for_pair(
            portfolio_x, portfolio_y, proximity_matrix, weight_scheme
        )

        # Compute scores for the (Y, X) direction.
        r_yx, p_yx, f1_yx, rr_yx, pr_yx, fr_yx = _compute_bertscore_for_pair(
            portfolio_y, portfolio_x, proximity_matrix, weight_scheme
        )

        # Get matrix indices for storage.
        idx_x = etf_to_matrix_idx[etf_id_x]
        idx_y = etf_to_matrix_idx[etf_id_y]

        # Store directional scores (Recall and Precision + their residuals).
        matrices["recall"][idx_x, idx_y], matrices["recall"][idx_y, idx_x] = r_xy, r_yx
        matrices["precision"][idx_x, idx_y], matrices["precision"][idx_y, idx_x] = p_xy, p_yx
        matrices["recall_residual"][idx_x, idx_y], matrices["recall_residual"][idx_y, idx_x] = rr_xy, rr_yx
        matrices["precision_residual"][idx_x, idx_y], matrices["precision_residual"][idx_y, idx_x] = pr_xy, pr_yx

        # Store symmetric scores (F1 and its residual) by averaging for robustness.
        f1_avg = (f1_xy + f1_yx) / 2.0
        matrices["f1"][idx_x, idx_y] = matrices["f1"][idx_y, idx_x] = f1_avg
        fr_avg = (fr_xy + fr_yx) / 2.0
        matrices["f1_residual"][idx_x, idx_y] = matrices["f1_residual"][idx_y, idx_x] = fr_avg

    # --- Finalization ---
    # Convert all matrices to labeled DataFrames.
    dataframes = {
        name: pd.DataFrame(matrix, index=etf_ids, columns=etf_ids)
        for name, matrix in matrices.items()
    }

    # Return the tuple of six completed DataFrames.
    return (
        dataframes["recall"], dataframes["precision"], dataframes["f1"],
        dataframes["recall_residual"], dataframes["precision_residual"], dataframes["f1_residual"],
    )


def _run_bertscore_weight_scheme_analysis(
    base_config: Dict[str, Any],
    portfolio_data: PortfolioData,
    proximity_matrix_np: np.ndarray,
    correlation_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Analyzes the sensitivity of BERTScore results to the weighting scheme.

    This helper function iterates through different weighting schemes defined for
    the BERTScore residual calculation. For each scheme, it re-computes the
    BERTScore F1 matrix and evaluates its alignment with the ground-truth return
    correlations via the Spearman rank correlation test.

    Args:
        base_config (Dict[str, Any]): The base configuration dictionary.
        portfolio_data (PortfolioData): The structured portfolio data.
        proximity_matrix_np (np.ndarray): The NumPy proximity matrix.
        correlation_df (pd.DataFrame): The ground-truth return correlation matrix.

    Returns:
        pd.DataFrame: A summary of evaluation results for each weighting scheme.
    """
    # Announce the start of this specific analysis.
    logging.info("--- Starting BERTScore Weighting Scheme Sensitivity Analysis ---")

    # Define scenarios: different weighting schemes for the residual calculation.
    schemes_to_test = ['min_weight', 'max_weight', 'arithmetic_mean']
    # Initialize a list to store the results of each scenario.
    results = []

    # Iterate through each defined weighting scheme.
    for scheme in schemes_to_test:
        # Log the current scenario being executed.
        logging.info(f"Running analysis with weight_scheme = '{scheme}'...")

        # Re-compute the full suite of BERTScore matrices using the specific scheme.
        # We only need the F1 score matrix for this analysis.
        _, _, f1_df, _, _, _ = compute_bertscore_matrix(
            portfolio_data, proximity_matrix_np, base_config, weight_scheme=scheme
        )

        # Evaluate the final Spearman correlation for the F1 score from this scheme.
        # Prepare the data for analysis, focusing only on the 'BertScore' method.
        analysis_df = prepare_data_for_spearman_analysis(
            {'BertScore': f1_df}, correlation_df, base_config
        )
        # Compute the Spearman correlation results.
        spearman_results_df = compute_spearman_rank_correlation(analysis_df, base_config)
        # Calculate the average Spearman correlation for the BertScore method.
        avg_spearman_corr = spearman_results_df[spearman_results_df['method'] == 'BertScore']['correlation'].mean()

        # Append the results for this scheme to our list.
        results.append({'weight_scheme': scheme, 'avg_spearman_corr': avg_spearman_corr})

    # Convert the list of results into a pandas DataFrame, indexed by the scheme.
    return pd.DataFrame(results).set_index('weight_scheme')


def run_metric_component_sensitivity_analysis(
    base_config: Dict[str, Any],
    holdings_clean_df: pd.DataFrame,
    portfolio_data: PortfolioData,
    proximity_matrix_df: pd.DataFrame,
    correlation_df: pd.DataFrame
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates a complete suite of sensitivity analyses on metric components.

    This complete, remediated version performs all three specified analyses:
    1.  **STRAPSim Convergence Threshold**: Tests sensitivity to numerical precision.
    2.  **Low-Weight Holding Exclusion**: Examines the impact of trimming portfolios.
    3.  **BERTScore Weighting Scheme**: Analyzes how different weighting schemes in
        the residual calculation affect the final outcome.

    Args:
        base_config (Dict[str, Any]): The validated global configuration dictionary.
        holdings_clean_df (pd.DataFrame): The cleansed, full holdings DataFrame.
        portfolio_data (PortfolioData): The structured portfolio data from the full dataset.
        proximity_matrix_df (pd.DataFrame): The proximity matrix as a DataFrame.
        correlation_df (pd.DataFrame): The ground-truth return correlation matrix.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary containing the summary DataFrames
                                 for each sensitivity analysis performed.
    """
    # --- Input Validation ---
    if not isinstance(base_config, dict):
        raise TypeError("Input 'base_config' must be a dictionary.")

    # Initialize a dictionary to store the results of each analysis.
    results: Dict[str, pd.DataFrame] = {}
    # Convert proximity matrix to NumPy array for performance.
    proximity_matrix_np = proximity_matrix_df.to_numpy()

    # --- Analysis 1: STRAPSim Convergence Threshold ---
    # Execute the sensitivity analysis for the STRAPSim convergence threshold.
    convergence_results_df = _run_threshold_sensitivity_analysis(
        base_config, portfolio_data, proximity_matrix_np
    )
    # Store the results.
    results['convergence_sensitivity'] = convergence_results_df
    # Log the summary table for immediate review.
    logging.info(f"\n--- Convergence Sensitivity Results ---\n{convergence_results_df.to_string()}")

    # --- Analysis 2: Low-Weight Holding Exclusion ---
    # Execute the sensitivity analysis for excluding low-weight holdings.
    low_weight_results_df = _run_low_weight_holding_sensitivity_analysis(
        base_config, holdings_clean_df, proximity_matrix_df, correlation_df
    )
    # Store the results.
    results['low_weight_sensitivity'] = low_weight_results_df
    # Log the summary table.
    logging.info(f"\n--- Low-Weight Holding Sensitivity Results ---\n{low_weight_results_df.to_string()}")

    # --- Analysis 3: BERTScore Weighting Scheme ---
    # Execute the newly added sensitivity analysis for BERTScore weighting schemes.
    bertscore_weight_results_df = _run_bertscore_weight_scheme_analysis(
        base_config, portfolio_data, proximity_matrix_np, correlation_df
    )
    # Store the results.
    results['bertscore_weight_sensitivity'] = bertscore_weight_results_df
    # Log the summary table.
    logging.info(f"\n--- BERTScore Weighting Sensitivity Results ---\n{bertscore_weight_results_df.to_string()}")

    # Return the dictionary containing all analysis results.
    return results


In [None]:
# Top-Level Orchestrators

def run_strapsim_pipeline_for_robustness(
    etf_holdings_df: pd.DataFrame,
    bond_features_df: pd.DataFrame,
    monthly_returns_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Executes the core STRAPSim pipeline and returns all intermediate artifacts.

    This function is a variant of the main pipeline orchestrator, designed
    specifically to produce not only the final results but also all the

    intermediate data structures (cleansed data, feature matrices, trained model,
    etc.) that are required as inputs for the subsequent robustness analyses.
    This "calculate-once, reuse-many" approach is highly efficient and robust for
    conducting sensitivity studies.

    The pipeline proceeds through the main analytical phases:
    1.  **Data Cleansing**: Standardizes and cleans the validated raw data.
    2.  **Feature Engineering**: Constructs the feature and target matrices for the
        Random Forest model, including one-hot encoding and normalization, and
        then splits the data into training and testing sets.
    3.  **Model Training**: Optimizes hyperparameters via cross-validation and
        trains the final model.
    4.  **Proximity Matrix Generation**: Uses the trained model to generate the
        constituent-level similarity matrix.
    5.  **Artifact Collection**: Gathers all key intermediate and final data
        structures into a single dictionary for downstream use.

    Note: This function intentionally omits the final similarity computation and
    evaluation steps of the main pipeline. Its sole purpose is to generate the
    foundational artifacts (like the proximity matrix and portfolio data) that
    are common inputs to all robustness analysis scenarios. The final evaluation
    is performed within each specific sensitivity test to measure the impact of
    the scenario's parameters.

    Args:
        etf_holdings_df (pd.DataFrame): The raw DataFrame of ETF holdings.
        bond_features_df (pd.DataFrame): The raw DataFrame of bond features.
        monthly_returns_df (pd.DataFrame): The raw DataFrame of monthly returns.
        config (Dict[str, Any]): The global configuration dictionary for the study.

    Returns:
        Dict[str, Any]: A comprehensive dictionary containing all key artifacts
                        produced during the pipeline execution, ready for use in
                        robustness analysis functions.

    Raises:
        ValueError: If any of the initial data validation checks fail.
        TypeError: If inputs are not of the expected type.
        Exception: Propagates any exception that occurs during the pipeline execution.
    """
    # Announce the start of the main pipeline execution.
    logging.info("--- Running Core STRAPSim Pipeline to Generate Artifacts ---")

    # Initialize a dictionary to store all major artifacts produced by the pipeline.
    # The configuration itself is the first artifact.
    pipeline_artifacts: Dict[str, Any] = {'config': config}

    try:
        # === PHASE 1 & 2: VALIDATION and CLEANSING ===
        # The validation functions are assumed to be called before this orchestrator.
        # This function proceeds directly to cleansing the already-validated data.
        logging.info("Cleansing and standardizing all input data...")

        # Cleanse the ETF holdings data.
        holdings_clean = cleanse_etf_holdings_dataframe(etf_holdings_df, config)

        # Cleanse the bond features master data.
        features_clean = cleanse_bond_features_dataframe(bond_features_df, config)

        # Cleanse the monthly returns time-series data.
        returns_clean = cleanse_monthly_returns_dataframe(monthly_returns_df, config)

        # Store the cleansed data artifacts.
        pipeline_artifacts.update({
            'holdings_clean': holdings_clean,
            'features_clean': features_clean,
            'returns_clean': returns_clean
        })

        # === PHASE 3: FEATURE ENGINEERING ===
        logging.info("Engineering features and splitting data...")

        # Prepare categorical features by one-hot encoding them.
        categorical_encoded = prepare_categorical_features(features_clean.set_index('cusip'), config)

        # Prepare numerical features by scaling them. The scaling parameters are also returned.
        numerical_scaled, scaling_params = prepare_numerical_features(features_clean.set_index('cusip'), config)

        # Assemble the final feature/target matrices and split into train/test sets.
        X_train, X_test, y_train, y_test = construct_and_split_data(
            features_clean, categorical_encoded, numerical_scaled, config
        )

        # Reconstruct the complete, unsplit feature and target matrices for later use.
        X_complete = pd.concat([X_train, X_test]).sort_index()
        y_complete = pd.concat([y_train, y_test]).sort_index()

        # Store all feature engineering artifacts.
        pipeline_artifacts.update({
            'X_train': X_train, 'X_test': X_test, 'y_train': y_train, 'y_test': y_test,
            'X_complete': X_complete, 'y_complete': y_complete, 'scaling_params': scaling_params
        })

        # === PHASE 4: MODEL TRAINING & PROXIMITY MATRIX ===
        logging.info("Training model and generating proximity matrix...")

        # Find the optimal hyperparameters using grid search on the training data.
        optimal_params = optimize_random_forest_hyperparameters(X_train, y_train, config)

        # Train the final model using the optimal parameters and evaluate its performance.
        final_model = train_final_model_and_evaluate(
            X_train, y_train, X_test, y_test, optimal_params, config
        )

        # Generate the constituent-level proximity matrix using the final model.
        proximity_matrix_df = generate_proximity_matrix(final_model, X_complete, config)

        # Store the modeling artifacts.
        pipeline_artifacts.update({
            'optimal_params': optimal_params,
            'trained_model': final_model,
            'proximity_matrix_df': proximity_matrix_df
        })

        # === PHASE 5: FINAL ARTIFACT PREPARATION ===
        logging.info("Preparing final data structures for analysis...")

        # Create the structured portfolio data dictionary needed by similarity algorithms.
        portfolio_data = extract_portfolio_level_data(holdings_clean, proximity_matrix_df)

        # Compute the ground-truth return correlation matrix.
        correlation_df = compute_return_correlation_matrix(returns_clean, config)

        # Store the final data structure artifacts.
        pipeline_artifacts.update({
            'portfolio_data': portfolio_data,
            'correlation_df': correlation_df
        })

    except Exception as e:
        # Catch any exception during the pipeline, log it with full traceback, and re-raise.
        logging.error("--- Artifact Generation Pipeline FAILED ---", exc_info=True)
        raise

    # Announce the successful completion of the artifact generation process.
    logging.info("--- Core Pipeline Finished: All Artifacts Generated Successfully ---")

    # Return the comprehensive dictionary containing all generated data and model objects.
    return pipeline_artifacts


def run_robustness_analysis_suite(
    pipeline_artifacts: Dict[str, Any]
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the execution of the complete suite of robustness analyses.

    This function takes a dictionary of pre-computed artifacts from the main
    research pipeline and uses them to run a series of sensitivity tests. This
    design is highly efficient as it avoids re-computing foundational data
    structures for each analysis.

    The suite includes:
    1.  **Hyperparameter Sensitivity (Task 26)**
    2.  **Data Split Sensitivity (Task 27)**
    3.  **Metric Component Sensitivity (Task 28)**

    Args:
        pipeline_artifacts (Dict[str, Any]): A dictionary containing all the
            necessary data and model artifacts from the main pipeline run.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary where keys are the names of the
                                 analyses and values are the corresponding
                                 summary DataFrames.
    """
    # --- Input Validation ---
    # Check for the presence of a few key artifacts to ensure the input is valid.
    required_artifacts = [
        'base_config', 'optimal_params', 'X_complete', 'y_complete',
        'portfolio_data', 'correlation_df', 'holdings_clean_df'
    ]
    if not all(key in pipeline_artifacts for key in required_artifacts):
        raise ValueError("Input 'pipeline_artifacts' is missing required data.")

    # Announce the start of the robustness analysis phase.
    logging.info("--- Starting Full Robustness Analysis Suite ---")

    # Initialize a dictionary to store the results of all analyses.
    robustness_results: Dict[str, pd.DataFrame] = {}

    try:
        # --- Unpack all necessary artifacts from the input dictionary ---
        # This makes the subsequent function calls clean and readable.
        base_config = pipeline_artifacts['base_config']
        optimal_params = pipeline_artifacts['optimal_params']
        X_complete = pipeline_artifacts['X_complete']
        y_complete = pipeline_artifacts['y_complete']
        X_train, y_train = pipeline_artifacts['X_train'], pipeline_artifacts['y_train']
        X_test, y_test = pipeline_artifacts['X_test'], pipeline_artifacts['y_test']
        portfolio_data = pipeline_artifacts['portfolio_data']
        correlation_df = pipeline_artifacts['correlation_df']
        holdings_clean_df = pipeline_artifacts['holdings_clean_df']
        proximity_matrix_df = pipeline_artifacts['proximity_matrix_df']

        # --- Execute Hyperparameter Sensitivity Analysis (Task 26) ---
        logging.info("\n" + "="*80)
        robustness_results['hyperparameter_sensitivity'] = run_hyperparameter_sensitivity_analysis(
            base_config, optimal_params, X_train, y_train, X_test, y_test,
            portfolio_data, correlation_df
        )

        # --- Execute Data Split Sensitivity Analysis (Task 27) ---
        logging.info("\n" + "="*80)
        robustness_results['data_split_sensitivity'] = run_data_split_sensitivity_analysis(
            base_config, optimal_params, X_complete, y_complete,
            portfolio_data, correlation_df
        )

        # --- Execute Metric Component Sensitivity Analysis (Task 28) ---
        logging.info("\n" + "="*80)
        robustness_results['metric_component_sensitivity'] = run_metric_component_sensitivity_analysis(
            base_config, holdings_clean_df, portfolio_data,
            proximity_matrix_df, correlation_df
        )

    except Exception as e:
        # Catch and log any errors during the analyses.
        logging.error("--- ROBUSTNESS ANALYSIS SUITE FAILED ---", exc_info=True)
        raise

    # Announce the successful completion of the entire suite.
    logging.info("\n" + "="*80)
    logging.info("--- Full Robustness Analysis Suite Completed Successfully ---")

    # Return the aggregated results.
    return robustness_results


def run_full_study(
    etf_holdings_df: pd.DataFrame,
    bond_features_df: pd.DataFrame,
    monthly_returns_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Top-level orchestrator to run the main analysis and all robustness checks.

    This function provides a single entry point to execute the entire study. It
    first runs the core pipeline to generate the main results and all necessary
    intermediate data artifacts. It then passes these artifacts to the robustness
    analysis suite to perform a full validation of the study's conclusions.

    Args:
        etf_holdings_df (pd.DataFrame): The raw DataFrame of ETF holdings.
        bond_features_df (pd.DataFrame): The raw DataFrame of bond features.
        monthly_returns_df (pd.DataFrame): The raw DataFrame of monthly returns.
        config (Dict[str, Any]): The global configuration dictionary for the study.

    Returns:
        Dict[str, Any]: A comprehensive dictionary containing the results from
                        both the main analysis and the robustness suite.
    """
    # --- Stage 1: Run the core pipeline to generate primary results and artifacts ---
    # The config is added to the artifacts dictionary for use in the next stage.
    pipeline_artifacts = run_strapsim_pipeline_for_robustness(
        etf_holdings_df, bond_features_df, monthly_returns_df, config
    )
    pipeline_artifacts['base_config'] = config

    # --- Stage 2: Run the full suite of robustness analyses using the artifacts ---
    robustness_results = run_robustness_analysis_suite(pipeline_artifacts)

    # --- Final Aggregation ---
    # Combine the main results (if any were stored) and the robustness results.
    final_output = {
        "main_analysis_artifacts": pipeline_artifacts,
        "robustness_analysis_results": robustness_results
    }

    return final_output

