# README.md

# Market Complexity Dashboard: A Framework for Advanced Financial Time Series Analysis

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/downloads/)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Imports: isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336)](https://pycqa.github.io/isort/)
[![Type Checking: mypy](https://img.shields.io/badge/type_checking-mypy-blue)](http://mypy-lang.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%23025596?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![Matplotlib](https://img.shields.io/badge/Matplotlib-%23ffffff.svg?style=flat&logo=Matplotlib&logoColor=black)](https://matplotlib.org/)
[![Seaborn](https://img.shields.io/badge/Seaborn-%233776AB.svg?style=flat&logo=seaborn&logoColor=white)](https://seaborn.pydata.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2507.23414-b31b1b.svg)](https://arxiv.org/abs/2507.23414)
[![Research](https://img.shields.io/badge/Research-Financial%20Complexity-green)](https://github.com/chirindaopensource/multifractal_multiscale_analysis)
[![Discipline](https://img.shields.io/badge/Discipline-Econophysics-blue)](https://github.com/chirindaopensource/multifractal_multiscale_analysis)
[![Methodology](https://img.shields.io/badge/Methodology-Nonlinear%20Time%20Series-orange)](https://github.com/chirindaopensource/multifractal_multiscale_analysis)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/multifractal_multiscale_analysis)

**Repository:** `https://github.com/chirindaopensource/multifractal_multiscale_analysis`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Complexity of Financial Time Series: Multifractal and Multiscale Entropy Analyses"** by:

*   Oday Masoudi
*   Farhad Shahbazi
*   Mohammad Sharifi

The project provides a complete, end-to-end computational framework for quantifying the dynamic complexity of financial assets. It moves beyond traditional, linear measures of risk (like standard deviation) to deliver a deeper, more nuanced characterization of market behavior based on concepts from information theory and fractal geometry. The goal is to provide a transparent, robust, and computationally efficient toolkit for researchers and practitioners to analyze nonlinearity, predictability, and the rich scaling structure of financial time series.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: market_complexity_dashboard_generator](#key-callable-market_complexity_dashboard_generator)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the methodologies presented in the 2025 paper "Complexity of Financial Time Series: Multifractal and Multiscale Entropy Analyses." The core of this repository is the iPython Notebook `multifractal_multiscale_analysis_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings, from initial data validation and cleansing to the final generation of complexity metrics, statistical tests, and publication-quality reports.

Traditional financial analysis often relies on the first two moments of the return distribution (mean and variance). However, these measures fail to capture the rich, nonlinear, and non-stationary dynamics that characterize modern financial markets. This project provides the tools to explore this hidden structure.

This codebase enables users to:
-   Rigorously validate, cleanse, and prepare financial time series data using a sophisticated, multi-stage protocol.
-   Quantify the irregularity and predictability of assets across dozens of time scales using Refined Composite Multiscale Sample Entropy (RCMSE).
-   Characterize the "fractal fingerprint" of an asset's volatility using Multifractal Detrended Fluctuation Analysis (MF-DFA).
-   Statistically test for the presence of significant nonlinear correlations by comparing results against a computationally generated null model (surrogate data).
-   Automatically generate a complete "complexity dashboard," including summary tables, scientific visualizations, and validation reports.

## Theoretical Background

The implemented methods are grounded in econophysics, a field that applies concepts from statistical physics and nonlinear dynamics to understand economic and financial systems.

**1. Refined Composite Multiscale Sample Entropy (RCMSE):**
Sample Entropy is a measure of the unpredictability or irregularity of a time series. It quantifies the conditional probability that two similar patterns of length `m` will remain similar when a new data point is added. A lower entropy implies more regularity and predictability. RCMSE extends this concept by:
-   **Multiscale Analysis:** Applying a "coarse-graining" procedure to analyze the series at different time scales (horizons), revealing how predictability changes from short-term to long-term.
-   **Refined Composite Method:** Using overlapping windows during coarse-graining to produce more stable and reliable entropy estimates, especially for the finite time series common in finance.
The final calculation is based on the logarithm of the ratio of aggregated pattern counts:
$$
\text{RCMSE}(x, \tau, m, r) = -\ln\left(\frac{\sum_{k=1}^{\tau} n^{m+1}_{(k,\tau)}}{\sum_{k=1}^{\tau} n^{m}_{(k,\tau)}}\right)
$$

**2. Multifractal Detrended Fluctuation Analysis (MF-DFA):**
Financial time series often exhibit multifractality, meaning their scaling properties (how volatility changes with the measurement scale) are not uniform but vary over time. MF-DFA is a powerful technique to quantify this phenomenon.
-   **Detrended Fluctuation:** The method systematically removes local trends from the data at different scales `s`, allowing for an accurate measurement of the underlying fluctuation dynamics.
-   **q-order Moments:** It calculates a `q`-dependent fluctuation function `F_q(s)` that magnifies either large (`q > 0`) or small (`q < 0`) fluctuations.
-   **Singularity Spectrum:** The final output is the singularity spectrum, `f(α)`. This spectrum is a "fingerprint" of the asset's complexity. The width of the spectrum, `Δα`, is a direct measure of the degree of multifractality: a wider spectrum implies a more complex, heterogeneous process with a richer mixture of behaviors.
$$
F_q(s) \sim s^{h(q)} \quad \xrightarrow{\text{Legendre Transform}} \quad (\alpha, f(\alpha))
$$

## Features

The provided iPython Notebook (`multifractal_multiscale_analysis_draft.ipynb`) implements the full research pipeline, including:

-   **Data Pipeline:** A robust validation and cleansing module that performs structural checks, advanced gap-filling, and outlier removal.
-   **High-Performance Analytics:** Elite-grade, vectorized implementations of RCMSE and MF-DFA using advanced NumPy features for maximum speed and memory efficiency.
-   **Statistical Rigor:** A parallelized surrogate data analysis framework to test the statistical significance of the complexity measures.
-   **Automated Orchestration:** A master function that runs the entire end-to-end workflow, from raw data to final reports, with a single call.
-   **Comprehensive Reporting:** Automated generation of publication-quality summary tables and scientific visualizations that replicate the key exhibits from the source paper.
-   **Built-in Quality Control:** An automated verification module to check the plausibility of results against theoretical expectations and known benchmarks.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Data Validation and Preparation (Task 1):** The pipeline ingests raw price data, performs over a dozen structural and quality checks, and applies a sophisticated cleansing protocol including targeted gap-filling and outlier removal.
2.  **Preprocessing (Task 2):** It transforms clean prices into log-returns, calculates their key statistical moments, and generates a shuffled surrogate dataset for control analysis.
3.  **RCMSE Analysis (Task 3):** It computes the full RCMSE profile and total complexity score for each asset.
4.  **MF-DFA Analysis (Task 4):** It computes the generalized Hurst exponents and the full singularity spectrum (`α`, `f(α)`, `Δα`) for each asset.
5.  **Control Analysis (Task 5):** It runs the RCMSE and MF-DFA analyses on an ensemble of shuffled datasets in parallel to generate a null distribution for the complexity metrics.
6.  **Statistical Testing (Task 7):** It performs formal hypothesis tests (empirical p-values, z-scores) to determine if the complexity of the original data is statistically significant.
7.  **Reporting and Verification (Task 8):** It synthesizes all outputs into summary tables, plots, and a final verification report.

## Core Components (Notebook Structure)

The `multifractal_multiscale_analysis_draft.ipynb` notebook is structured as a logical pipeline with modular functions for each task:

-   **Task 1:** Data Validation and Cleansing (`validate_and_prepare_data`).
-   **Task 2:** Preprocessing and Characterization (`preprocess_and_characterize_data`).
-   **Task 3:** RCMSE Implementation (`compute_rcmse_profile`).
-   **Task 4:** MF-DFA Implementation (`compute_mfdfa_analysis`).
-   **Task 5:** Shuffling Control Analysis (`conduct_shuffling_analysis`).
-   **Task 6:** Primary Pipeline Orchestrator (`run_complexity_analysis_pipeline`).
-   **Task 7:** Robustness and Statistical Significance Testing.
-   **Task 8:** Output Generation, Verification, and Master Orchestrator.

## Key Callable: market_complexity_dashboard_generator

The central function in this project is `market_complexity_dashboard_generator`. It orchestrates the entire analytical workflow from raw data to a final, comprehensive report object.

```python
def market_complexity_dashboard_generator(
    df_prices: pd.DataFrame,
    study_config: Dict[str, Any],
    run_sensitivity: bool = False
) -> Dict[str, Any]:
    """
    Master orchestrator for the entire Market Complexity Dashboard framework.
    """
    # ... (implementation is in the notebook)
```

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `scipy`, `matplotlib`, `seaborn`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/multifractal_multiscale_analysis.git
    cd multifractal_multiscale_analysis
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy scipy matplotlib seaborn
    ```

## Input Data Structure

The pipeline requires two inputs passed to the `market_complexity_dashboard_generator` function:

1.  **`df_prices`**: A `pandas.DataFrame` where the index is a `DatetimeIndex` and columns are individual asset prices.
2.  **`study_config`**: A nested Python dictionary that controls all hyperparameters of the analysis. A fully specified example is provided in the notebook.

## Usage

The `multifractal_multiscale_analysis_draft.ipynb` notebook provides a complete, step-by-step guide. The core workflow is:

1.  **Prepare Inputs:** Load your asset prices into a DataFrame and define the `study_config` dictionary.
2.  **Execute Pipeline:** Call the master orchestrator function:
    ```python
    dashboard = market_complexity_dashboard_generator(
        df_prices=my_price_data_df,
        study_config=my_config
    )
    ```
3.  **Inspect Outputs:** Programmatically access any result from the returned `dashboard` dictionary. For example, to view the main MF-DFA summary table:
    ```python
    mdfa_summary_table = dashboard['summary_tables']['Table3_MFDFA_Spectrum_Widths']
    print(mdfa_summary_table)
    ```

## Output Structure

The `market_complexity_dashboard_generator` function returns a single, comprehensive dictionary with the following top-level keys:

-   `main_analysis_results`: A deeply nested dictionary containing all raw numerical results from the RCMSE, MF-DFA, and shuffling analyses.
-   `pipeline_validation_report`: A dictionary summarizing the results of the automated quality control checks.
-   `sensitivity_analysis_results`: A `pd.DataFrame` with the results of the parameter sensitivity analysis (if run).
-   `summary_tables`: A dictionary of `pd.DataFrame` objects, each representing a publication-quality summary table.
-   `figure_paths`: A dictionary mapping figure names to the file paths where they were saved.
-   `benchmark_verification_report`: A report detailing the comparison of key results against known benchmarks.

## Project Structure

```
multifractal_multiscale_analysis/
│
├── multifractal_multiscale_analysis_draft.ipynb  # Main implementation notebook   
├── requirements.txt                              # Python package dependencies
├── LICENSE                                       # MIT license file
└── README.md                                     # This documentation file
```

## Customization

The pipeline is highly customizable via the master `study_config` dictionary. Users can easily modify:
-   The `embedding_dimension_m` and `tolerance_r_factor` for RCMSE.
-   The `q_range` and `polynomial_order` for MF-DFA.
-   The `time_series_length` and `study_end_date` to change the analysis window.
-   The `parameter_grid` in the `run_parameter_sensitivity_analysis` function to test different hyperparameters.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{masoudi2025complexity,
  title={Complexity of Financial Time Series: Multifractal and Multiscale Entropy Analyses},
  author={Masoudi, Oday and Shahbazi, Farhad and Sharifi, Mohammad},
  journal={arXiv preprint arXiv:2507.23414},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A Python Implementation of "Complexity of Financial Time Series: Multifractal and Multiscale Entropy Analyses".
GitHub repository: https://github.com/chirindaopensource/multifractal_multiscale_analysis
```

## Acknowledgments

-   Credit to Oday Masoudi, Farhad Shahbazi, and Mohammad Sharifi for their clear and insightful research.
-   Thanks to the developers of the scientific Python ecosystem (`numpy`, `pandas`, `scipy`, `matplotlib`) that makes this work possible.

--

*This README was generated based on the structure and content of `multifractal_multiscale_analysis_draft.ipynb` and follows best practices for research software documentation.*

# Paper

Title: "*Complexity of Financial Time Series: Multifractal and Multiscale Entropy Analyses*"

Authors: Oday Masoudi, Farhad Shahbazi, Mohammad Sharifi

E-Journal Submission Date: 31 July 2025

Link: https://arxiv.org/abs/2507.23414

Abstract:

We employed Multifractal Detrended Fluctuation Analysis (MF-DFA) and Refined Composite Multiscale Sample Entropy (RCMSE) to investigate the complexity of Bitcoin, GBP/USD, gold, and natural gas price log-return time series. This study provides a comparative analysis of these markets and offers insights into their predictability and associated risks. Each tool presents a unique method to quantify time series complexity. The RCMSE and MF-DFA methods demonstrate a higher complexity for the Bitcoin time series than others. It is discussed that the increased complexity of Bitcoin may be attributable to the presence of higher nonlinear correlations within its log-return time series.



# Summary

### **Overall Assessment**

The paper presents a comparative analysis of the complexity of four distinct financial assets—Bitcoin (cryptocurrency), GBP/USD (forex), gold (precious metal), and natural gas (commodity)—using two sophisticated time-series analysis techniques. The authors' primary contribution is not the development of new methods but rather the application and synthesis of these tools to provide a quantitative, comparative measure of complexity and to infer the nature of the underlying market dynamics. The choice of Refined Composite Multiscale Sample Entropy (RCMSE) is a methodologically sound decision that strengthens their analysis.

--

### **Step-by-Step Summary**

#### **Step 1: Problem Formulation and Data Preparation**

*   **Objective:** The central goal is to quantify and compare the "complexity" of different financial time series. Complexity here is not a vague term; it is specifically investigated through two lenses: (1) the regularity and predictability of the series across different time scales, and (2) the fractal nature and long-range correlations within the series.
*   **Assets Chosen:** The authors selected a diverse set of assets to ensure a broad comparison:
    *   **Bitcoin (BTC):** A highly volatile, relatively new digital asset class.
    *   **GBP/USD:** A major, mature foreign exchange pair.
    *   **Gold:** A traditional safe-haven asset and precious metal.
    *   **Natural Gas:** A volatile energy commodity.
*   **Data Transformation:** Crucially, the authors do not analyze the raw price series. They correctly convert the daily closing prices into **logarithmic returns**: `ln(P_t / P_{t-1})`. This is standard and essential practice in financial econometrics for two reasons:
    1.  **Stationarity:** Return series are typically more stationary (or at least have weaker trends) than price series, which is a prerequisite for many statistical models.
    2.  **Interpretation:** Log returns approximate the percentage change and are indicative of market volatility and risk.

#### **Step 2: Methodology I - Quantifying Regularity with Refined Composite Multiscale Sample Entropy (RCMSE)**

This method measures how predictable a time series is at different time horizons.

*   **Core Idea (Sample Entropy - SampEn):** At its heart, SampEn quantifies regularity. It calculates the probability that two similar short sequences of data points will remain similar when the next data point is added. A low SampEn value implies high regularity and predictability (i.e., patterns repeat), while a high value implies randomness and unpredictability.
*   **The "Multiscale" Component (MSE):** Financial dynamics can differ at short (e.g., daily) and long (e.g., monthly) time scales. The multiscale approach addresses this by creating "coarse-grained" versions of the original time series. For a scale factor `τ`, it averages non-overlapping blocks of `τ` data points. SampEn is then calculated for each of these new, shorter, coarse-grained series.
*   **The "Refined Composite" Improvement (RCMSE):** A known issue with standard MSE is that as the scale `τ` increases, the coarse-grained series becomes very short, leading to unreliable entropy estimates. RCMSE, the method used in this paper, improves this by creating `τ` different overlapping coarse-grained series for each scale factor `τ`. It then averages the pattern-matching statistics across all these series before calculating a single, more robust entropy value for that scale. This is the paper's key methodological strength in the entropy analysis.

#### **Step 3: Methodology II - Characterizing Fractal Nature with Multifractal Detrended Fluctuation Analysis (MF-DFA)**

This method investigates whether the time series exhibits self-similarity and long-range correlations, and whether these properties are uniform (monofractal) or varied (multifractal).

*   **Core Idea (DFA):** DFA is designed to detect long-range correlations in non-stationary signals. It measures how the average fluctuation (or "detrended" volatility) of a signal grows with the size of the time window over which it is observed. The relationship is typically a power law, `F(s) ~ s^H`, where `H` is the Hurst exponent.
    *   `H = 0.5`: No long-range correlation (a random walk).
    *   `H > 0.5`: Persistent, long-range correlations.
    *   `H < 0.5`: Anti-persistent correlations.
*   **The "Multifractal" Component (MF-DFA):** Financial series are often more complex than a single Hurst exponent can describe. MF-DFA generalizes this by calculating a whole spectrum of scaling exponents, `h(q)`, corresponding to different statistical moments `q`. This leads to the **singularity spectrum, f(α)**.
*   **Key Output:** The primary result from MF-DFA is the **width of the singularity spectrum (Δα = α_max - α_min)**. A wider spectrum (large Δα) indicates a richer multifractal structure, meaning the local scaling behavior varies significantly throughout the series. This is interpreted as a higher degree of complexity.

#### **Step 4: Execution and Key Findings**

The authors apply these two methods to the log-return series of the four assets.

1.  **RCMSE Analysis Results:**
    *   **Scale-Dependent Behavior (Fig. 6):** At short time scales (τ < 10), Bitcoin has the *lowest* entropy, suggesting it is the most regular and predictable on a day-to-day basis. In contrast, GBP/USD is the most random.
    *   **Crossover Effect:** As the time scale increases, the roles reverse. At larger scales, Bitcoin's entropy becomes the *highest*, indicating it is the most unpredictable and irregular over longer horizons.
    *   **Overall Complexity (Table 2):** By summing the entropy values across all scales (a common, though not unique, definition of total complexity), Bitcoin is found to be the most complex asset overall.

2.  **MF-DFA Analysis Results:**
    *   **Singularity Spectrum Width (Fig. 8, Table 3):** The MF-DFA results corroborate the RCMSE findings. Bitcoin exhibits the widest singularity spectrum (Δα = 0.62), while Natural Gas has the narrowest (Δα = 0.21). This confirms that Bitcoin's dynamics are the most multifractal and, by this measure, the most complex.

3.  **The Critical Test: Shuffling Analysis:**
    *   **Purpose:** To determine the source of the observed complexity. Is it due to (a) the distribution of returns (e.g., "fat tails") or (b) the temporal ordering and correlations in the data? Shuffling the time series preserves the distribution but destroys all temporal correlations.
    *   **Results:**
        *   For entropy (Fig. 7), shuffling significantly *increases* the sample entropy of Bitcoin, while having little effect on the others. This implies the original Bitcoin series had significant temporal structure/regularity that was destroyed by shuffling.
        *   For multifractality (Fig. 10), shuffling dramatically *narrows* the singularity spectrum for all assets, especially Bitcoin.
    *   **Interpretation:** This is a crucial finding. It demonstrates that the rich multifractality observed is primarily due to **long-range temporal correlations** and not just the fat-tailed nature of the returns distribution.

#### **Step 5: Synthesis and Final Conclusion**

The paper synthesizes these findings to draw a cohesive picture.

*   **Bitcoin's Unique Complexity:** Bitcoin is demonstrably more complex than the traditional assets analyzed. This complexity is multifaceted: it has the highest integrated multiscale entropy and the richest multifractal structure.
*   **Source of Complexity:** The authors combine the shuffling analysis with the standard Hurst exponent calculation (which shows H ≈ 0.5 for all assets, indicating no *linear* correlation). The conclusion is powerful: the complexity and multifractality in these financial assets, particularly Bitcoin, arise from **significant non-linear temporal correlations**.
*   **Implications:** The study shows that financial assets can have mixed characteristics. Bitcoin, for instance, exhibits more predictable behavior in the short term but becomes highly unstable and unpredictable over the long term. This complex dynamic is a hallmark of systems driven by a combination of factors like investor sentiment, algorithmic trading, and external shocks.

### **Critique and Further Thoughts**

This is a solid piece of work that correctly applies established econophysics methods.

*   **Strengths:** The comparative nature of the study is its main strength. The choice of RCMSE over standard MSE is technically robust. The use of shuffling to disentangle the sources of multifractality is excellent and leads to their most important conclusion about non-linear correlations.
*   **Limitations and Avenues for Future Research:**
    *   The analysis is purely statistical. It masterfully answers "what" the dynamics look like but cannot answer "why." The next step would be to link these complexity measures to underlying economic drivers, market microstructure data (like order book dynamics), or investor behavior.
    *   The definition of "complexity" as the sum of entropies is a heuristic. While reasonable, it's one of many possible definitions.
    *   It would be interesting to compare these physics-based complexity measures with standard econometric measures of risk and volatility, such as those from GARCH models, to see what new information they provide. Do periods of high multifractality correspond to periods of high volatility as measured by GARCH?

In summary, the authors provide compelling quantitative evidence that Bitcoin's market dynamics are fundamentally more complex and multifractal than those of established assets, and they correctly identify non-linear temporal dependencies as the primary source of this complexity.

# Import Essential Modules


In [None]:
#!/usr/bin/env python3
# ==============================================================================#
#
#  Market Complexity Dashboard: A Framework for Advanced Financial Time Series Analysis
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "Complexity of Financial Time Series:
#  Multifractal and Multiscale Entropy Analyses" by Masoudi, Shahbazi, and
#  Sharifi (2025). It delivers a robust system for quantifying the dynamic
#  complexity of financial assets, moving beyond traditional linear measures to
#  characterize nonlinearity, predictability across time scales, and fractal
#  scaling behavior.
#
#  Core Methodological Components:
#  • Refined Composite Multiscale Sample Entropy (RCMSE) for measuring
#    irregularity and predictability across multiple time horizons.
#  • Multifractal Detrended Fluctuation Analysis (MF-DFA) for characterizing
#    the heterogeneous scaling properties and identifying the richness of
#    fractal dynamics.
#  • Surrogate Data Analysis (Shuffling) to rigorously test for the presence
#    of significant nonlinear temporal correlations against a null hypothesis
#    of randomness.
#  • Comprehensive data validation, cleansing, and preparation pipeline to
#    ensure the integrity and quality of the input financial data.
#
#  Technical Implementation Features:
#  • High-performance, vectorized algorithms for RCMSE and MF-DFA using
#    advanced NumPy features (broadcasting, stride tricks).
#  • Parallelized computation for surrogate data analysis to ensure tractable
#    execution times.
#  • A modular, end-to-end orchestrator that manages the entire workflow from
#    raw data ingestion to final report generation.
#  • Automated quality control, benchmark verification, and statistical
#    significance testing to ensure the robustness and credibility of results.
#  • Publication-quality output generation for both tabular summaries and
#    scientific visualizations.
#
#  Paper Reference:
#  Masoudi, O., Shahbazi, F., & Sharifi, M. (2025). Complexity of Financial
#  Time Series: Multifractal and Multiscale Entropy Analyses.
#  arXiv preprint arXiv:2507.23414.
#  https://arxiv.org/abs/2507.23414
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================#

# --- Standard Library Imports ---
import os
import json
import logging
import itertools
import copy
from datetime import datetime
from multiprocessing import Pool, cpu_count
from typing import Dict, Any, List, Tuple, Optional

# --- Third-Party Library Imports ---
# Core data manipulation and numerical computation
import pandas as pd
import numpy as np

# Statistical functions and tests
from scipy import stats

# Visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns

# Configure logging for status updates
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


# Implementation

## Draft 1

### Inputs, Processes and Output (IPO) Analysis of All Callables

#### **Module 1: Data Validation and Preparation**

**1. Callable: `_validate_study_config`**

*   **Inputs:**
    *   `study_config`: A `Dict[str, Any]` representing the entire configuration for the analysis.
*   **Processes:**
    1.  Performs a recursive, schema-based validation of the input dictionary's structure, ensuring all required keys and nested keys are present.
    2.  Validates the data type of each configuration value against a predefined schema (e.g., `int`, `float`, `np.ndarray`).
    3.  Executes a series of specific constraint checks on key hyperparameter values, such as ensuring the date format is correct, numerical values are within valid ranges (e.g., `0 < tolerance_r_factor < 1`), and the `q_range` does not contain zero.
    4.  If any check fails, it raises a `KeyError`, `TypeError`, or `ValueError` with a precise, informative message indicating the exact location of the failure.
*   **Outputs:**
    *   `None`: The function returns `None` upon successful validation, serving as a procedural gatekeeper.
*   **Data Transformation:** This function is a pure validator; it performs no data transformation. It reads the input dictionary and either allows the program to proceed or halts it with an exception.
*   **Role in Research Pipeline:** This callable serves as the foundational **gatekeeper for methodological fidelity**. It ensures that the entire analysis is executed with the exact hyperparameters and structural settings intended by the researcher, preventing silent failures or scientifically invalid results due to misconfiguration. It enforces the specific parameters mentioned throughout the paper, such as the `embedding_dimension_m = 3` and `tolerance_r_factor = 0.15` cited in Section 4.2.

**2. Callable: `_validate_dataframe_quality` **

*   **Inputs:**
    *   `df_prices`: A `pd.DataFrame` of raw asset prices.
    *   `expected_columns`: A `List[str]` of required column names.
*   **Processes:**
    1.  Conducts a comprehensive battery of structural and quality checks on the input DataFrame.
    2.  Validates structural properties: index is a `pd.DatetimeIndex`, columns match expectations, index is monotonic and has no duplicates, and the inferred frequency is Business Day (`'B'`).
    3.  Assesses data quality: checks for the presence of any `NaN` values and any non-positive (`<= 0`) prices.
    4.  Performs a non-mutating outlier check by calculating log-returns and identifying any daily changes that exceed a 3-sigma threshold based on a 30-day rolling window.
    5.  Aggregates all detected issues into a list of error messages.
*   **Outputs:**
    *   `None`: Returns `None` if all checks pass. Raises a consolidated `ValueError` if any issues are found.
*   **Data Transformation:** This is a pure validation function and does not transform the input data. It creates temporary data structures (e.g., log-returns) internally for its checks, but the input `df_prices` remains unmodified.
*   **Role in Research Pipeline:** This callable is the **guardian of data integrity**. Before any calculations are performed, it rigorously verifies that the raw input data meets the fundamental assumptions of a clean, well-structured financial time series. This prevents errors in subsequent stages (e.g., taking the logarithm of a non-positive price) and ensures that the analysis is not contaminated by data quality artifacts like gaps, duplicates, or extreme, erroneous price spikes.

**3. Callable: `_cleanse_and_truncate_data` **

*   **Inputs:**
    *   `df_prices`: A `pd.DataFrame` of raw asset prices.
    *   `time_series_length`: An `int` specifying the final desired length.
    *   `study_end_date`: A `str` specifying the final date of the study period.
*   **Processes:**
    1.  Creates a deep copy of the input DataFrame to ensure immutability.
    2.  Performs a strict, ordered sequence of cleansing operations: sorts the index, removes duplicate dates, and filters out any rows with non-positive prices.
    3.  Executes a sophisticated gap-filling protocol: it identifies contiguous blocks of `NaN`s and uses forward-and-backward filling *only* on isolated gaps of fewer than 3 periods, then drops any rows with remaining (larger) gaps.
    4.  Implements an outlier removal protocol by calculating log-returns and filtering out any *entire row* where any asset's daily change exceeds a dynamic 3-sigma threshold.
    5.  Filters the data to include only dates up to `study_end_date`.
    6.  Truncates the resulting clean DataFrame to the exact `time_series_length` by taking the tail.
*   **Outputs:**
    *   A new, cleansed, and truncated `pd.DataFrame`.
*   **Data Transformation:** This function is a pure transformation engine. It takes a raw DataFrame and systematically filters, fills, and slices it, transforming it into a clean, analysis-ready DataFrame of uniform length and high quality.
*   **Role in Research Pipeline:** This callable implements the crucial **data preparation** stage. The quality of the entire study depends on this step. It ensures that the time series fed into the complexity algorithms are free from common data issues that could severely bias the results, directly fulfilling the requirement for a uniform data length of N=3,730 as stated in Section 3, "Data Description".

**4. Callable: `validate_and_prepare_data`**

*   **Inputs:** `df_prices` (`pd.DataFrame`), `study_config` (`Dict`), `expected_columns` (`List[str]`).
*   **Processes:** Orchestrates the execution of the Task 1 helpers. It calls `_validate_study_config`, then `_validate_dataframe_quality`, and finally `_cleanse_and_truncate_data`.
*   **Outputs:** The clean, validated, and truncated `pd.DataFrame` returned by `_cleanse_and_truncate_data`.
*   **Data Transformation:** It orchestrates the transformation performed by its helper functions.
*   **Role in Research Pipeline:** This is the **master orchestrator for Task 1**, providing a single, clean entry point to the entire data validation and preparation workflow.

--

#### **Module 2: Data Preprocessing and Characterization**

**5. Callable: `_calculate_log_returns`**

*   **Inputs:** `df_prices`: A clean `pd.DataFrame` of asset prices.
*   **Processes:**
    1.  Validates that all prices are positive.
    2.  Applies the natural logarithm to every price in the DataFrame.
    3.  Computes the first discrete difference of the logged prices.
    4.  Removes the first row, which will contain `NaN` values as a result of the differencing.
*   **Outputs:** A new `pd.DataFrame` containing the log-returns, with `N-1` rows.
*   **Data Transformation:** It transforms a non-stationary price series into a more stationary log-return series, which represents the continuous compound rate of return.
*   **Role in Research Pipeline:** This function implements the fundamental data transformation described in Section 3 and Equation (15). This is the most critical transformation in the entire study, as both the RCMSE and MF-DFA analyses are performed on these log-return series.
    $$
    \text{Logarithmic Return} = \ln(P_t) - \ln(P_{t-1}) = \ln\left(\frac{P_t}{P_{t-1}}\right)
    $$

**6. Callable: `_characterize_log_returns`**

*   **Inputs:** `df_log_returns`: A `pd.DataFrame` of log-returns.
*   **Processes:**
    1.  Iterates through each column (asset) of the DataFrame.
    2.  For each asset's return series, it computes four key statistical moments: the mean, the sample standard deviation (`ddof=1`), the unbiased sample skewness, and the unbiased sample excess kurtosis (Fisher's definition).
    3.  It uses the robust `scipy.stats` library for skewness and kurtosis calculations.
*   **Outputs:** A nested `Dict[str, Dict[str, float]]` containing the computed statistics for each asset.
*   **Data Transformation:** This function transforms a DataFrame of time series data into a structured dictionary of summary statistics.
*   **Role in Research Pipeline:** This callable generates the quantitative results for the **preliminary data analysis**, as presented in Section 4.1 and Table 1. It provides the initial characterization of the return distributions, quantifying their deviation from normality via skewness and kurtosis, which is a key part of the paper's narrative. It implements the concepts behind:
    $$
    \text{Skewness} = \frac{\mu_3}{\sigma^3} \quad (\text{Equation 16})
    $$
    $$
    \text{ex-Kurtosis} = \frac{\mu_4}{\sigma^4} - 3 \quad (\text{Equation 17})
    $$

**7. Callable: `_generate_shuffled_control_set`**

*   **Inputs:** `df_log_returns` (`pd.DataFrame`), `seed` (`int`).
*   **Processes:**
    1.  Creates a deep copy of the input DataFrame.
    2.  Initializes a seeded random number generator for reproducibility.
    3.  Iterates through each column of the copied DataFrame.
    4.  For each column, it randomly permutes the temporal order of its values, leaving the index and other columns untouched.
*   **Outputs:** A new `pd.DataFrame` with the same shape, index, and columns as the input, but with shuffled values in each column.
*   **Data Transformation:** It transforms a temporally structured time series into a temporally random one while perfectly preserving its value distribution.
*   **Role in Research Pipeline:** This function creates the **null-hypothesis dataset** required for the control analysis. As discussed throughout the paper (e.g., in the analysis of Figure 7 and Figure 10), comparing the results from the original data to the results from this shuffled data is the method used to determine if the observed complexity is due to meaningful temporal correlations or simply the shape of the data's distribution.

**8. Callable: `preprocess_and_characterize_data`**

*   **Inputs:** `df_clean_prices` (`pd.DataFrame`), `seed` (`int`).
*   **Processes:** Orchestrates the execution of the Task 2 helpers. It calls `_calculate_log_returns`, `_characterize_log_returns`, and `_generate_shuffled_control_set`.
*   **Outputs:** A `Tuple` containing the `df_log_returns`, `df_log_returns_shuffled`, and the `statistical_properties` dictionary.
*   **Data Transformation:** It orchestrates the transformation from clean prices to the three core analytical inputs.
*   **Role in Research Pipeline:** This is the **master orchestrator for Task 2**, providing a single entry point to generate all the necessary data structures for the main complexity analyses.

--

#### **Module 3: RCMSE Implementation Framework**

**9. Callable: `_compute_sampen_components` **

*   **Inputs:** `series` (`np.ndarray`), `m` (`int`), `r` (`float`).
*   **Processes:**
    1.  Creates two memory-efficient "views" of the input series using `numpy.lib.stride_tricks.as_strided`: one for embedding vectors of length `m`, and one for length `m+1`.
    2.  For each embedding dimension, it calculates the Chebyshev distance between all pairs of vectors in a single, vectorized operation using NumPy broadcasting.
    3.  It counts the number of vector pairs whose distance is less than or equal to the tolerance `r`, excluding self-matches.
*   **Outputs:** A `Tuple[int, int]` containing `n_m` (the count of matching pairs of length `m`) and `n_m_plus_1` (the count of matching pairs of length `m+1`).
*   **Data Transformation:** It transforms a 1D time series into two scalar integer counts representing the frequency of repeating patterns.
*   **Role in Research Pipeline:** This is the high-performance **computational core of the Sample Entropy algorithm**. It implements the pattern-matching logic that underpins the entire RCMSE analysis, specifically the counting of vectors that satisfy the distance condition of Equation (2):
    $$
    d[X_m(i), X_m(j)] = \max_{0 \le k \le m-1} \{|x_{i+k} - x_{j+k}|\} \le r
    $$

**10. Callable: `compute_rcmse_profile`**

*   **Inputs:** `series` (`np.ndarray`), `rcmse_config` (`Dict`).
*   **Processes:**
    1.  Calculates a single, fixed tolerance `r` based on the standard deviation of the original series.
    2.  Iterates through each time scale `τ` from 1 to `max_scale_tau`.
    3.  For `τ=1`, it calls `_compute_sampen_components` on the original series.
    4.  For `τ>1`, it generates `τ` overlapping coarse-grained series according to the refined composite method.
    5.  For each coarse-grained series, it calls `_compute_sampen_components` and aggregates the resulting `n_m` and `n_m_plus_1` counts.
    6.  After processing all series for a given `τ`, it calculates the final RCMSE value using the logarithm of the ratio of the *total aggregated counts*.
    7.  It calculates the total complexity by summing the RCMSE values across all scales.
*   **Outputs:** A `Dict` containing the `rcmse_profile` (a 1D `np.ndarray` of entropy values) and the `total_complexity` (a `float`).
*   **Data Transformation:** It transforms a 1D time series into a complexity profile (entropy vs. scale) and a single scalar complexity score.
*   **Role in Research Pipeline:** This function implements the **full Refined Composite Multiscale Sample Entropy (RCMSE) analysis**, generating the results shown in Figure 6 and Table 2. It is the direct implementation of the multiscale entropy methodology, including the coarse-graining procedure of Equation (4) (conceptually) and the final aggregation and calculation of Equation (6):
    $$
    \text{RCMSE}(x, \tau, m, r) = -\ln\left(\frac{\sum_{k=1}^{\tau} n^{m+1}_{(k,\tau)}}{\sum_{k=1}^{\tau} n^{m}_{(k,\tau)}}\right)
    $$

--

#### **Module 4: MF-DFA Implementation Framework**

**11. Callable: `_construct_mdfa_profile`**

*   **Inputs:** `series` (`np.ndarray`).
*   **Processes:** Centers the input series by subtracting its mean, then computes the cumulative sum.
*   **Outputs:** A 1D `np.ndarray` representing the profile `Y(i)`.
*   **Data Transformation:** Transforms a stationary-like series into a non-stationary, random-walk-like profile.
*   **Role in Research Pipeline:** Implements **Step 1 of the MF-DFA algorithm**, as defined by Equation (7):
    $$
    Y(i) = \sum_{k=1}^{i} (x_k - \langle x \rangle)
    $$

**12. Callable: `_compute_fluctuation_function`**

*   **Inputs:** `profile` (`np.ndarray`), `s_range` (`np.ndarray`), `q_range` (`np.ndarray`), `polynomial_order` (`int`).
*   **Processes:**
    1.  Iterates through each scale `s` in `s_range`.
    2.  For each `s`, it divides the profile into `2*N_s` segments using the bidirectional approach.
    3.  For each segment, it fits and subtracts a polynomial of the specified order to find the local trend.
    4.  It calculates the variance of the residual (the detrended segment).
    5.  It then iterates through each moment `q` in `q_range` and computes the `q`-th order average of the variances to get `F_q(s)`. It handles the `q=0` case with a geometric mean.
*   **Outputs:** A 2D `np.ndarray` of shape `(len(q_range), len(s_range))` containing the `F_q(s)` values.
*   **Data Transformation:** Transforms the profile series into a matrix of fluctuation functions, which quantifies how the average fluctuation magnitude scales with both the observation window size `s` and the statistical moment `q`.
*   **Role in Research Pipeline:** This is the **computational core of the MF-DFA**, implementing Steps 2, 3, and 4 of the algorithm. It calculates the variance `F^2(s,v)` from Equations (8) & (9) and then the final fluctuation function `F_q(s)` from Equation (10):
    $$
    F_q(s) = \left[ \frac{1}{2N_s} \sum_{v=1}^{2N_s} [F^2(s,v)]^{q/2} \right]^{1/q}
    $$

**13. Callable: `_extract_scaling_exponents`**

*   **Inputs:** `f_q_s` (2D `np.ndarray`), `s_range` (`np.ndarray`).
*   **Processes:**
    1.  Iterates through each row of the `f_q_s` matrix (each corresponding to a value of `q`).
    2.  For each `q`, it performs a linear regression of `log(F_q(s))` against `log(s)`.
    3.  It extracts the slope of the regression line.
*   **Outputs:** A 1D `np.ndarray` containing the generalized Hurst exponents `h(q)`.
*   **Data Transformation:** It transforms the matrix of fluctuation functions into a single vector of scaling exponents.
*   **Role in Research Pipeline:** This function implements **Step 5 of the MF-DFA algorithm**, determining the scaling behavior of the time series. It finds the exponent `h(q)` in the power-law relationship defined by Equation (11):
    $$
    F_q(s) \sim s^{h(q)}
    $$

**14. Callable: `_compute_singularity_spectrum`**

*   **Inputs:** `h_q` (`np.ndarray`), `q_range` (`np.ndarray`).
*   **Processes:**
    1.  Calculates the mass exponent `τ(q)` from `h(q)` and `q`.
    2.  Numerically differentiates `τ(q)` with respect to `q` to find the Hölder exponent `α(q)`.
    3.  Performs a Legendre transform on `τ(q)` to find the singularity spectrum `f(α)`.
    4.  Calculates the width of the spectrum, `Δα`.
*   **Outputs:** A `Dict` containing `tau_q`, `alpha`, `f_alpha`, and `delta_alpha`.
*   **Data Transformation:** It transforms the vector of scaling exponents `h(q)` into the final multifractal spectrum, which is the primary "fingerprint" of the system's complexity.
*   **Role in Research Pipeline:** This function performs the final calculations of the MF-DFA, generating the results needed to plot the **singularity spectrum** (Figure 8) and populate Table 3. It is the direct implementation of the Legendre transform defined by Equations (12) and (13):
    $$
    \tau(q) = qh(q) - 1
    $$
    $$
    \alpha(q) = \frac{d\tau(q)}{dq}, \quad f(\alpha) = q\alpha - \tau(q)
    $$

**15. Callable: `compute_mfdfa_analysis`**

*   **Inputs:** `series` (`np.ndarray`), `mdfa_config` (`Dict`).
*   **Processes:** Orchestrates the entire MF-DFA workflow by calling `_construct_mdfa_profile`, `_compute_fluctuation_function`, `_extract_scaling_exponents`, and `_compute_singularity_spectrum` in sequence.
*   **Outputs:** A comprehensive `Dict` containing all intermediate and final results of the MF-DFA.
*   **Data Transformation:** It orchestrates the transformation of a 1D time series into a full multifractal characterization.
*   **Role in Research Pipeline:** This is the **master orchestrator for Task 4**, providing a single entry point to the entire MF-DFA analysis.

--

#### **Module 5-8: Analysis, Reporting, and Orchestration**

**16. Callable: `conduct_shuffling_analysis` **

*   **Inputs:** `df_log_returns` (`pd.DataFrame`), `original_results` (`Dict`), `study_config` (`Dict`), etc.
*   **Processes:**
    1.  Calls `_generate_shuffled_datasets` to create the surrogate ensemble.
    2.  Calls `_compute_shuffled_metrics` to compute complexity metrics for the ensemble in parallel.
    3.  Calculates summary statistics (mean, std, z-score, nonlinearity index) by comparing the original metrics to the distribution of shuffled metrics.
*   **Outputs:** A `Tuple` containing two dictionaries: one with the summary statistics and one with the raw lists of shuffled metrics.
*   **Data Transformation:** Transforms the original and surrogate time series data into a quantitative comparison of their complexity.
*   **Role in Research Pipeline:** This function executes the **entire control analysis**. It provides the statistical evidence needed to argue that the complexity observed in the original series is due to its temporal structure, a central conclusion of the paper.

**17. Callable: `run_complexity_analysis_pipeline` **

*   **Inputs:** `df_prices` (`pd.DataFrame`), `study_config` (`Dict`).
*   **Processes:** Serves as the top-level orchestrator, sequentially calling the master functions for Task 1, Task 2, Tasks 3 & 4 (per asset), and Task 5. It manages the entire data flow and aggregates all results.
*   **Outputs:** A single, comprehensive, nested `Dict` containing all results from all stages of the analysis, including the raw and summary shuffling results.
*   **Data Transformation:** Orchestrates the transformation of a raw price DataFrame into the final, all-encompassing results object.
*   **Role in Research Pipeline:** This is the **main entry point for the entire scientific study**, encapsulating the full research methodology in a single, reproducible function call.

**18. Callable: `conduct_statistical_significance_tests` **

*   **Inputs:** `shuffling_summary` (`Dict`), `shuffling_raw` (`Dict`), `alpha` (`float`).
*   **Processes:**
    1.  Calculates the Bonferroni-corrected significance threshold.
    2.  For each asset and metric, it computes an empirical one-sided p-value by comparing the original metric to the distribution of shuffled metrics.
    3.  It determines if the result is statistically significant based on the corrected alpha.
    4.  It calculates a 95% bootstrap confidence interval for the mean of the shuffled distribution.
*   **Outputs:** A `Dict` containing a detailed statistical report with p-values, significance flags, and confidence intervals.
*   **Data Transformation:** Transforms the raw distributions of shuffled metrics into a formal statistical hypothesis test report.
*   **Role in Research Pipeline:** This function provides the **formal statistical validation** for the shuffling analysis. It moves beyond the visual and descriptive comparisons to provide rigorous, quantitative evidence (p-values) for the paper's conclusions about the presence of nonlinear correlations.

**19. Callable: `generate_summary_tables`**

*   **Inputs:** `analysis_results` (`Dict`).
*   **Processes:** Navigates the nested results dictionary, extracts the specific scalar values needed for each of the paper's main tables, and formats them into human-readable `pd.DataFrame` objects.
*   **Outputs:** A `Dict` where keys are table names and values are the corresponding `pd.DataFrame` objects.
*   **Data Transformation:** Transforms the computational results dictionary into a set of presentation-ready tables.
*   **Role in Research Pipeline:** This is the **primary reporting tool for quantitative results**, directly generating the tabular summaries (Tables 1-4) that form the core of the paper's findings.

**20. Callable: `generate_publication_visualizations`**

*   **Inputs:** `analysis_results` (`Dict`), `output_dir` (`str`).
*   **Processes:** Uses `matplotlib` and `seaborn` to create plots that visually represent the key findings, such as the RCMSE profiles and singularity spectra, and saves them as image files.
*   **Outputs:** A `Dict` mapping figure names to the file paths where they were saved.
*   **Data Transformation:** Transforms numerical array data from the results dictionary into graphical representations.
*   **Role in Research Pipeline:** This is the **primary reporting tool for visual results**, generating the figures (e.g., Figures 6, 8) that provide an intuitive understanding of the data's complex behavior.

**21. Callable: `verify_results_against_benchmarks` **

*   **Inputs:** `analysis_results` (`Dict`), `benchmarks` (`Dict`).
*   **Processes:**
    1.  Iterates through a predefined set of benchmark checks.
    2.  For each check, it safely traverses the nested results dictionary using a robust path tuple to retrieve the actual computed value.
    3.  It compares the actual value to the expected benchmark value using a relative tolerance.
*   **Outputs:** A `Dict` containing a detailed verification report with the pass/fail status of each check.
*   **Data Transformation:** Transforms the results dictionary and a set of benchmarks into a validation report.
*   **Role in Research Pipeline:** This function serves as the **final automated quality assurance and replication check**. It programmatically verifies that the implementation produces results consistent with those reported in the source paper, providing a high degree of confidence in the correctness of the entire framework.

**22. Callable: `market_complexity_dashboard_generator` **

*   **Inputs:** `df_prices` (`pd.DataFrame`), `study_config` (`Dict`), `run_sensitivity` (`bool`).
*   **Processes:** Acts as the ultimate master orchestrator. It calls the main analysis pipeline, the optional sensitivity analysis, and all the final reporting and verification functions. It also handles the serialization of the final results to a JSON file using a custom encoder.
*   **Outputs:** A master `Dict` containing all artifacts produced during the run.
*   **Data Transformation:** Orchestrates all data transformations in the entire project.
*   **Role in Research Pipeline:** This is the **apex callable**, providing a single, all-encompassing interface to the entire research project, from raw data to final, verified, and persisted reports.


### Usage Example


The following is a granular, step-by-step guide on how to use the end-to-end pipeline via its master orchestrator, `market_complexity_dashboard_generator`. This example will serve as a template for conducting a complete and rigorous market complexity analysis, from initial data loading to the final interpretation of the generated reports.

--

### **Example Usage: End-to-End Market Complexity Analysis**

This example demonstrates the full workflow for analyzing the four assets mentioned in the paper: Bitcoin, GBP/USD, Gold, and Natural Gas.

#### **Step 1: Setup and Data Acquisition**

The first step is to prepare the environment by importing the necessary libraries and the framework itself. We then acquire the raw data. For a professional application, this data would come from a dedicated provider; for this demonstration, we will simulate this by creating a sample `pandas.DataFrame`.

```python
# Import the necessary libraries
import pandas as pd
import numpy as np

# Import the complete, remediated framework from its module
# (Assuming all functions are saved in a file named 'market_complexity_framework.py')
# from market_complexity_framework import market_complexity_dashboard_generator

# --- Data Acquisition ---
# In a real-world scenario, this data would be loaded from a CSV, a database,
# or a financial data API like Bloomberg or Refinitiv.
# For this example, we will generate a synthetic DataFrame that mimics the
# structure of the required input data.

# Define the date range and asset columns.
dates = pd.bdate_range(end='2024-12-02', periods=3730)
assets = ['BTC-USD', 'GBP-USD', 'GOLD', 'NATGAS']

# Generate synthetic price data: a geometric random walk.
# This ensures prices are always positive and have some realistic structure.
np.random.seed(42) # for reproducibility
initial_prices = [10000, 1.3, 1800, 3.5]
daily_vol = [0.04, 0.005, 0.01, 0.03]
log_returns = {
    asset: np.random.normal(0, vol, 3729)
    for asset, vol in zip(assets, daily_vol)
}

# Create the price DataFrame from the log returns.
df_prices = pd.DataFrame(index=dates)
for i, asset in enumerate(assets):
    # Calculate cumulative log returns and exponentiate to get the price path
    price_path = initial_prices[i] * np.exp(np.cumsum(np.insert(log_returns[asset], 0, 0)))
    df_prices[asset] = price_path

print("Sample of generated raw price data:")
print(df_prices.head())
print("\nShape of the raw price data:", df_prices.shape)
```

#### **Step 2: Define the Study Configuration**

The entire analysis is controlled by a single, comprehensive configuration dictionary. This is where all hyperparameters and global settings are defined. Using a dictionary for configuration is a critical best practice for ensuring reproducibility and making the analysis easy to modify. We will use the exact parameters specified in the original paper.

```python
# --- Study Configuration ---
# This dictionary defines every parameter for the analysis, ensuring that the
# entire study is reproducible from a single configuration object.

study_config = {
    "global_parameters": {
        "study_end_date": "2024-12-02",
        "time_series_length": 3730,
    },
    "analytical_modules": {
        "rcmse_analysis": {
            "hyperparameters": {
                "embedding_dimension_m": 3,
                "tolerance_r_factor": 0.15,
                "max_scale_tau": 100,
            }
        },
        "mdfa_analysis": {
            "hyperparameters": {
                "q_range": np.array(
                    [-5.0, -4.0, -3.0, -2.0, -1.0, -0.1, 0.1, 1.0, 2.0, 3.0, 4.0, 5.0]
                ), # A smaller q_range for a faster example run.
                "s_range_generation_params": {
                    "s_min": 10,
                    "s_max_divisor": 4,
                    "num_points": 30,
                },
                "polynomial_order": 1,
            }
        }
    }
}
```

#### **Step 3: Execute the End-to-End Pipeline**

With the data and configuration prepared, the entire analysis is executed with a single call to the master orchestrator function, `market_complexity_dashboard_generator`. This function encapsulates all tasks: validation, cleansing, preprocessing, core complexity analysis, shuffling analysis, reporting, and verification.

```python
# --- Pipeline Execution ---
# This single function call runs the entire end-to-end analysis.
# It ingests the raw data and the configuration, and returns a comprehensive
# dictionary containing all results, reports, tables, and figure paths.

# For this demonstration, we will set run_sensitivity to False to avoid the
# lengthy computation, but it could be set to True for a full robustness check.
final_dashboard_results = market_complexity_dashboard_generator(
    df_prices=df_prices,
    study_config=study_config,
    run_sensitivity=False
)
```

#### **Step 4: Review and Interpret the Outputs**

The `final_dashboard_results` object is a structured dictionary containing all artifacts from the study. The final step is to programmatically access and review these outputs.

##### **4.1 Reviewing Summary Tables**

The generated tables provide a high-level quantitative summary of the key findings, formatted for direct interpretation.

```python
# --- Reviewing Tabular Results ---
# The 'summary_tables' key holds a dictionary of pandas DataFrames.

print("\n--- Summary Tables ---")

# Display Table 1: Statistical Properties
print("\nTable 1: Statistical Properties")
print(final_dashboard_results['summary_tables']['Table1_Statistical_Properties'].round(3))

# Display Table 2: RCMSE Complexity
print("\nTable 2: RCMSE Complexity")
print(final_dashboard_results['summary_tables']['Table2_RCMSE_Complexity'].round(3))

# Display Table 3: MF-DFA Spectrum Widths
print("\nTable 3: MF-DFA Spectrum Widths")
print(final_dashboard_results['summary_tables']['Table3_MFDFA_Spectrum_Widths'].round(3))

# Display Table 4: Hurst Exponents
print("\nTable 4: Hurst Exponents")
print(final_dashboard_results['summary_tables']['Table4_Hurst_Exponents'].round(3))
```

##### **4.2 Reviewing Validation and Verification Reports**

Before trusting the results, it is imperative to check the automated validation and verification reports.

```python
# --- Reviewing Validation and Verification ---
print("\n--- Validation and Verification Reports ---")

# Check the pipeline validation report for any data or methodological issues.
pipeline_validation_status = final_dashboard_results['pipeline_validation_report']['summary']['overall_status']
print(f"\nPipeline Validation Status: {pipeline_validation_status}")
if pipeline_validation_status == 'FAIL':
    print("Errors:", final_dashboard_results['pipeline_validation_report']['summary']['errors'])

# Check the benchmark verification report to ensure our results align with known values.
# Note: This will likely fail with synthetic data but demonstrates the process.
benchmark_report = final_dashboard_results['benchmark_verification_report']
print(f"\nBenchmark Verification Status: {benchmark_report['summary']['overall_status']}")
print("Details:")
for check in benchmark_report['details']:
    print(f"  - Check: {check['check']}, Status: {check['status']}")

```

##### **4.3 Accessing Deeper Results**

The raw numerical results are available for more granular analysis, such as examining the full shuffling distributions or the RCMSE profile of a specific asset.

```python
# --- Accessing Granular Results ---
print("\n--- Granular Result Inspection ---")

# Example: Inspect the shuffling analysis summary for Bitcoin.
btc_shuffling_summary = final_dashboard_results['main_analysis_results']['shuffling_analysis_summary']['BTC-USD']
delta_alpha_z_score = btc_shuffling_summary['delta_alpha']['z_score']
print(f"\nZ-score for BTC-USD delta_alpha (original vs. shuffled): {delta_alpha_z_score:.2f}")
print("A large positive z-score indicates the original data's multifractality is significantly greater than random noise.")

# Example: Access the full RCMSE profile for Gold.
gold_rcmse_profile = final_dashboard_results['main_analysis_results']['GOLD']['rcmse']['rcmse_profile']
print(f"\nFirst 5 RCMSE values for GOLD: {np.round(gold_rcmse_profile[:5], 3)}")
```

This example provides a complete, professional, and reproducible workflow. It demonstrates how the modular, orchestrated framework allows a researcher to move systematically from raw data to a deep, multi-faceted, and verifiable understanding of market complexity with clarity and rigor.




In [None]:
# Task 1: Input Validation and Data Quality Assurance

def _validate_study_config(
    study_config: Dict[str, Any]
) -> None:
    """
    Validates the structure and values of the study configuration dictionary.

    This function performs a deep validation of the configuration dictionary,
    ensuring all required keys, data types, and value constraints are met
    according to the research paper's specifications. It raises specific
    TypeErrors or ValueErrors if any part of the configuration is invalid.

    Args:
        study_config (Dict[str, Any]): The configuration dictionary for the study.

    Returns:
        None: The function returns None if validation is successful.

    Raises:
        KeyError: If a required key or nested key is missing.
        TypeError: If a value has an incorrect data type.
        ValueError: If a value is outside its allowed range or format.
    """
    # Define the validation schema
    schema = {
        "global_parameters": {
            "study_end_date": str,
            "time_series_length": int,
        },
        "analytical_modules": {
            "rcmse_analysis": {
                "hyperparameters": {
                    "embedding_dimension_m": int,
                    "tolerance_r_factor": float,
                    "max_scale_tau": int,
                }
            },
            "mdfa_analysis": {
                "hyperparameters": {
                    "q_range": np.ndarray,
                    "s_range_generation_params": {
                        "s_min": int,
                        "s_max_divisor": int,
                        "num_points": int,
                    },
                    "polynomial_order": int,
                }
            },
        },
    }

    # Helper function for recursive validation
    def _recursive_validate(config_level, schema_level, path=""):
        # Check for missing keys in the config
        if not isinstance(config_level, dict):
            raise TypeError(f"Configuration path '{path}' must be a dictionary.")

        # Iterate through schema keys to ensure they exist in the config
        for key, expected_type in schema_level.items():
            # Build the current path for error messaging
            current_path = f"{path}.{key}" if path else key

            # Check if the key exists in the configuration
            if key not in config_level:
                raise KeyError(f"Missing required key in configuration: '{current_path}'")

            # Get the value from the configuration
            value = config_level[key]

            # If the expected type is a dictionary, recurse
            if isinstance(expected_type, dict):
                _recursive_validate(value, expected_type, current_path)
            # Otherwise, validate the type of the value
            elif not isinstance(value, expected_type):
                raise TypeError(
                    f"Invalid type for '{current_path}'. "
                    f"Expected {expected_type.__name__}, but got {type(value).__name__}."
                )

    # Perform the recursive validation
    _recursive_validate(study_config, schema)

    # --- Perform specific value constraint checks after structure validation ---

    # Validate global_parameters
    gp = study_config["global_parameters"]
    try:
        # Validate study_end_date format
        datetime.strptime(gp["study_end_date"], "%Y-%m-%d")
    except ValueError:
        raise ValueError(
            "Invalid format for 'study_end_date'. Expected 'YYYY-MM-DD'."
        )

    # Validate time_series_length
    if not gp["time_series_length"] >= 1000:
        raise ValueError(
            "'time_series_length' must be a positive integer >= 1000."
        )

    # Validate RCMSE hyperparameters
    rcmse_hp = study_config["analytical_modules"]["rcmse_analysis"]["hyperparameters"]
    m = rcmse_hp["embedding_dimension_m"]
    if not m >= 2:
        raise ValueError("'embedding_dimension_m' must be an integer >= 2.")

    # Validate tolerance_r_factor
    r_factor = rcmse_hp["tolerance_r_factor"]
    if not (0 < r_factor < 1):
        raise ValueError("'tolerance_r_factor' must be a float in the range (0, 1).")

    # Validate max_scale_tau
    if not rcmse_hp["max_scale_tau"] > 0:
        raise ValueError("'max_scale_tau' must be a positive integer.")

    # Validate MF-DFA hyperparameters
    mdfa_hp = study_config["analytical_modules"]["mdfa_analysis"]["hyperparameters"]
    q_range = mdfa_hp["q_range"]
    if 0 in q_range:
        raise ValueError("'q_range' in MF-DFA parameters must not contain zero.")

    # Validate s_range_generation_params
    s_params = mdfa_hp["s_range_generation_params"]
    for key, val in s_params.items():
        if not (isinstance(val, int) and val > 0):
            raise ValueError(f"'s_range_generation_params.{key}' must be a positive integer.")

    # Validate polynomial_order
    if not (isinstance(mdfa_hp["polynomial_order"], int) and mdfa_hp["polynomial_order"] >= 0):
        raise ValueError("'polynomial_order' must be a non-negative integer.")


def _validate_dataframe_quality(
    df_prices: pd.DataFrame,
    expected_columns: List[str]
) -> None:
    """
    Performs a comprehensive quality and structure check on the input DataFrame.

    This function validates that the DataFrame meets all structural and quality
    prerequisites for the analysis. It now includes previously omitted checks for
    business day frequency and the presence of outliers in daily returns, in
    addition to the original checks for index type, column names, data integrity,
    and chronological order. It aggregates all errors and raises a single,
    detailed ValueError if any checks fail.

    Args:
        df_prices (pd.DataFrame): The input DataFrame of daily closing prices.
        expected_columns (List[str]): A list of column names expected to be in
                                      the DataFrame.

    Returns:
        None: Returns None if all quality checks pass.

    Raises:
        TypeError: If the input is not a pandas DataFrame.
        ValueError: If any of the structural or quality checks fail, containing
                    a consolidated report of all issues found.
    """
    # Initialize a list to collect all identified error messages
    error_messages = []

    # --- Prerequisite Check: Input Type ---
    # Ensure the primary input is a pandas DataFrame before proceeding.
    if not isinstance(df_prices, pd.DataFrame):
        # Raise TypeError immediately as other checks are not possible.
        raise TypeError("Input 'df_prices' must be a pandas DataFrame.")

    # --- Step 1.2: DataFrame Structure Validation ---
    # Check if the index is a DatetimeIndex, a fundamental requirement.
    if not isinstance(df_prices.index, pd.DatetimeIndex):
        error_messages.append("DataFrame index is not a DatetimeIndex.")

    # Check if the DataFrame contains the exact set of expected columns.
    if set(df_prices.columns) != set(expected_columns):
        error_messages.append(
            f"DataFrame columns do not match expected. "
            f"Got {list(df_prices.columns)}, expected {expected_columns}."
        )

    # Check if the index is sorted chronologically.
    if not df_prices.index.is_monotonic_increasing:
        error_messages.append("DataFrame index is not chronologically sorted.")

    # Check for the presence of duplicate dates in the index.
    if df_prices.index.duplicated().any():
        error_messages.append("DataFrame contains duplicate index entries.")

    # Check for Business Day frequency.
    inferred_freq = pd.infer_freq(df_prices.index)
    if inferred_freq != 'B':
        error_messages.append(
            f"Inferred DataFrame frequency is not 'B' (Business Day). "
            f"Got '{inferred_freq}'."
        )

    # --- Step 1.3: Data Quality Assessment ---
    # Check for any missing values (NaNs) in the entire DataFrame.
    if df_prices.isnull().values.any():
        error_messages.append(
            "DataFrame contains missing values (NaNs). Counts per column:\n"
            f"{df_prices.isnull().sum()[df_prices.isnull().sum() > 0]}"
        )

    # Check for any non-positive (zero or negative) price values.
    if (df_prices <= 0).values.any():
        error_messages.append(
            "DataFrame contains non-positive prices. Counts per column:\n"
            f"{(df_prices <= 0).sum()[(df_prices <= 0).sum() > 0]}"
        )

    # Check for the presence of significant outliers in daily returns.
    # This is a non-mutating check to flag potential data quality issues.
    if not (df_prices <= 0).values.any(): # Only proceed if log can be taken
        # Calculate log-returns to assess price *changes*.
        log_returns = np.log(df_prices).diff()
        # Define outlier threshold: 3 standard deviations on a 30-day rolling window.
        rolling_std = log_returns.rolling(window=30, min_periods=10).std()
        outlier_threshold = 3 * rolling_std
        # Identify outliers where the absolute return exceeds the threshold.
        outliers_mask = log_returns.abs() > outlier_threshold

        # Check if any outliers were detected.
        if outliers_mask.values.any():
            error_messages.append(
                "Potential outliers detected (daily log-return > 3 rolling std). "
                f"Counts per asset:\n{outliers_mask.sum()[outliers_mask.sum() > 0]}"
            )

    # --- Final Error Reporting ---
    # If the error_messages list is not empty, raise a single comprehensive exception.
    if error_messages:
        # Consolidate all found issues into a single error message.
        consolidated_error_message = "DataFrame validation failed with the following issues:\n- " + "\n- ".join(error_messages)
        raise ValueError(consolidated_error_message)


def _cleanse_and_truncate_data(
    df_prices: pd.DataFrame,
    time_series_length: int,
    study_end_date: str
) -> pd.DataFrame:
    """
    Cleans and truncates the price DataFrame using a sophisticated protocol.

    This function applies a rigorous, multi-stage data cleansing pipeline:
    1.  Sorts the DataFrame chronologically and removes duplicate index entries.
    2.  Removes any rows containing non-positive prices.
    3.  Performs targeted interpolation (forward fill) ONLY on
        isolated gaps of missing data smaller than 3 consecutive periods.
    4.  Removes any remaining rows with missing data (i.e., larger gaps).
    5.  Removes outlier observations, defined as days where any asset's log-return
        exceeds 3 standard deviations from the 30-day rolling mean of absolute
        log-returns.
    6.  Truncates the final clean data to the specified length ending on or
        before the study end date.

    Args:
        df_prices (pd.DataFrame): The raw, validated price DataFrame.
        time_series_length (int): The desired final length of the time series.
        study_end_date (str): The final date for the time series ('YYYY-MM-DD').

    Returns:
        pd.DataFrame: A new, thoroughly cleansed and truncated DataFrame.

    Raises:
        ValueError: If the cleansed data has fewer rows than the required
                    'time_series_length'.
    """
    # --- Step 0: Initialization ---
    # Create a deep copy to ensure the original DataFrame is not modified.
    df_clean = df_prices.copy()

    # --- Step 1: Basic Structural Cleansing ---
    # Sort DataFrame by index to ensure proper chronological order for subsequent operations.
    df_clean = df_clean.sort_index(ascending=True)
    # Remove duplicate index entries, keeping the first occurrence.
    df_clean = df_clean.loc[~df_clean.index.duplicated(keep='first')]
    # Remove any rows with non-positive prices in any column, as log is undefined.
    df_clean = df_clean.loc[(df_clean > 0).all(axis=1)]

    # --- Step 2: Advanced Gap Filling for Isolated NaNs ---
    # This sophisticated logic fills only small, isolated gaps.
    for col in df_clean.columns:
        # Create a series that assigns a unique ID to each contiguous block of NaNs or non-NaNs.
        is_na = df_clean[col].isna()
        nan_blocks = is_na.ne(is_na.shift()).cumsum()
        # Calculate the size of each of these blocks.
        block_sizes = df_clean.groupby(nan_blocks)[col].transform('size')
        # Create a boolean mask that is True only for NaNs that are in small gaps (size < 3).
        small_gaps_mask = (block_sizes < 3) & is_na
        # Apply forward fill ONLY to these identified small gaps.
        df_clean.loc[small_gaps_mask, col] = df_clean[col].ffill()

    # Remove any rows that still contain NaNs (i.e., gaps of size >= 3).
    df_clean.dropna(inplace=True)

    # --- Step 3: Outlier Removal ---
    # This step removes entire rows where any asset exhibits an extreme price change.
    if not df_clean.empty:
        # Calculate daily log-returns for the remaining clean data.
        log_returns = np.log(df_clean).diff()
        # Define the outlier threshold based on a 30-day rolling window.
        # The threshold is 3 standard deviations above the rolling mean of *absolute* returns.
        rolling_mean_abs = log_returns.abs().rolling(window=30, min_periods=10).mean()
        rolling_std_abs = log_returns.abs().rolling(window=30, min_periods=10).std()
        outlier_threshold = rolling_mean_abs + 3 * rolling_std_abs
        # Create a boolean mask that is True for any row containing at least one outlier.
        is_outlier_row = (log_returns.abs() > outlier_threshold).any(axis=1)
        # Filter the DataFrame, keeping only the rows that are NOT outliers.
        df_clean = df_clean.loc[~is_outlier_row]

    # --- Step 4: Final Truncation ---
    # Filter data to be on or before the specified study end date.
    df_clean = df_clean.loc[df_clean.index <= pd.to_datetime(study_end_date)]

    # --- Final Verification ---
    # Check if there is enough data remaining after the full cleansing protocol.
    if len(df_clean) < time_series_length:
        raise ValueError(
            f"Insufficient data after cleansing. Required {time_series_length} "
            f"data points, but only {len(df_clean)} are available up to "
            f"{study_end_date}."
        )

    # Select the most recent 'time_series_length' data points to ensure uniform length.
    df_clean = df_clean.tail(time_series_length)

    return df_clean


def validate_and_prepare_data(
    df_prices: pd.DataFrame,
    study_config: Dict[str, Any],
    expected_columns: List[str] = ["BTC_USD", "GBP_USD", "GC", "NG"]
) -> pd.DataFrame:
    """
    Orchestrates the full data validation and preparation pipeline.

    This function serves as the main entry point for Task 1. It validates the
    study configuration, checks the quality of the input price DataFrame, and
    then applies a rigorous cleansing and truncation protocol.

    Args:
        df_prices (pd.DataFrame): The raw DataFrame of daily closing prices with
                                  a DatetimeIndex.
        study_config (Dict[str, Any]): The configuration dictionary for the study.
        expected_columns (List[str], optional): A list of required column names.
                                                Defaults to the assets from the paper.

    Returns:
        pd.DataFrame: A clean, validated, and truncated DataFrame ready for
                      the log-return transformation and subsequent analysis.
    """
    # Step 1.1: Validate the entire study configuration dictionary
    _validate_study_config(study_config)

    # Step 1.2 & 1.3: Perform comprehensive quality checks on the raw DataFrame
    _validate_dataframe_quality(df_prices, expected_columns)

    # Extract necessary parameters for cleansing
    time_series_length = study_config["global_parameters"]["time_series_length"]
    study_end_date = study_config["global_parameters"]["study_end_date"]

    # Step 1.4: Apply the cleansing and truncation protocol
    df_clean_prices = _cleanse_and_truncate_data(
        df_prices=df_prices,
        time_series_length=time_series_length,
        study_end_date=study_end_date
    )

    # Final verification of the output shape
    if df_clean_prices.shape[0] != time_series_length:
        raise RuntimeError(
            "Internal error: Final cleansed DataFrame shape is incorrect. "
            f"Expected {time_series_length} rows, got {df_clean_prices.shape[0]}."
        )

    return df_clean_prices


In [None]:
# Task 2: Data Preprocessing and Transformation

def _calculate_log_returns(
    df_prices: pd.DataFrame
) -> pd.DataFrame:
    """
    Calculates the logarithmic returns of financial time series.

    This function transforms a DataFrame of prices into a DataFrame of
    logarithmic returns, which is a standard procedure in financial econometrics
    to achieve stationarity and analyze percentage changes.

    The transformation is based on the formula:
    Equation (15): Logarithmic Return = ln(P_t) - ln(P_{t-1})

    Args:
        df_prices (pd.DataFrame): A DataFrame with a DatetimeIndex and columns
                                  of asset prices. All prices must be positive.

    Returns:
        pd.DataFrame: A new DataFrame containing the log-returns for each asset.
                      The resulting DataFrame will have one less row than the input.

    Raises:
        ValueError: If the input DataFrame contains non-positive prices.
    """
    # --- Input Validation ---
    # Ensure all price data is strictly positive before taking the logarithm.
    if (df_prices <= 0).any().any():
        raise ValueError("Input DataFrame contains non-positive prices. Logarithm is undefined.")

    # --- Log-Return Calculation ---
    # Equation (15): Logarithmic Return = ln(P_t) - ln(P_{t-1})
    # This is efficiently implemented by taking the log, then the difference.
    df_log_returns = np.log(df_prices).diff().dropna()

    # --- Output Verification ---
    # Ensure the output is of the correct shape (N-1 rows).
    expected_rows = df_prices.shape[0] - 1
    if df_log_returns.shape[0] != expected_rows:
        raise RuntimeError("Internal error: Log-return calculation resulted in unexpected shape.")

    return df_log_returns.astype(np.float64)


def _characterize_log_returns(
    df_log_returns: pd.DataFrame
) -> Dict[str, Dict[str, float]]:
    """
    Calculates key statistical properties for each log-return series.

    This function computes the mean, standard deviation, skewness, and excess
    kurtosis for each column in the log-returns DataFrame. These statistics
    provide a quantitative description of the return distributions, as
    presented in Table 1 of the source paper.

    Formulas used for characterization:
    - Skewness (γ₁): Based on Equation (16)
    - Excess Kurtosis (γ₂): Based on Equation (17)

    Args:
        df_log_returns (pd.DataFrame): DataFrame of logarithmic returns.

    Returns:
        Dict[str, Dict[str, float]]: A nested dictionary where outer keys are
                                     asset names (columns) and inner keys are
                                     the names of the statistics ('mean',
                                     'std_dev', 'skewness', 'ex_kurtosis').
    """
    # --- Input Validation ---
    if not isinstance(df_log_returns, pd.DataFrame) or df_log_returns.empty:
        raise ValueError("Input must be a non-empty pandas DataFrame.")

    # Initialize the dictionary to store results
    statistical_properties = {}

    # Iterate over each asset (column) in the DataFrame
    for asset_name in df_log_returns.columns:
        # Extract the series for the current asset as a NumPy array for efficiency
        series = df_log_returns[asset_name].values

        # Calculate the mean of the series
        mean_val = np.mean(series)

        # Calculate the sample standard deviation (with Bessel's correction, ddof=1)
        std_dev_val = np.std(series, ddof=1)

        # Calculate the sample skewness using scipy.stats for robustness.
        # bias=False provides an unbiased estimator.
        # Equation (16): Skewness = μ₃ / σ³
        skewness_val = stats.skew(series, bias=False)

        # Calculate the sample excess kurtosis (Fisher's definition) using scipy.stats.
        # This function already calculates (μ₄/σ⁴ - 3).
        # bias=False provides an unbiased estimator.
        # Equation (17): ex-Kurtosis = (μ₄ / σ⁴) - 3
        ex_kurtosis_val = stats.kurtosis(series, fisher=True, bias=False)

        # Store the computed statistics in the results dictionary
        statistical_properties[asset_name] = {
            "mean": mean_val,
            "std_dev": std_dev_val,
            "skewness": skewness_val,
            "ex_kurtosis": ex_kurtosis_val,
        }

    return statistical_properties


def _generate_shuffled_control_set(
    df_log_returns: pd.DataFrame,
    seed: int = 42
) -> pd.DataFrame:
    """
    Generates a shuffled version of the log-return data for control analysis.

    This function creates a surrogate dataset by randomly permuting the temporal
    order of each asset's log-return series independently. This process destroys
    all temporal correlations while preserving the exact marginal distribution
    (i.e., mean, variance, and all higher-order moments) of each series.
    This shuffled set serves as a null-hypothesis benchmark in subsequent
    entropy and multifractal analyses.

    Args:
        df_log_returns (pd.DataFrame): The original DataFrame of log-returns.
        seed (int, optional): A seed for the random number generator to ensure
                              reproducibility. Defaults to 42.

    Returns:
        pd.DataFrame: A new DataFrame with the same shape, index, and columns,
                      but with the values in each column randomly shuffled.
    """
    # --- Input Validation ---
    if not isinstance(df_log_returns, pd.DataFrame) or df_log_returns.empty:
        raise ValueError("Input must be a non-empty pandas DataFrame.")

    # Create a deep copy to ensure the original DataFrame is not modified
    df_shuffled = df_log_returns.copy()

    # Initialize a modern random number generator for reproducibility and best practices
    rng = np.random.default_rng(seed)

    # Iterate over each asset (column) to shuffle it independently
    for col in df_shuffled.columns:
        # Permute the values of the column in-place using the seeded generator
        df_shuffled[col] = rng.permutation(df_shuffled[col].values)

    # --- Output Verification ---
    # A sanity check to confirm the shuffling preserved the distribution (mean).
    # Using a tolerance for floating point comparisons.
    if not np.allclose(df_shuffled.mean(), df_log_returns.mean()):
        raise RuntimeError("Internal error: Shuffling altered series mean unexpectedly.")

    return df_shuffled


def preprocess_and_characterize_data(
    df_clean_prices: pd.DataFrame,
    seed: int = 42
) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, Dict[str, float]]]:
    """
    Orchestrates the full data preprocessing and characterization pipeline (Task 2).

    This function takes a clean price DataFrame and performs three key steps:
    1. Calculates the logarithmic returns for each asset.
    2. Computes a detailed statistical characterization of the log-return series.
    3. Generates a shuffled surrogate dataset for control analysis.

    Args:
        df_clean_prices (pd.DataFrame): A clean, validated DataFrame of daily
                                        closing prices from Task 1.
        seed (int, optional): A random seed for generating the shuffled dataset,
                              ensuring reproducibility. Defaults to 42.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, Dict[str, Dict[str, float]]]: A tuple
        containing:
            - df_log_returns: The DataFrame of logarithmic returns.
            - df_log_returns_shuffled: The shuffled control DataFrame.
            - statistical_properties: A nested dictionary of statistical moments.
    """
    # Step 2.1: Log-Return Transformation
    df_log_returns = _calculate_log_returns(df_clean_prices)

    # Step 2.2: Statistical Characterization
    statistical_properties = _characterize_log_returns(df_log_returns)

    # Step 2.3: Shuffled Control Dataset Generation
    df_log_returns_shuffled = _generate_shuffled_control_set(df_log_returns, seed)

    return df_log_returns, df_log_returns_shuffled, statistical_properties


In [None]:
# Task 3: RCMSE Implementation Framework

def _compute_sampen_components(
    series: np.ndarray,
    m: int,
    r: float
) -> Tuple[int, int]:
    """
    Core computational engine for Sample Entropy pattern counting.

    This highly optimized function calculates the number of matching template
    vector pairs for embedding dimensions 'm' (n_m) and 'm+1' (n_m_plus_1).
    It uses numpy.lib.stride_tricks to create zero-copy, memory-efficient
    views for the embedding vectors, and then uses NumPy broadcasting for
    ultra-fast, vectorized distance calculations.

    Args:
        series (np.ndarray): The 1D time series data, must be contiguous.
        m (int): The embedding dimension (length of the template vectors).
        r (float): The tolerance radius for matching.

    Returns:
        Tuple[int, int]: A tuple containing:
            - n_m: The total number of matching pairs of length 'm'.
            - n_m_plus_1: The total number of matching pairs of length 'm+1'.
    """
    # --- Input Validation and Preparation ---
    # Ensure the series is a 1D numpy array.
    if not isinstance(series, np.ndarray) or series.ndim != 1:
        raise TypeError("Input 'series' must be a 1D NumPy array.")
    # Get the length of the time series.
    n = series.shape[0]
    # Ensure the series is long enough for the given embedding dimension.
    if n <= m:
        # If the series is too short, no components can be formed.
        return 0, 0

    # --- Step 1: Embedding Vector Construction via Stride Tricks ---
    # This is a highly efficient method to create sliding window views without copying data.
    # Get the byte size of an element in the array.
    itemsize = series.itemsize

    # Create the embedding matrix for dimension m.
    # Equation (1): Xm(i) = {xi, xi+1, ..., xi+m-1}
    shape_m = (n - m + 1, m)
    strides_m = (itemsize, itemsize)
    x_m = np.lib.stride_tricks.as_strided(series, shape=shape_m, strides=strides_m)

    # Create the embedding matrix for dimension m+1.
    shape_m_plus_1 = (n - m, m + 1)
    strides_m_plus_1 = (itemsize, itemsize)
    x_m_plus_1 = np.lib.stride_tricks.as_strided(series, shape=shape_m_plus_1, strides=strides_m_plus_1)

    # --- Step 2: Distance Calculation and Pattern Counting ---
    # Calculate n_m: Number of matches for dimension m.
    # This computes the Chebyshev distance (max absolute difference) between all pairs of vectors.
    # Equation (2): d[Xm(i), Xm(j)] = max(|xi+k - xj+k|)
    # The operation is fully vectorized using NumPy broadcasting for maximum speed.
    dist_m = np.max(np.abs(x_m[:, np.newaxis, :] - x_m[np.newaxis, :, :]), axis=2)

    # Count pairs where the distance is less than or equal to the tolerance r.
    # We sum the boolean mask, subtract the diagonal (to exclude self-matches, i.e., i != j),
    # and divide by 2 to correct for double-counting symmetric pairs (i,j) and (j,i).
    n_m = (np.sum(dist_m <= r) - shape_m[0]) // 2

    # Calculate n_m_plus_1: Number of matches for dimension m+1.
    # The same vectorized logic is applied to the (m+1)-dimensional vectors.
    dist_m_plus_1 = np.max(np.abs(x_m_plus_1[:, np.newaxis, :] - x_m_plus_1[np.newaxis, :, :]), axis=2)

    # Count the unique matching pairs for dimension m+1.
    n_m_plus_1 = (np.sum(dist_m_plus_1 <= r) - shape_m_plus_1[0]) // 2

    return n_m, n_m_plus_1


def compute_rcmse_profile(
    series: np.ndarray,
    rcmse_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Computes the Refined Composite Multiscale Sample Entropy (RCMSE) profile.

    This function implements the full RCMSE algorithm as described in the paper.
    It analyzes the irregularity of a time series across multiple time scales,
    from 1 to 'max_scale_tau'. It uses an overlapping coarse-graining procedure
    for improved stability, especially for shorter time series.

    Methodology:
    1. For scale τ=1, it calculates standard Sample Entropy.
    2. For scales τ>1, it generates τ overlapping coarse-grained series.
    3. It computes pattern counts (n_m, n_m+1) for each coarse-grained series.
    4. It aggregates these counts across all τ series for that scale.
    5. It calculates the final RCMSE value using the aggregated counts, as per
       Equation (6): RCMSE = -ln( (Σ n^(m+1)) / (Σ n^m) ).

    Args:
        series (np.ndarray): The 1D log-return time series for a single asset.
        rcmse_config (Dict[str, Any]): A dictionary containing the RCMSE
                                       hyperparameters: 'embedding_dimension_m',
                                       'tolerance_r_factor', 'max_scale_tau'.

    Returns:
        Dict[str, Any]: A dictionary containing the results:
            - 'rcmse_profile': A 1D NumPy array of RCMSE values for each scale.
            - 'total_complexity': The sum of all values in the rcmse_profile.
    """
    # --- Input Validation ---
    if not isinstance(series, np.ndarray) or series.ndim != 1:
        raise TypeError("Input 'series' must be a 1D NumPy array.")
    if not isinstance(rcmse_config, dict):
        raise TypeError("Input 'rcmse_config' must be a dictionary.")

    # --- Hyperparameter Extraction ---
    m = rcmse_config['embedding_dimension_m']
    r_factor = rcmse_config['tolerance_r_factor']
    max_tau = rcmse_config['max_scale_tau']

    # --- Pre-computation ---
    # Calculate the tolerance 'r' based on the standard deviation of the original series.
    # This is a crucial step: 'r' is fixed across all scales.
    r = r_factor * np.std(series, ddof=1)

    # Get the length of the original series
    n = series.shape[0]

    # Initialize an array to store the RCMSE values for each scale
    rcmse_values = np.zeros(max_tau)

    # --- RCMSE Calculation across Scales ---
    for tau in range(1, max_tau + 1):
        # Initialize total pattern counts for the current scale 'tau'
        total_n_m = 0
        total_n_m_plus_1 = 0

        # --- Case 1: Standard Sample Entropy (Scale tau = 1) ---
        if tau == 1:
            # Directly compute components on the original series
            n_m, n_m_plus_1 = _compute_sampen_components(series, m, r)
            total_n_m += n_m
            total_n_m_plus_1 += n_m_plus_1

        # --- Case 2: Refined Composite MSE (Scale tau > 1) ---
        else:
            # Generate 'tau' overlapping coarse-grained series
            # Equation (5): y^(τ)_(k,j) = (1/τ) * Σ x_i
            for k in range(tau):
                # Create the k-th coarse-grained series
                # We select the appropriate slice and then reshape and average
                end_index = n - (n - k) % tau
                coarse_grained_series = np.mean(series[k:end_index].reshape(-1, tau), axis=1)

                # Ensure the series is long enough for at least one m+1 pattern
                if coarse_grained_series.shape[0] > m:
                    # Compute components for this coarse-grained series
                    n_m, n_m_plus_1 = _compute_sampen_components(coarse_grained_series, m, r)
                    # Aggregate the counts
                    total_n_m += n_m
                    total_n_m_plus_1 += n_m_plus_1

        # --- Calculate Final RCMSE for the current scale 'tau' ---
        # Equation (6): RCMSE(τ) = -ln( Σ(n_m+1) / Σ(n_m) )
        # Check for undefined entropy (zero matches)
        if total_n_m == 0 or total_n_m_plus_1 == 0:
            # If no matches are found, entropy is undefined. Store as NaN.
            rcmse_values[tau - 1] = np.nan
        else:
            # Calculate the RCMSE value and store it
            rcmse_values[tau - 1] = -np.log(total_n_m_plus_1 / total_n_m)

    # --- Calculate Total Complexity ---
    # The total complexity is the sum of the entropy values over all scales.
    # We use np.nansum to handle any potential NaN values gracefully.
    total_complexity = np.nansum(rcmse_values)

    # --- Package and Return Results ---
    results = {
        "rcmse_profile": rcmse_values,
        "total_complexity": total_complexity
    }

    return results


In [None]:
# Task 4: MF-DFA Implementation Framework

def _construct_mdfa_profile(
    series: np.ndarray
) -> np.ndarray:
    """
    Constructs the profile series for MF-DFA analysis.

    This function implements Step 1 of the MF-DFA algorithm. It transforms the
    original time series (typically stationary log-returns) into a non-stationary,
    random walk-like profile by computing the cumulative sum of deviations from
    the mean.

    The transformation is based on the formula:
    Equation (7): Y(i) = Σ_{k=1 to i} (x_k - <x>)

    Args:
        series (np.ndarray): A 1D NumPy array of the time series data.

    Returns:
        np.ndarray: The resulting profile series, Y(i).
    """
    # Calculate the mean of the series
    series_mean = np.mean(series)

    # Center the series by subtracting the mean
    centered_series = series - series_mean

    # Compute the cumulative sum to create the profile
    profile = np.cumsum(centered_series)

    return profile

def _compute_fluctuation_function(
    profile: np.ndarray,
    s_range: np.ndarray,
    q_range: np.ndarray,
    polynomial_order: int
) -> np.ndarray:
    """
    Computes the q-order fluctuation function F_q(s) for all scales and moments.

    This is the computational core of the MF-DFA algorithm, implementing Steps 2, 3,
    and 4. For each scale 's' and moment 'q', it performs:
    1. Bidirectional segmentation of the profile.
    2. Polynomial detrending of each segment.
    3. Calculation of the variance of the residuals.
    4. Averaging of variances to compute F_q(s) as per Equation (10).

    Args:
        profile (np.ndarray): The profile series generated by _construct_mdfa_profile.
        s_range (np.ndarray): An array of integer scales 's' to analyze.
        q_range (np.ndarray): An array of moments 'q' to use.
        polynomial_order (int): The degree of the polynomial for detrending.

    Returns:
        np.ndarray: A 2D array of shape (len(q_range), len(s_range)) containing
                    the F_q(s) values.
    """
    # Get the length of the profile series
    n = profile.shape[0]

    # Initialize the matrix to store F_q(s) values
    f_q_s = np.zeros((len(q_range), len(s_range)))

    # Iterate over each scale in the s_range
    for i, s in enumerate(s_range):
        # --- Step 2: Segmentation ---
        # Calculate the number of segments
        n_s = n // s

        # Reshape the profile for forward and backward segmentation
        # This creates 2*n_s segments in total
        forward_segments = profile[:n_s * s].reshape(n_s, s)
        backward_segments = profile[n - n_s * s:].reshape(n_s, s)
        all_segments = np.vstack([forward_segments, backward_segments])

        # --- Step 3: Detrending ---
        # Create the x-axis (time indices) for polynomial fitting
        x_indices = np.arange(s)

        # Fit polynomial and calculate variance for all segments in a vectorized way
        # np.polyfit can operate on all segments at once if y is 2D
        coeffs = np.polyfit(x_indices, all_segments.T, polynomial_order)
        fitted_trends = np.polyval(coeffs, x_indices).T

        # Calculate the variance (F^2(s,v)) for each segment
        # Equations (8) & (9)
        variances = np.mean((all_segments - fitted_trends)**2, axis=1)

        # --- Step 4: q-order Fluctuation Function ---
        # Iterate over each moment in the q_range
        for j, q in enumerate(q_range):
            # Use only segments with non-zero variance to avoid errors
            valid_variances = variances[variances > 0]

            # Handle the special case for q = 0
            if q == 0:
                # Equation (10) for q=0: Geometric mean
                f_q_s[j, i] = np.exp(0.5 * np.mean(np.log(valid_variances)))
            else:
                # Equation (10) for q!=0: q-th order root mean square
                f_q_s[j, i] = np.mean(valid_variances**(q / 2))**(1 / q)

    return f_q_s

def _extract_scaling_exponents(
    f_q_s: np.ndarray,
    s_range: np.ndarray
) -> np.ndarray:
    """
    Extracts the generalized Hurst exponents h(q) from F_q(s).

    This function implements Step 5 of the MF-DFA algorithm. It performs a
    log-log linear regression of F_q(s) against 's' for each moment 'q'.
    The slope of this regression line is the generalized Hurst exponent h(q).

    The relationship is based on the power law:
    Equation (11): F_q(s) ~ s^h(q)

    Args:
        f_q_s (np.ndarray): The 2D matrix of fluctuation functions.
        s_range (np.ndarray): The array of scales 's' used.

    Returns:
        np.ndarray: A 1D array of the generalized Hurst exponents h(q).
    """
    # Take the logarithm of the scales for the regression
    log_s = np.log10(s_range)

    # Initialize an array to store the h(q) values
    h_q = np.zeros(f_q_s.shape[0])

    # Iterate over each moment q
    for i in range(f_q_s.shape[0]):
        # Take the logarithm of the fluctuation function for the current q
        log_f_q = np.log10(f_q_s[i, :])

        # Perform linear regression (degree 1 polynomial fit)
        # The slope of the fit is the h(q) exponent
        coeffs = np.polyfit(log_s, log_f_q, 1)
        h_q[i] = coeffs[0]

    return h_q

def _compute_singularity_spectrum(
    h_q: np.ndarray,
    q_range: np.ndarray
) -> Dict[str, Any]:
    """
    Calculates the multifractal singularity spectrum from h(q).

    This function performs the final step of the analysis, converting the
    generalized Hurst exponents h(q) into the singularity spectrum f(α) via
    a Legendre transform.

    Formulas used:
    - Equation (12): τ(q) = q*h(q) - 1
    - Equation (13): α(q) = dτ(q)/dq
    - Equation (13): f(α) = q*α - τ(q)

    Args:
        h_q (np.ndarray): The array of generalized Hurst exponents.
        q_range (np.ndarray): The array of moments 'q'.

    Returns:
        Dict[str, Any]: A dictionary containing the singularity spectrum:
            - 'tau_q': The mass exponents.
            - 'alpha': The Hölder exponents (singularity strengths).
            - 'f_alpha': The singularity spectrum.
            - 'delta_alpha': The width of the spectrum (a complexity measure).
    """
    # Equation (12): Calculate the mass exponent tau(q)
    tau_q = q_range * h_q - 1

    # Equation (13): Calculate alpha(q) by differentiating tau(q) w.r.t. q
    # np.gradient provides a robust numerical derivative
    alpha = np.gradient(tau_q, q_range)

    # Equation (13): Calculate the singularity spectrum f(alpha)
    f_alpha = q_range * alpha - tau_q

    # Calculate the width of the spectrum, a key measure of multifractality
    delta_alpha = np.max(alpha) - np.min(alpha)

    # Package the results
    spectrum = {
        "tau_q": tau_q,
        "alpha": alpha,
        "f_alpha": f_alpha,
        "delta_alpha": delta_alpha
    }

    return spectrum

def compute_mfdfa_analysis(
    series: np.ndarray,
    mdfa_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the full Multifractal Detrended Fluctuation Analysis (MF-DFA).

    This function serves as the main pipeline for Task 4. It takes a time series
    and a configuration dictionary, and executes the complete MF-DFA workflow
    to characterize the series' multifractal properties.

    Args:
        series (np.ndarray): The 1D log-return time series for a single asset.
        mdfa_config (Dict[str, Any]): A dictionary containing the MF-DFA
                                      hyperparameters.

    Returns:
        Dict[str, Any]: A comprehensive dictionary containing all MF-DFA results,
                        including h(q), the full singularity spectrum, and the
                        spectrum width.
    """
    # --- Input Validation ---
    if not isinstance(series, np.ndarray) or series.ndim != 1:
        raise TypeError("Input 'series' must be a 1D NumPy array.")
    if not isinstance(mdfa_config, dict):
        raise TypeError("Input 'mdfa_config' must be a dictionary.")

    # --- Hyperparameter and Parameter Extraction ---
    hp = mdfa_config['hyperparameters']
    q_range = hp['q_range']
    s_params = hp['s_range_generation_params']
    polynomial_order = hp['polynomial_order']
    n = series.shape[0]

    # Dynamically generate the scale range 's'
    s_min = s_params['s_min']
    s_max = n // s_params['s_max_divisor']
    s_range = np.unique(np.geomspace(s_min, s_max, num=s_params['num_points'], dtype=int))

    # --- Execute MF-DFA Pipeline ---
    # Step 1: Construct the profile
    profile = _construct_mdfa_profile(series)

    # Steps 2-4: Compute the fluctuation function F_q(s)
    f_q_s = _compute_fluctuation_function(profile, s_range, q_range, polynomial_order)

    # Step 5: Extract the scaling exponents h(q)
    h_q = _extract_scaling_exponents(f_q_s, s_range)

    # Final Step: Compute the singularity spectrum
    singularity_spectrum = _compute_singularity_spectrum(h_q, q_range)

    # --- Package and Return All Results ---
    # The Hurst exponent H is h(q) for q=2
    hurst_exponent = h_q[np.where(q_range == 2.0)[0][0]] if 2.0 in q_range else np.nan

    results = {
        "h_q": h_q,
        "hurst_exponent": hurst_exponent,
        "singularity_spectrum": singularity_spectrum,
        "q_range": q_range,
        "s_range": s_range,
        "f_q_s": f_q_s
    }

    return results


In [None]:
# Task 5: Shuffling Control Analysis Framework

def _generate_shuffled_datasets(
    df_log_returns: pd.DataFrame,
    n_shuffles: int = 100,
    seed: int = 42
) -> List[pd.DataFrame]:
    """
    Generates an ensemble of surrogate datasets by shuffling time series.

    This function creates a specified number of surrogate datasets by randomly
    permuting each column of the input log-return DataFrame independently. This
    destroys temporal correlations while preserving the marginal distribution of
    each series, creating a null-model ensemble for statistical testing.

    Args:
        df_log_returns (pd.DataFrame): The original DataFrame of log-returns.
        n_shuffles (int, optional): The number of shuffled surrogate datasets
                                    to generate. Defaults to 100.
        seed (int, optional): A seed for the random number generator to ensure
                              reproducibility. Defaults to 42.

    Returns:
        List[pd.DataFrame]: A list of shuffled pandas DataFrames.
    """
    # --- Input Validation ---
    if not isinstance(df_log_returns, pd.DataFrame) or df_log_returns.empty:
        raise ValueError("Input must be a non-empty pandas DataFrame.")
    if not isinstance(n_shuffles, int) or n_shuffles <= 0:
        raise ValueError("'n_shuffles' must be a positive integer.")

    # Initialize a modern random number generator for reproducibility
    rng = np.random.default_rng(seed)

    # Create the list of shuffled datasets
    shuffled_datasets = []

    for _ in range(n_shuffles):
        # Create a deep copy to avoid modifying the original or other surrogates
        df_shuffled = df_log_returns.copy()

        # Iterate over each column to shuffle it independently
        for col in df_shuffled.columns:
            # Permute the values of the column using the seeded generator
            df_shuffled[col] = rng.permutation(df_shuffled[col].values)

        # Add the new surrogate to the list
        shuffled_datasets.append(df_shuffled)

    return shuffled_datasets

def _worker_compute_metrics(
    args: Tuple[pd.DataFrame, Dict[str, Any], Dict[str, Any]]
) -> Dict[str, Dict[str, float]]:
    """
    A helper function for parallel processing to compute metrics for one surrogate.

    Args:
        args (Tuple): A tuple containing the shuffled DataFrame, RCMSE config,
                      and MF-DFA config.

    Returns:
        Dict[str, Dict[str, float]]: A dictionary of computed metrics for the
                                     given surrogate DataFrame.
    """
    # Unpack arguments
    df_shuffled, rcmse_config, mdfa_config = args

    # Initialize dictionary to store results for this worker
    worker_results = {}

    # Analyze each asset in the shuffled DataFrame
    for asset in df_shuffled.columns:
        # Extract the time series as a NumPy array
        series = df_shuffled[asset].values

        # Compute RCMSE and extract SampEn at scale 1
        rcmse_results = compute_rcmse_profile(series, rcmse_config)
        sampen_tau1 = rcmse_results['rcmse_profile'][0]

        # Compute MF-DFA and extract the spectrum width
        mdfa_results = compute_mfdfa_analysis(series, mdfa_config)
        delta_alpha = mdfa_results['singularity_spectrum']['delta_alpha']

        # Store the results for this asset
        worker_results[asset] = {
            'sampen_tau1': sampen_tau1,
            'delta_alpha': delta_alpha
        }

    return worker_results

def _compute_shuffled_metrics(
    shuffled_datasets: List[pd.DataFrame],
    study_config: Dict[str, Any],
    num_workers: int = -1
) -> Dict[str, Dict[str, List[float]]]:
    """
    Computes complexity metrics for the ensemble of shuffled datasets in parallel.

    This function takes the list of surrogate datasets and applies the RCMSE and
    MF-DFA analyses to each one, distributing the workload across multiple CPU
    cores for efficiency. It collects the key complexity metrics (Sample Entropy
    at scale 1 and multifractal spectrum width) for each shuffle.

    Args:
        shuffled_datasets (List[pd.DataFrame]): The list of surrogate datasets.
        study_config (Dict[str, Any]): The main study configuration dictionary.
        num_workers (int, optional): The number of parallel processes to use.
                                     Defaults to os.cpu_count() - 1.

    Returns:
        Dict[str, Dict[str, List[float]]]: A nested dictionary containing the
                                           distribution of metrics for each asset.
                                           e.g., {'BTC_USD': {'delta_alpha': [0.3, 0.32, ...]}}
    """
    # Determine the number of workers for the pool
    if num_workers < 1:
        num_workers = max(1, os.cpu_count() - 1)

    # Extract necessary configs
    rcmse_config = study_config['analytical_modules']['rcmse_analysis']
    mdfa_config = study_config['analytical_modules']['mdfa_analysis']

    # Prepare arguments for the parallel map function
    tasks = [(df, rcmse_config, mdfa_config) for df in shuffled_datasets]

    # Initialize the final results dictionary
    assets = shuffled_datasets[0].columns
    collated_results = {asset: {'sampen_tau1': [], 'delta_alpha': []} for asset in assets}

    # Run the computations in parallel
    with Pool(processes=num_workers) as pool:
        # map() distributes tasks and collects results in order
        results_list = pool.map(_worker_compute_metrics, tasks)

    # Process and collate the results from the parallel workers
    for single_shuffle_result in results_list:
        for asset in assets:
            collated_results[asset]['sampen_tau1'].append(single_shuffle_result[asset]['sampen_tau1'])
            collated_results[asset]['delta_alpha'].append(single_shuffle_result[asset]['delta_alpha'])

    return collated_results

def conduct_shuffling_analysis(
    df_log_returns: pd.DataFrame,
    original_results: Dict[str, Any],
    study_config: Dict[str, Any],
    n_shuffles: int = 100,
    seed: int = 42
) -> Dict[str, Any]:
    """
    Orchestrates the full shuffling control analysis framework (Task 5).

    This function performs a complete surrogate data analysis to test the
    statistical significance of the complexity metrics found in the original
    data. It generates an ensemble of shuffled datasets, computes their
    complexity metrics in parallel, and then compares these to the original
    results to quantify the presence of meaningful temporal structure.

    Args:
        df_log_returns (pd.DataFrame): The original DataFrame of log-returns.
        original_results (Dict[str, Any]): The dictionary of results from the
                                           analysis of the original data.
        study_config (Dict[str, Any]): The main study configuration dictionary.
        n_shuffles (int, optional): The number of surrogate datasets to create.
                                    Defaults to 100.
        seed (int, optional): A seed for the random number generator.
                              Defaults to 42.

    Returns:
        Dict[str, Any]: A comprehensive dictionary containing the comparison
                        between original and shuffled metrics for each asset.
    """
    # --- Step 5.1: Surrogate Data Generation ---
    shuffled_datasets = _generate_shuffled_datasets(df_log_returns, n_shuffles, seed)

    # --- Step 5.2: Parallel Metrics Computation ---
    shuffled_metrics_dist = _compute_shuffled_metrics(shuffled_datasets, study_config)

    # --- Step 5.3: Nonlinearity Quantification ---
    final_analysis_results = {}
    assets = df_log_returns.columns

    for asset in assets:
        # --- Process Delta Alpha (Multifractal) ---
        orig_delta_alpha = original_results[asset]['mdfa']['singularity_spectrum']['delta_alpha']
        shuffled_delta_alphas = np.array(shuffled_metrics_dist[asset]['delta_alpha'])

        mean_shuffled_da = np.mean(shuffled_delta_alphas)
        std_shuffled_da = np.std(shuffled_delta_alphas)

        # Calculate the reduction in complexity
        nl_multifractal = (orig_delta_alpha - mean_shuffled_da) / orig_delta_alpha if orig_delta_alpha != 0 else 0
        # Calculate the z-score (how significant is the original value?)
        z_score_da = (orig_delta_alpha - mean_shuffled_da) / std_shuffled_da if std_shuffled_da != 0 else np.inf

        # --- Process Sample Entropy (Irregularity) ---
        orig_sampen = original_results[asset]['rcmse']['rcmse_profile'][0]
        shuffled_sampens = np.array(shuffled_metrics_dist[asset]['sampen_tau1'])

        mean_shuffled_se = np.mean(shuffled_sampens)
        std_shuffled_se = np.std(shuffled_sampens)

        # Calculate the increase in entropy (loss of regularity)
        nl_entropy = (mean_shuffled_se - orig_sampen) / orig_sampen if orig_sampen != 0 else 0
        # Calculate the z-score
        z_score_se = (orig_sampen - mean_shuffled_se) / std_shuffled_se if std_shuffled_se != 0 else -np.inf

        # Package all results for this asset
        final_analysis_results[asset] = {
            'delta_alpha': {
                'original': orig_delta_alpha,
                'shuffled_mean': mean_shuffled_da,
                'shuffled_std': std_shuffled_da,
                'z_score': z_score_da,
                'nonlinearity_index': nl_multifractal
            },
            'sampen_tau1': {
                'original': orig_sampen,
                'shuffled_mean': mean_shuffled_se,
                'shuffled_std': std_shuffled_se,
                'z_score': z_score_se,
                'nonlinearity_index': nl_entropy
            }
        }

    return final_analysis_results


In [None]:
# Task 6: Orchestrator Function Creation

def _compute_shuffled_metrics(
    shuffled_datasets: List[pd.DataFrame],
    study_config: Dict[str, Any],
    num_workers: int = -1
) -> Dict[str, Dict[str, List[float]]]:
    """
    Computes complexity metrics for the ensemble of shuffled datasets in parallel.
    [Amended to clarify its role as the source of raw metric distributions.]

    This function takes the list of surrogate datasets and applies the RCMSE and
    MF-DFA analyses to each one, distributing the workload across multiple CPU
    cores for efficiency. It collects the key complexity metrics (Sample Entropy
    at scale 1 and multifractal spectrum width) for each shuffle.

    Args:
        shuffled_datasets (List[pd.DataFrame]): The list of surrogate datasets.
        study_config (Dict[str, Any]): The main study configuration dictionary.
        num_workers (int, optional): The number of parallel processes to use.
                                     Defaults to os.cpu_count() - 1.

    Returns:
        Dict[str, Dict[str, List[float]]]: A nested dictionary containing the
                                           raw distribution of metrics for each asset.
                                           e.g., {'BTC_USD': {'delta_alpha': [0.3, 0.32, ...]}}
    """
    # Determine the number of workers for the pool
    if num_workers < 1:
        num_workers = max(1, os.cpu_count() - 1)

    # Extract necessary configs
    rcmse_config = study_config['analytical_modules']['rcmse_analysis']
    mdfa_config = study_config['analytical_modules']['mdfa_analysis']

    # Prepare arguments for the parallel map function
    tasks = [(df, rcmse_config, mdfa_config) for df in shuffled_datasets]

    # Initialize the final results dictionary
    assets = shuffled_datasets[0].columns
    collated_results = {asset: {'sampen_tau1': [], 'delta_alpha': []} for asset in assets}

    # Run the computations in parallel
    with Pool(processes=num_workers) as pool:
        # map() distributes tasks and collects results in order
        results_list = pool.map(_worker_compute_metrics, tasks)

    # Process and collate the results from the parallel workers
    for single_shuffle_result in results_list:
        for asset in assets:
            collated_results[asset]['sampen_tau1'].append(single_shuffle_result[asset]['sampen_tau1'])
            collated_results[asset]['delta_alpha'].append(single_shuffle_result[asset]['delta_alpha'])

    # Return the raw, collated distributions
    return collated_results


def conduct_shuffling_analysis(
    df_log_returns: pd.DataFrame,
    original_results: Dict[str, Any],
    study_config: Dict[str, Any],
    n_shuffles: int = 100,
    seed: int = 42
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Orchestrates the full shuffling control analysis framework (Task 5).
    [Amended to return both summary statistics and raw distributions.]

    This function performs a complete surrogate data analysis. It generates an
    ensemble of shuffled datasets, computes their complexity metrics in parallel,
    and then calculates summary statistics (mean, std, z-score) to compare
    against the original results.

    Args:
        df_log_returns (pd.DataFrame): The original DataFrame of log-returns.
        original_results (Dict[str, Any]): The dictionary of results from the
                                           analysis of the original data.
        study_config (Dict[str, Any]): The main study configuration dictionary.
        n_shuffles (int, optional): The number of surrogate datasets to create.
                                    Defaults to 100.
        seed (int, optional): A seed for the random number generator.
                              Defaults to 42.

    Returns:
        Tuple[Dict[str, Any], Dict[str, Any]]: A tuple containing:
            - summary_results: A dictionary with aggregated comparison metrics.
            - raw_results: A dictionary with the raw lists of shuffled metrics.
    """
    # --- Step 5.1: Surrogate Data Generation ---
    shuffled_datasets = _generate_shuffled_datasets(df_log_returns, n_shuffles, seed)

    # --- Step 5.2: Parallel Metrics Computation ---
    # This now returns the raw distributions of metrics from the shuffled ensemble
    shuffled_metrics_raw = _compute_shuffled_metrics(shuffled_datasets, study_config)

    # --- Step 5.3: Nonlinearity Quantification (Summary Statistics) ---
    summary_results = {}
    assets = df_log_returns.columns

    for asset in assets:
        # --- Process Delta Alpha (Multifractal) ---
        orig_delta_alpha = original_results[asset]['mdfa']['singularity_spectrum']['delta_alpha']
        shuffled_delta_alphas = np.array(shuffled_metrics_raw[asset]['delta_alpha'])

        mean_shuffled_da = np.mean(shuffled_delta_alphas)
        std_shuffled_da = np.std(shuffled_delta_alphas)

        nl_multifractal = (orig_delta_alpha - mean_shuffled_da) / orig_delta_alpha if orig_delta_alpha != 0 else 0
        z_score_da = (orig_delta_alpha - mean_shuffled_da) / std_shuffled_da if std_shuffled_da != 0 else np.inf

        # --- Process Sample Entropy (Irregularity) ---
        orig_sampen = original_results[asset]['rcmse']['rcmse_profile'][0]
        shuffled_sampens = np.array(shuffled_metrics_raw[asset]['sampen_tau1'])

        mean_shuffled_se = np.mean(shuffled_sampens)
        std_shuffled_se = np.std(shuffled_sampens)

        nl_entropy = (mean_shuffled_se - orig_sampen) / orig_sampen if orig_sampen != 0 else 0
        z_score_se = (orig_sampen - mean_shuffled_se) / std_shuffled_se if std_shuffled_se != 0 else -np.inf

        # Package the summary results for this asset
        summary_results[asset] = {
            'delta_alpha': {
                'original': orig_delta_alpha,
                'shuffled_mean': mean_shuffled_da,
                'shuffled_std': std_shuffled_da,
                'z_score': z_score_da,
                'nonlinearity_index': nl_multifractal
            },
            'sampen_tau1': {
                'original': orig_sampen,
                'shuffled_mean': mean_shuffled_se,
                'shuffled_std': std_shuffled_se,
                'z_score': z_score_se,
                'nonlinearity_index': nl_entropy
            }
        }

    # Return both the summary and the raw distributions
    return summary_results, shuffled_metrics_raw


def run_complexity_analysis_pipeline(
    df_prices: pd.DataFrame,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the complete, end-to-end financial complexity analysis pipeline.

    This master function serves as the single entry point to execute the entire
    research methodology described in the paper.

    Args:
        df_prices (pd.DataFrame): The raw DataFrame of daily closing prices.
        study_config (Dict[str, Any]): The configuration dictionary for the study.

    Returns:
        Dict[str, Any]: A comprehensive dictionary containing all results.
    """
    try:
        # --- Task 1: Input Validation and Data Quality Assurance ---
        logging.info("Starting Task 1: Data Validation and Preparation...")
        df_clean_prices = validate_and_prepare_data(df_prices, study_config)
        logging.info("Task 1 completed successfully.")

        # --- Task 2: Data Preprocessing and Transformation ---
        logging.info("Starting Task 2: Data Preprocessing and Characterization...")
        df_log_returns, _, statistical_properties = preprocess_and_characterize_data(df_clean_prices)
        logging.info("Task 2 completed successfully.")

        # Initialize the main results dictionary
        all_results = {asset: {} for asset in df_log_returns.columns}

        # --- Tasks 3 & 4: Per-Asset Complexity Analysis ---
        for asset in df_log_returns.columns:
            logging.info(f"Analyzing asset: {asset}...")
            series = df_log_returns[asset].values
            all_results[asset]['stats'] = statistical_properties[asset]

            logging.info(f"  Running RCMSE analysis for {asset}...")
            rcmse_config = study_config['analytical_modules']['rcmse_analysis']
            all_results[asset]['rcmse'] = compute_rcmse_profile(series, rcmse_config)

            logging.info(f"  Running MF-DFA analysis for {asset}...")
            mdfa_config = study_config['analytical_modules']['mdfa_analysis']
            all_results[asset]['mdfa'] = compute_mfdfa_analysis(series, mdfa_config)

        # --- Task 5: Shuffling Control Analysis ---
        logging.info("Starting Task 5: Shuffling Control Analysis...")
        # The call now unpacks two results: the summary and the raw distributions
        shuffling_summary, shuffling_raw = conduct_shuffling_analysis(
            df_log_returns, all_results, study_config
        )

        # Add both results to the main dictionary under distinct, clear keys
        all_results['shuffling_analysis_summary'] = shuffling_summary
        all_results['shuffling_analysis_raw'] = shuffling_raw
        logging.info("Task 5 completed successfully.")

        logging.info("Complexity analysis pipeline finished successfully.")
        return all_results

    except (ValueError, TypeError, KeyError, RuntimeError) as e:
        logging.error(f"Pipeline failed with an error: {e}")
        raise

def validate_analysis_results(
    analysis_results: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Performs a quality control check on the final analysis results.

    This function runs a series of "sanity checks" on the computed metrics to
    ensure they are physically and theoretically plausible. It verifies the
    properties of entropy, multifractal spectra, and the effects of shuffling.

    Args:
        analysis_results (Dict[str, Any]): The comprehensive results dictionary
                                            from the main analysis pipeline.

    Returns:
        Dict[str, Any]: A report dictionary summarizing the pass/fail status
                        of each validation check.
    """
    logging.info("Starting final results validation...")
    validation_report = {'summary': {'overall_status': 'PASS'}, 'details': {}}
    error_list = []

    assets = [k for k in analysis_results.keys() if k != 'shuffling_analysis']

    for asset in assets:
        asset_report = {}
        # --- RCMSE Validation ---
        rcmse_profile = analysis_results[asset]['rcmse']['rcmse_profile']
        # Check if all entropy values are non-negative (ignoring NaNs)
        if not np.all(rcmse_profile[~np.isnan(rcmse_profile)] >= 0):
            msg = f"{asset}: RCMSE profile contains negative values."
            error_list.append(msg)
            asset_report['rcmse_non_negative'] = f'FAIL - {msg}'
        else:
            asset_report['rcmse_non_negative'] = 'PASS'

        # --- MF-DFA Validation ---
        mdfa = analysis_results[asset]['mdfa']
        spec = mdfa['singularity_spectrum']
        # Check if spectrum width is non-negative
        if not spec['delta_alpha'] >= 0:
            msg = f"{asset}: MF-DFA delta_alpha is negative ({spec['delta_alpha']:.4f})."
            error_list.append(msg)
            asset_report['mdfa_delta_alpha_positive'] = f'FAIL - {msg}'
        else:
            asset_report['mdfa_delta_alpha_positive'] = 'PASS'

        # Check if f(alpha) peak is close to 1.0
        f_alpha_max = np.max(spec['f_alpha'])
        if not np.isclose(f_alpha_max, 1.0, atol=0.15):
            msg = f"{asset}: MF-DFA f(alpha) peak is not close to 1.0 (is {f_alpha_max:.4f})."
            error_list.append(msg)
            asset_report['mdfa_f_alpha_peak'] = f'FAIL - {msg}'
        else:
            asset_report['mdfa_f_alpha_peak'] = 'PASS'

        # Check if Hurst exponent is in a plausible range
        H = mdfa['hurst_exponent']
        if not (0.3 < H < 0.8):
            msg = f"{asset}: Hurst exponent H is outside plausible range [0.3, 0.8] (is {H:.4f})."
            error_list.append(msg)
            asset_report['mdfa_hurst_plausible'] = f'FAIL - {msg}'
        else:
            asset_report['mdfa_hurst_plausible'] = 'PASS'

        # --- Shuffling Analysis Validation ---
        shuffling = analysis_results['shuffling_analysis'][asset]
        # Check if shuffling reduced multifractality (positive z-score)
        if not shuffling['delta_alpha']['z_score'] > 2.0:
            msg = f"{asset}: Shuffling did not significantly reduce delta_alpha (z-score: {shuffling['delta_alpha']['z_score']:.2f})."
            error_list.append(msg)
            asset_report['shuffling_delta_alpha_reduction'] = f'FAIL - {msg}'
        else:
            asset_report['shuffling_delta_alpha_reduction'] = 'PASS'

        # Check if shuffling increased entropy (negative z-score)
        if not shuffling['sampen_tau1']['z_score'] < -2.0:
            msg = f"{asset}: Shuffling did not significantly increase SampEn (z-score: {shuffling['sampen_tau1']['z_score']:.2f})."
            error_list.append(msg)
            asset_report['shuffling_sampen_increase'] = f'FAIL - {msg}'
        else:
            asset_report['shuffling_sampen_increase'] = 'PASS'

        validation_report['details'][asset] = asset_report

    if error_list:
        validation_report['summary']['overall_status'] = 'FAIL'
        validation_report['summary']['errors'] = error_list
        logging.warning("Results validation failed with issues.")
    else:
        logging.info("Results validation passed successfully.")

    return validation_report


In [None]:
# Task 7: Robustness Analysis Implementation

def _sensitivity_worker(
    args: Tuple[pd.DataFrame, Dict[str, Any], Tuple[str, ...], Tuple[Any, ...]]
) -> Dict[str, Any]:
    """
    Worker function for parallel sensitivity analysis. Executes one pipeline run.

    Args:
        args: A tuple containing the raw price data, the base study config,
              a tuple of parameter names being varied, and a tuple of their values.

    Returns:
        A dictionary with the parameters used and the key results (complexity ranks).
    """
    # Unpack arguments
    df_prices, base_config, param_names, param_values = args

    # Create a deep copy of the config to modify for this run
    run_config = copy.deepcopy(base_config)

    # Update the config with the current parameter combination
    param_dict = {}
    for name, value in zip(param_names, param_values):
        # Navigate the nested dictionary to set the parameter value
        keys = name.split('.')
        d = run_config
        for key in keys[:-1]:
            d = d[key]
        d[keys[-1]] = value
        param_dict[name] = value

    try:
        # Run the full analysis pipeline with the modified config
        results = run_complexity_analysis_pipeline(df_prices, run_config)

        # Extract key results for ranking: RCMSE total complexity and MF-DFA delta_alpha
        assets = [k for k in results.keys() if k != 'shuffling_analysis']
        rcmse_scores = {asset: results[asset]['rcmse']['total_complexity'] for asset in assets}
        mdfa_scores = {asset: results[asset]['mdfa']['singularity_spectrum']['delta_alpha'] for asset in assets}

        # Convert scores to ranks (higher score = higher rank)
        rcmse_ranks = stats.rankdata(list(rcmse_scores.values()), method='ordinal')
        mdfa_ranks = stats.rankdata(list(mdfa_scores.values()), method='ordinal')

        # Combine results
        output = param_dict
        for i, asset in enumerate(assets):
            output[f'rank_rcmse_{asset}'] = rcmse_ranks[i]
            output[f'rank_mdfa_{asset}'] = mdfa_ranks[i]

        return output

    except Exception as e:
        # Log the error and return a failure indicator
        logging.error(f"Sensitivity run failed for params {param_dict}: {e}")
        return {**param_dict, 'status': 'FAIL'}


def run_parameter_sensitivity_analysis(
    df_prices: pd.DataFrame,
    base_config: Dict[str, Any],
    parameter_grid: Dict[str, List[Any]],
    num_workers: int = -1
) -> pd.DataFrame:
    """
    Conducts a systematic sensitivity analysis on key model hyperparameters.

    This function explores how the final complexity rankings of assets change
    when key hyperparameters (e.g., embedding dimension 'm', polynomial order)
    are varied. It runs the entire analysis pipeline for each combination of
    parameters in the provided grid, distributing the computations in parallel.

    Args:
        df_prices (pd.DataFrame): The raw DataFrame of daily closing prices.
        base_config (Dict[str, Any]): The baseline study configuration.
        parameter_grid (Dict[str, List[Any]]): A dictionary where keys are the
            full path to a hyperparameter (e.g., 'analytical_modules.rcmse_analysis.
            hyperparameters.embedding_dimension_m') and values are lists of
            alternative values to test.
        num_workers (int, optional): Number of parallel processes. Defaults to
                                     os.cpu_count() - 1.

    Returns:
        pd.DataFrame: A DataFrame summarizing the complexity rankings for each
                      parameter combination, allowing for stability assessment.
    """
    logging.info("Starting parameter sensitivity analysis...")

    # --- Input Validation ---
    if not parameter_grid:
        logging.warning("Parameter grid is empty. Skipping sensitivity analysis.")
        return pd.DataFrame()

    # Determine the number of workers
    if num_workers < 1:
        num_workers = max(1, os.cpu_count() - 1)

    # Generate all combinations of parameters from the grid
    param_names = list(parameter_grid.keys())
    param_values_list = list(parameter_grid.values())
    param_combinations = list(itertools.product(*param_values_list))

    logging.info(f"Testing {len(param_combinations)} parameter combinations across {num_workers} workers.")

    # Prepare tasks for the multiprocessing pool
    tasks = [(df_prices, base_config, tuple(param_names), combo) for combo in param_combinations]

    # Run the analysis in parallel
    with Pool(processes=num_workers) as pool:
        results_list = pool.map(_sensitivity_worker, tasks)

    # Filter out any failed runs and convert the results to a DataFrame
    successful_results = [res for res in results_list if res.get('status') != 'FAIL']
    results_df = pd.DataFrame(successful_results)

    logging.info("Parameter sensitivity analysis completed.")
    return results_df

def conduct_statistical_significance_tests(
    shuffling_summary: Dict[str, Any],
    shuffling_raw: Dict[str, Any],
    alpha: float = 0.05
) -> Dict[str, Any]:
    """
    Performs statistical tests on the shuffling analysis results.
    [Amended to correctly access summary and raw shuffling data.]

    This function assesses the statistical significance of the difference
    between the original complexity metrics and the distribution of metrics
    from the surrogate (shuffled) data. It calculates empirical p-values,
    effect sizes (z-scores), and applies a Bonferroni correction to account
    for multiple comparisons.

    Args:
        shuffling_summary (Dict[str, Any]): The dictionary containing the
            aggregated comparison metrics (original value, shuffled mean, etc.).
        shuffling_raw (Dict[str, Any]): The dictionary containing the raw lists
            of metrics from each shuffle.
        alpha (float, optional): The significance level for hypothesis testing.
                                 Defaults to 0.05.

    Returns:
        Dict[str, Any]: A dictionary containing detailed statistical test
                        results for each asset and metric.
    """
    logging.info("Conducting statistical significance tests...")

    # --- Input Validation ---
    if not shuffling_summary or not shuffling_raw:
        raise ValueError("Input dictionaries for shuffling analysis cannot be empty.")

    assets = list(shuffling_summary.keys())
    metrics = list(shuffling_summary[assets[0]].keys())
    num_tests = len(assets) * len(metrics)

    # Apply Bonferroni correction to the significance level
    corrected_alpha = alpha / num_tests

    test_report = {'summary': {'alpha': alpha, 'corrected_alpha': corrected_alpha}, 'details': {}}

    for asset in assets:
        asset_report = {}
        for metric in metrics:
            # --- Data Extraction from Correct Sources ---
            # Get the summary data for the current metric
            metric_summary = shuffling_summary[asset][metric]
            # Get the original value from the summary
            original_value = metric_summary['original']
            # Get the raw shuffled values for testing
            shuffled_values = np.array(shuffling_raw[asset][metric])
            n_shuffles = len(shuffled_values)

            # --- Empirical P-value Calculation ---
            # This is a one-sided test based on the expected direction of change.
            if metric == 'delta_alpha':
                # Test hypothesis: original > shuffled. p-value is the probability
                # of observing a shuffled value as large or larger than the original.
                p_value = np.sum(shuffled_values >= original_value) / n_shuffles
            else:  # For sampen_tau1
                # Test hypothesis: original < shuffled. p-value is the probability
                # of observing a shuffled value as small or smaller than the original.
                p_value = np.sum(shuffled_values <= original_value) / n_shuffles

            # --- Bootstrap Confidence Interval for the Shuffled Mean ---
            # Generate bootstrap samples of the mean
            bootstrap_means = [np.mean(np.random.choice(shuffled_values, size=n_shuffles, replace=True)) for _ in range(1000)]
            # Calculate the 95% confidence interval
            ci_lower = np.percentile(bootstrap_means, 2.5)
            ci_upper = np.percentile(bootstrap_means, 97.5)

            # --- Final Assessment ---
            # Determine if the result is statistically significant after correction
            is_significant = p_value < corrected_alpha

            # Package the detailed statistical report for this metric
            asset_report[metric] = {
                'p_value': p_value,
                'is_significant_at_corrected_alpha': is_significant,
                'z_score': metric_summary['z_score'],
                'shuffled_mean_ci_95': (ci_lower, ci_upper)
            }
        # Add the asset's full report to the main report
        test_report['details'][asset] = asset_report

    logging.info("Statistical significance testing completed.")
    return test_report


In [None]:
# Main Pipeline

def generate_summary_tables(
    analysis_results: Dict[str, Any]
) -> Dict[str, pd.DataFrame]:
    """
    Generates publication-quality summary tables from the analysis results.

    This function processes the raw results dictionary and formats the key
    findings into pandas DataFrames that replicate the main tables presented
    in the source research paper.

    Args:
        analysis_results (Dict[str, Any]): The comprehensive results dictionary
                                            from the main analysis pipeline.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary of pandas DataFrames, where each
                                 key corresponds to a table number (e.g., 'Table1').
    """
    logging.info("Generating summary tables...")
    assets = [k for k in analysis_results.keys() if k not in ['shuffling_analysis_summary', 'shuffling_analysis_raw']]
    tables = {}

    # --- Table 1: Skewness, ex-Kurtosis, and standard deviation ---
    table1_data = {asset: {
        'Skewness': analysis_results[asset]['stats']['skewness'],
        'ex-Kurtosis': analysis_results[asset]['stats']['ex_kurtosis'],
        'Standard Deviation': analysis_results[asset]['stats']['std_dev']
    } for asset in assets}
    df_table1 = pd.DataFrame(table1_data).T
    tables['Table1_Statistical_Properties'] = df_table1

    # --- Table 2: RCMSE Complexity values ---
    table2_data = {asset: {
        'Complexity': analysis_results[asset]['rcmse']['total_complexity'],
        'SampEn (τ=1)': analysis_results[asset]['rcmse']['rcmse_profile'][0]
    } for asset in assets}
    df_table2 = pd.DataFrame(table2_data).T
    tables['Table2_RCMSE_Complexity'] = df_table2

    # --- Table 3: MF-DFA Spectrum Widths ---
    table3_data = {asset: {
        'Spectrum Width': analysis_results[asset]['mdfa']['singularity_spectrum']['delta_alpha'],
        'α_max': np.max(analysis_results[asset]['mdfa']['singularity_spectrum']['alpha']),
        'α_min': np.min(analysis_results[asset]['mdfa']['singularity_spectrum']['alpha'])
    } for asset in assets}
    df_table3 = pd.DataFrame(table3_data).T
    tables['Table3_MFDFA_Spectrum_Widths'] = df_table3

    # --- Table 4: Hurst Exponents ---
    table4_data = {asset: {
        'Hurst Exponent': analysis_results[asset]['mdfa']['hurst_exponent']
    } for asset in assets}
    df_table4 = pd.DataFrame(table4_data).T
    tables['Table4_Hurst_Exponents'] = df_table4

    logging.info("Summary tables generated successfully.")
    return tables

def generate_publication_visualizations(
    analysis_results: Dict[str, Any],
    output_dir: str = 'results/figures'
) -> Dict[str, str]:
    """
    Generates and saves publication-quality visualizations of the results.

    Args:
        analysis_results (Dict[str, Any]): The comprehensive results dictionary.
        output_dir (str, optional): Directory to save the plot images.

    Returns:
        Dict[str, str]: A dictionary mapping figure names to their file paths.
    """
    logging.info(f"Generating visualizations in '{output_dir}'...")
    os.makedirs(output_dir, exist_ok=True)
    plt.style.use('seaborn-v0_8-whitegrid')
    paths = {}
    assets = [k for k in analysis_results.keys() if k not in ['shuffling_analysis_summary', 'shuffling_analysis_raw']]

    # --- Figure 6: RCMSE Profile ---
    fig, ax = plt.subplots(figsize=(10, 6))
    for asset in assets:
        ax.plot(analysis_results[asset]['rcmse']['rcmse_profile'], label=asset, marker='o', markersize=3, linestyle='-')
    ax.set_xlabel('Scale (τ)')
    ax.set_ylabel('RCMSE')
    ax.set_title('Refined Composite Multiscale Sample Entropy (RCMSE) Profiles')
    ax.legend()
    path = os.path.join(output_dir, 'Fig6_RCMSE_Profiles.png')
    fig.savefig(path, dpi=300, bbox_inches='tight')
    plt.close(fig)
    paths['Fig6_RCMSE_Profiles'] = path

    # --- Figure 8: Singularity Spectra ---
    fig, ax = plt.subplots(figsize=(10, 6))
    for asset in assets:
        spec = analysis_results[asset]['mdfa']['singularity_spectrum']
        ax.plot(spec['alpha'], spec['f_alpha'], label=f"{asset} (Δα = {spec['delta_alpha']:.2f})")
    ax.set_xlabel('Hölder Exponent (α)')
    ax.set_ylabel('Singularity Spectrum (f(α))')
    ax.set_title('Multifractal Singularity Spectra')
    ax.legend()
    path = os.path.join(output_dir, 'Fig8_Singularity_Spectra.png')
    fig.savefig(path, dpi=300, bbox_inches='tight')
    plt.close(fig)
    paths['Fig8_Singularity_Spectra'] = path

    logging.info("Visualizations generated successfully.")
    return paths


def _get_nested_value(
    data: Dict[str, Any],
    path: Tuple[str, ...]
) -> Tuple[Optional[Any], bool]:
    """
    Safely retrieves a value from a nested dictionary using a path tuple.

    Args:
        data (Dict[str, Any]): The nested dictionary to search within.
        path (Tuple[str, ...]): A tuple of keys representing the path to the value.

    Returns:
        Tuple[Optional[Any], bool]: A tuple containing:
            - The retrieved value, or None if the path is invalid.
            - A boolean indicating if the retrieval was successful.
    """
    # Start at the top level of the dictionary.
    current_level = data
    # Iterate through the keys in the path tuple to traverse the dictionary.
    for key in path:
        # Check if the current level is a dictionary and contains the next key.
        if isinstance(current_level, dict) and key in current_level:
            # Move to the next level down.
            current_level = current_level[key]
        else:
            # If the key is not found or the level is not a dict, the path is invalid.
            logging.warning(f"Benchmark verification failed: Path '{path}' not found at key '{key}'.")
            return None, False
    # If the loop completes, the value was found successfully.
    return current_level, True

def verify_results_against_benchmarks(
    analysis_results: Dict[str, Any],
    benchmarks: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Verifies key results against established benchmarks from the source paper.
    [Re-implemented with a robust and safe nested data access mechanism.]

    This function provides a final, critical layer of validation by comparing
    key quantitative outputs of the analysis pipeline against known, expected
    values (e.g., from the original research paper). It uses a robust method
    for accessing nested data and provides a detailed, transparent report of
    the outcome of each check.

    The benchmark dictionary must use a tuple of keys for the 'path'. For example:
    'path': ('BTC-USD', 'rcmse', 'total_complexity')

    Args:
        analysis_results (Dict[str, Any]): The comprehensive results dictionary
                                            from the main analysis pipeline.
        benchmarks (Dict[str, Dict[str, Any]]): A dictionary where each key is a
            check name and the value is a dictionary containing the 'path'
            (as a tuple), the 'expected' value, and the relative tolerance 'rtol'.

    Returns:
        Dict[str, Any]: A verification report with an overall status and a
                        detailed list of results for each individual check.
    """
    # Initialize the report with a default pass status.
    logging.info("Verifying results against benchmarks...")
    report = {'summary': {'overall_status': 'PASS'}, 'details': []}

    # Iterate through each benchmark check defined in the input dictionary.
    for check_name, check_params in benchmarks.items():
        # --- Step 1: Robustly Extract Actual Value ---
        # Use the safe helper function to retrieve the nested value.
        actual_value, success = _get_nested_value(analysis_results, check_params['path'])

        # If the path was not found, record the failure and continue.
        if not success:
            report['summary']['overall_status'] = 'FAIL'
            report['details'].append({
                'check': check_name,
                'status': 'PATH_NOT_FOUND',
                'path': check_params['path'],
                'message': 'The specified path does not exist in the results dictionary.'
            })
            continue

        # --- Step 2: Perform Comparison ---
        # Extract the expected value and tolerance for the current check.
        expected_value = check_params['expected']
        relative_tolerance = check_params['rtol']

        # Use numpy.isclose for robust floating-point comparison.
        is_pass = np.isclose(actual_value, expected_value, rtol=relative_tolerance)

        # Determine the status string for the report.
        status = 'PASS' if is_pass else 'FAIL'

        # If any check fails, update the overall status of the report.
        if not is_pass:
            report['summary']['overall_status'] = 'FAIL'

        # --- Step 3: Append Detailed Results to Report ---
        # Store a complete record of the check in the report details.
        report['details'].append({
            'check': check_name,
            'status': status,
            'path': check_params['path'],
            'actual': actual_value,
            'expected': expected_value,
            'tolerance': relative_tolerance
        })

    # Log the final outcome of the verification process.
    logging.info(f"Verification complete. Overall status: {report['summary']['overall_status']}")

    return report


class NumpyJSONEncoder(json.JSONEncoder):
    """
    Custom JSON encoder for NumPy data types.

    This encoder handles the serialization of common NumPy types that are not
    natively supported by the standard `json` library, such as ndarrays,
    integers, and floats.
    """
    def default(self, obj):
        # If the object is a NumPy integer, convert it to a Python int.
        if isinstance(obj, np.integer):
            return int(obj)
        # If the object is a NumPy float, convert it to a Python float.
        elif isinstance(obj, np.floating):
            return float(obj)
        # If the object is a NumPy boolean, convert it to a Python bool.
        elif isinstance(obj, np.bool_):
            return bool(obj)
        # If the object is a NumPy array, convert it to a nested Python list.
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        # For any other types, let the base class handle it.
        return super(NumpyJSONEncoder, self).default(obj)

def market_complexity_dashboard_generator(
    df_prices: pd.DataFrame,
    study_config: Dict[str, Any],
    run_sensitivity: bool = False
) -> Dict[str, Any]:
    """
    Master orchestrator for the entire Market Complexity Dashboard framework.
    [Re-implemented to be a complete, fully functional, end-to-end pipeline.]

    This function serves as the ultimate single entry point, executing the full
    scientific workflow from raw data to final, verified reports. It runs the
    main analysis pipeline, optionally conducts a parameter sensitivity analysis,
    generates all tables and figures, verifies the results against known
    benchmarks, and saves the complete output for reproducibility.

    Args:
        df_prices (pd.DataFrame): The raw DataFrame of daily closing prices.
        study_config (Dict[str, Any]): The configuration dictionary for the study.
        run_sensitivity (bool, optional): Whether to run the computationally
            intensive parameter sensitivity analysis. Defaults to False.

    Returns:
        Dict[str, Any]: A master dictionary containing all artifacts of the
                        study: main results, validation reports, tables,
                        figure paths, and sensitivity analysis results.
    """
    # --- a. Run the main analysis pipeline and validate its raw output ---
    # Execute the primary, end-to-end analysis.
    main_results = run_complexity_analysis_pipeline(df_prices, study_config)
    # Perform a quality control check on the generated results.
    pipeline_validation_report = validate_analysis_results(main_results)

    # --- b. Optionally, run the robustness analysis ---
    sensitivity_results_df = None
    if run_sensitivity:
        # Define a sample parameter grid for the sensitivity analysis.
        param_grid = {
            'analytical_modules.rcmse_analysis.hyperparameters.embedding_dimension_m': [2, 3],
            'analytical_modules.mdfa_analysis.hyperparameters.polynomial_order': [1, 2]
        }
        sensitivity_results_df = run_parameter_sensitivity_analysis(df_prices, study_config, param_grid)

    # --- c. Execute Task 8: Output Generation and Verification ---
    # Step 8.1: Generate summary tables from the main results.
    summary_tables = generate_summary_tables(main_results)

    # Step 8.2: Generate and save publication-quality visualizations.
    figure_paths = generate_publication_visualizations(main_results)

    # Step 8.3: Verify key results against established benchmarks.
    # Define benchmarks with the robust tuple-based path structure.
    # Asset names must exactly match the DataFrame column names.
    asset_names = df_prices.columns
    benchmarks = {
        'BTC_RCMSE_Complexity': {
            'path': (asset_names[0], 'rcmse', 'total_complexity'),
            'expected': 74.66,
            'rtol': 0.15 # 15% tolerance due to potential minor implementation differences
        },
        'BTC_MFDFA_Width': {
            'path': (asset_names[0], 'mdfa', 'singularity_spectrum', 'delta_alpha'),
            'expected': 0.62,
            'rtol': 0.15 # 15% tolerance
        }
    }
    benchmark_verification_report = verify_results_against_benchmarks(main_results, benchmarks)

    # --- d. Assemble the final dashboard dictionary and save results ---
    # Create the final, comprehensive output object.
    final_dashboard = {
        'main_analysis_results': main_results,
        'pipeline_validation_report': pipeline_validation_report,
        'sensitivity_analysis_results': sensitivity_results_df,
        'summary_tables': summary_tables,
        'figure_paths': figure_paths,
        'benchmark_verification_report': benchmark_verification_report
    }

    # --- Data Persistence for Reproducibility ---
    # Define the output directory for raw data.
    output_dir = 'results/data'
    # Create the directory if it does not exist.
    os.makedirs(output_dir, exist_ok=True)
    # Define the full path for the output file.
    results_filepath = os.path.join(output_dir, 'main_analysis_results.json')

    # Save the main results dictionary to a JSON file.
    # Use the custom NumpyJSONEncoder to handle NumPy data types.
    logging.info(f"Saving complete results to '{results_filepath}'...")
    with open(results_filepath, 'w') as f:
        json.dump(main_results, f, indent=4, cls=NumpyJSONEncoder)
    logging.info("Results saved successfully.")

    return final_dashboard
