# README.md

# *Mapping Crisis-Driven Market Dynamics: A Transfer Entropy and Kramers–Moyal Approach to Financial Networks* Implementation
<br>

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/downloads/)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Imports: isort](https://img.shields.io/badge/imports-isort-1674b1?style=flat&labelColor=ef8336)](https://pycqa.github.io/isort/)
[![Type Checking: mypy](https://img.shields.io/badge/type_checking-mypy-blue)](http://mypy-lang.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%230C55A5.svg?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![Matplotlib](https://img.shields.io/badge/Matplotlib-%23ffffff.svg?style=flat&logo=Matplotlib&logoColor=black)](https://matplotlib.org/)
[![Seaborn](https://img.shields.io/badge/seaborn-%233776AB.svg?style=flat&logo=python&logoColor=white)](https://seaborn.pydata.org/)
[![NetworkX](https://img.shields.io/badge/NetworkX-blue.svg?style=flat&logo=python&logoColor=white)](https://networkx.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2507.09554-b31b1b.svg)](https://arxiv.org/abs/2507.09554)
[![DOI](https://img.shields.io/badge/DOI-10.48550/arXiv.2507.09554-blue)](https://doi.org/10.48550/arXiv.2507.09554)
[![Research](https://img.shields.io/badge/Research-Financial%20Networks-green)](https://github.com/chirindaopensource/mapping_crisis_driven_market_dynamics)
[![Discipline](https://img.shields.io/badge/Discipline-Econophysics-blue)](https://github.com/chirindaopensource/mapping_crisis_driven_market_dynamics)
[![Methodology](https://img.shields.io/badge/Methodology-Information%20Theory%20%26%20Stochastic%20Methods-orange)](https://github.com/chirindaopensource/mapping_crisis_driven_market_dynamics)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/mapping_crisis_driven_market_dynamics)
<br>

**Repository:** https://github.com/chirindaopensource/mapping_crisis_driven_market_dynamics

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade implementation of the research methodology from the 2025 paper entitled **"Mapping Crisis-Driven Market Dynamics: A Transfer Entropy and Kramers–Moyal Approach to Financial Networks"** by:

*   Pouriya Khalilian
*   Amirhossein N. Golestani
*   Mohammad Eslamifar
*   Mostafa T. Firouzjaee
*   Javad T. Firouzjaee

The project provides a robust, end-to-end Python pipeline for constructing a multi-layered "Digital Twin" of financial market interactions. It moves beyond traditional correlation analysis to map the dynamic, non-linear, and directed relationships between assets, offering a powerful tool for systemic risk analysis, adaptive hedging, and macro-prudential policy assessment.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: run_master_pipeline](#key-callable-run_master_pipeline)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the methodologies presented in the 2025 paper "Mapping Crisis-Driven Market Dynamics." The core of this repository is the iPython Notebook `mapping_crisis_driven_market_dynamics_draft.ipynb`, which contains a comprehensive suite of functions to model financial networks using a dual information-theoretic and stochastic approach.

Traditional measures like Pearson correlation are symmetric and linear, failing to capture the complex, directed, and non-linear feedback loops that characterize modern financial markets, especially during crises. This framework addresses these shortcomings by integrating two advanced methodologies:
1.  **Transfer Entropy (TE):** A non-parametric measure from information theory that quantifies the directed flow of information between time series.
2.  **Kramers-Moyal (KM) Expansion:** A method from stochastic calculus that approximates the underlying deterministic "drift" forces governing the system's dynamics.

This codebase enables researchers, quantitative analysts, and portfolio managers to:
-   Rigorously compute static and dynamic TE and KM network matrices.
-   Quantify the intensification of information flow during market crises.
-   Identify persistent, stable relationships (e.g., safe-haven effects) and moments of significant structural change (regime shifts).
-   Perform advanced robustness and error analysis to validate findings.
-   Replicate and extend the results of the original research paper.



## Theoretical Background

The methodology implemented in this project is a direct translation of the unified framework presented in the source paper. It is designed to overcome the limitations of traditional linear correlation by employing a dual approach rooted in information theory and stochastic calculus. The theoretical pipeline can be understood in four distinct stages:

### 1. Foundational Data Transformation

The analysis begins with a critical econometric principle: financial asset prices ($P_t$) are generally non-stationary (i.e., they contain unit roots), making them unsuitable for most statistical models. To address this, the pipeline first transforms the raw price series into a stationary log-return series ($r_t$) using the standard formula:

$r_t = \log(P_t) - \log(P_{t-1})$

This transformation yields a series whose statistical properties (like mean and variance) are more stable over time, forming a valid basis for the subsequent analyses. The pipeline empirically verifies this transformation using the Augmented Dickey-Fuller test.

### 2. Layer 1: Information-Theoretic Linkages (Transfer Entropy)

To map the directed, non-linear flow of information between assets, the framework employs Transfer Entropy (TE). TE measures the reduction in uncertainty about a target asset's future state given knowledge of a source asset's past state, beyond what the target's own past already explains. It is formally defined in the paper's **Equation 2**:

$T_{j \to i} = \sum_{i_{t+1}, i_t, j_t} P(i_{t+1}, i_t, j_t) \log_2 \frac{P(i_{t+1} | i_t, j_t)}{P(i_{t+1} | i_t)}$

-   $T_{j \to i}$ represents the information flowing from asset `j` to asset `i`.
-   The measure is inherently **asymmetric** ($T_{j \to i} \neq T_{i \to j}$), allowing us to identify sources and sinks of information flow.
-   The calculation requires discretizing the continuous return data into bins to estimate the necessary joint and conditional probability distributions, as outlined conceptually in **Algorithm 1** of the paper's framework.

### 3. Layer 2: Stochastic System Dynamics (Kramers-Moyal Expansion)

To complement the TE analysis, the framework models the system's evolution using the Kramers-Moyal (KM) expansion. This method describes a stochastic process in terms of its deterministic "drift" and stochastic "diffusion" components. This implementation focuses on the first KM coefficient—the drift vector $D^{(1)}$—which represents the deterministic forces governing the system's expected movement.

The paper makes a crucial simplification by approximating this drift with a linear model:

$\frac{d}{dt}\mathbf{x}(t) \approx A\mathbf{x}(t)$

Here, $\mathbf{x}(t)$ is the vector of asset returns, and $A$ is the $N \times N$ drift coefficient matrix. The elements $A_{ij}$ represent the signed, linear influence of asset `j`'s current return on the expected change in asset `i`'s return. This matrix is estimated by solving a system of linear equations derived from the moment conditions specified in **Equation 9** of the paper:

$\langle (x_i(t+dt) - x_i(t)) x_k(t) \rangle = \sum_{j=1}^{N} A_{ij} \langle x_j(t) x_k(t) \rangle$

This provides a signed, directed map of the linear relationships, where a negative $A_{ij}$ can be interpreted as a hedging or mean-reverting influence, and a positive $A_{ij}$ suggests co-movement.

### 4. Dynamic Analysis via Sliding Window

A static analysis of the full time series provides only an average picture of the network. To capture the evolving nature of market dynamics, both the TE and KM analyses are applied within a **sliding window** framework, as described conceptually in **Algorithm 3**. The pipeline moves a window of a fixed size (e.g., 252 days) across the entire dataset with a given step size (e.g., 21 days), re-calculating the TE and KM matrices for each window.

This procedure generates a time series of network matrices, transforming the static snapshot into a dynamic "movie" of the financial system. This allows for the direct observation of how network structures change over time and, most importantly, how they are reshaped by major market events, which is the central empirical contribution of the source paper.

## Features

The provided iPython Notebook (`mapping_crisis_driven_market_dynamics_draft.ipynb`) implements the full research pipeline, including:

-   **Rigorous Validation:** Comprehensive checks for all input data and configurations.
-   **Professional Preprocessing:** A robust pipeline for cleaning financial time series and transforming them into stationary log-returns.
-   **Methodologically Pure Calculations:** Precise, numerically stable implementations of the Transfer Entropy and Kramers-Moyal drift matrix calculations.
-   **Dynamic Analysis Engine:** A flexible sliding-window framework for time-resolved analysis.
-   **Automated Interpretation:** Algorithms to automatically identify persistent network links and detect significant regime shifts.
-   **Crisis and Robustness Analysis:** A full suite of tools to quantify crisis impacts and perform sensitivity analysis on key hyperparameters.
-   **Error Analysis:** Block bootstrapping to generate confidence intervals for model estimates.
-   **Integrated Reporting:** A final function that generates a self-contained, interactive HTML report with embedded tables and figures.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Data Preparation (Tasks 1-3):** The pipeline ingests raw price data, validates it, preprocesses it into clean log-returns, and confirms the stationarity properties of the resulting series.
2.  **Static Network Calculation (Tasks 4-6):** It computes the static (full-period) TE and KM matrices, providing a baseline snapshot of the network.
3.  **Dynamic Network Analysis (Tasks 7-8):** It implements the sliding window procedure to generate a time series of network matrices and analyzes these to quantify the average network structure during specific crisis periods versus a normal baseline.
4.  **Interpretation and Meta-Analysis (Tasks 10-12):** The pipeline automatically interprets the dynamic results to find persistent links and regime shifts, performs robustness checks across different parameters, and conducts error analysis via bootstrapping.
5.  **Reporting (Task 13-14):** All findings are compiled into a comprehensive results bundle and a final, user-selectable HTML or Markdown report.

## Core Components (Notebook Structure)

The `mapping_crisis_driven_market_dynamics_draft.ipynb` notebook is structured as a logical pipeline with modular functions for each task:

-   **`validate_inputs`**: The initial quality gate for all inputs.
-   **`preprocess_price_data`**: The data cleaning and transformation engine.
-   **`perform_stationarity_analysis`**: Econometric validation of the data.
-   **`generate_descriptive_statistics`**: Initial data characterization.
-   **`calculate_transfer_entropy`**: Core information-theoretic calculation.
-   **`calculate_kramers_moyal_drift_matrix`**: Core stochastic dynamics calculation.
-   **`perform_time_resolved_analysis`**: The dynamic analysis engine.
-   **`analyze_crisis_periods`**: Crisis impact quantification.
-   **`plot_matrix_heatmap`, `plot_network_graph`**: Visualization utilities.
-   **`interpret_dynamic_results`**: Automated interpretation engine.
-   **`run_full_analysis_pipeline`**: Orchestrator for a single, complete analysis run.
-   **`perform_robustness_analysis`**: Higher-level orchestrator for sensitivity analysis.
-   **`perform_error_analysis`**: Higher-level orchestrator for confidence interval estimation.
-   **`run_master_pipeline`**: The single, top-level entry point to the entire project.

## Key Callable: run_master_pipeline

The central function in this project is `run_master_pipeline`. It orchestrates the entire analytical workflow from raw data to final report.

```python
def run_master_pipeline(
    raw_price_df: pd.DataFrame,
    study_config: Dict[str, Any],
    param_grid: Dict[str, List[Any]],
    report_format: str = 'html',
    run_robustness_analysis: bool = True,
    run_error_analysis: bool = True,
    bootstrap_samples: int = 100
) -> Tuple[Dict[str, Any], Optional[str]]:
    """
    The master orchestrator for the end-to-end Digital Twin analysis pipeline.
    ... (full docstring is in the notebook)
    """
    # ... (implementation is in the notebook)
```

## Prerequisites

-   Python 3.9+
-   Core dependencies as listed in `requirements.txt`: `pandas`, `numpy`, `scipy`, `matplotlib`, `seaborn`, `networkx`, `statsmodels`, `tqdm`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/mapping_crisis_driven_market_dynamics.git
    cd mapping_crisis_driven_market_dynamics
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies from `requirements.txt`:**
    ```sh
    pip install -r requirements.txt
    ```

## Input Data Structure

The primary input is a `pandas.DataFrame` with a `DatetimeIndex` and columns containing the daily closing prices of the assets to be analyzed. The index should consist of business days.

**Example:**
```
                  Nasdaq  Crude-oil      Gold  US-dollar
Date
2014-08-11  4401.330078  98.080002  1310.300049  81.489998
2014-08-12  4434.129883  97.370003  1310.500000  81.620003
2014-08-13  4456.020020  97.570000  1314.599976  81.580002
...                 ...        ...          ...        ...
```

## Usage

The entire pipeline is executed through the `run_master_pipeline` function. The user must provide the raw price data, a study configuration dictionary, and a parameter grid for robustness checks.

```python
import pandas as pd

# 1. Load your data
# raw_price_df = pd.read_csv("your_data.csv", index_col=0, parse_dates=True)
# For this example, we create synthetic data.
date_rng = pd.date_range(start='2014-08-01', end='2024-09-30', freq='B')
price_data = {asset: 100 * np.exp(np.cumsum(np.random.randn(len(date_rng)) * 0.01)) for asset in ['Nasdaq', 'Crude-oil', 'Gold', 'US-dollar']}
raw_price_df = pd.DataFrame(price_data, index=date_rng)

# 2. Define your configurations (see notebook for full example)
study_config = { ... } # As defined in the notebook
param_grid = { ... }   # As defined in the notebook

# 3. Run the master pipeline
# from mapping_crisis_driven_market_dynamics_draft import run_master_pipeline
master_results, html_report = run_master_pipeline(
    raw_price_df=raw_price_df,
    study_config=study_config,
    param_grid=param_grid,
    report_format='html'
)

# 4. Save the report and explore results
with open("analysis_report.html", "w", encoding="utf-8") as f:
    f.write(html_report)

# Programmatically access results
robust_links = master_results['robustness_analysis']['persistent_links']['transfer_entropy']
print(robust_links.head())
```

## Output Structure

The `run_master_pipeline` function returns a tuple: `(master_results, report_string)`.

-   `master_results`: A deeply nested dictionary containing all data artifacts. Top-level keys include:
    -   `main_analysis`: Results from the baseline run (processed data, static matrices, dynamic results, interpretations, figures).
    -   `robustness_analysis`: Results from the sensitivity analysis, including robustness scores for key findings.
    -   `error_analysis`: Results from the bootstrapping, including confidence intervals for static estimates.
-   `report_string`: A string containing the full source code of the generated HTML or Markdown report.

## Project Structure

```
mapping_crisis_driven_market_dynamics/
│
├── mapping_crisis_driven_market_dynamics_draft.ipynb  # Main implementation notebook
├── requirements.txt                                   # Python package dependencies
├── LICENSE                                            # MIT license file
└── README.md                                          # This documentation file
```

## Customization

The pipeline is highly customizable via the `study_config` and `param_grid` dictionaries passed to `run_master_pipeline`. Users can easily modify:
-   The `date_range` and `crisis_periods` to analyze different time frames.
-   The `discretization_bins` and `window_size_days` to test different model specifications.
-   All `visualization_params` to change the aesthetic of plots and graphs.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{khalilian2025mapping,
  title={Mapping Crisis-Driven Market Dynamics: A Transfer Entropy and Kramers--Moyal Approach to Financial Networks},
  author={Khalilian, Pouriya and Golestani, Amirhossein N and Eslamifar, Mohammad and Firouzjaee, Mostafa T and Firouzjaee, Javad T},
  journal={arXiv preprint arXiv:2507.09554},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A Python Implementation of the Transfer Entropy and Kramers-Moyal Framework for Financial Networks.
GitHub repository: https://github.com/chirindaopensource/mapping_crisis_driven_market_dynamics
```

## Acknowledgments

-   Credit to Pouriya Khalilian, Amirhossein N. Golestani, Mohammad Eslamifar, Mostafa T. Firouzjaee, and Javad T. Firouzjaee for the novel analytical framework.
-   Thanks to the developers of the `pandas`, `numpy`, `scipy`, `matplotlib`, `seaborn`, `networkx`, `statsmodels`, and `tqdm` libraries, which are the foundational pillars of this analytical pipeline.

--

*This README was generated to document the code and methodology contained within `mapping_crisis_driven_market_dynamics_draft.ipynb` and follows best practices for open-source research software.*

# Paper

**Title:** "*Mapping Crisis-Driven Market Dynamics: A Transfer Entropy and KramersMoyal Approach to Financial Networks*"

**Link:** https://arxiv.org/abs/2507.09554

**Authors:** Pouriya Khalilian, Amirhossein N. Golestani, Mohammad Eslamifar, Mostafa T. Firouzjaee, Javad T. Firouzjaee

**E-Journal Submission Date:** 13th of July 2025

**Abstract:**

Financial markets are dynamic, interconnected systems where local shocks can trigger widespread instability, challenging portfolio managers and policymakers. Traditional correlation analysis often miss the directionality and temporal dynamics of information flow. To address this, we present a unified framework integrating Transfer Entropy (TE) and the N-dimensional KramersMoyal (KM) expansion to map static and time-resolved coupling among four major indices: Nasdaq Composite (^IXIC), WTI crude oil (WTI), gold (GC=F), and the US Dollar Index (this http URL). TE captures directional information flow. KM models non-linear stochastic dynamics, revealing interactions often overlooked by linear methods. Using daily data from August 11, 2014, to September 8, 2024, we compute returns, confirm non-stationary using a conduct sliding-window TE and KM analyses. We find that during the COVID-19 pandemic (March-June 2020) and the Russia-Ukraine crisis (Feb-Apr 2022), average TE increases by 35% and 28%, respectively, indicating heightened directional flow. Drift coefficients highlight gold-dollar interactions as a persistent safe-haven channel, while oil-equity linkages show regime shifts, weakening under stress and rebounding quickly. Our results expose the shortcomings of linear measures and underscore the value of combining information-theoretic and stochastic drift methods. This approach offers actionable insights for adaptive hedging and informs macro-prudential policy by revealing the evolving architecture of systemic risk.

# Summary

#### **Step 1: The Core Problem and the Paper's Proposed Solution**

First, let's understand the fundamental problem the authors are trying to solve. For decades, we've used the **correlation coefficient** to understand how financial assets move together. However, this tool is fundamentally limited:

1.  **It's Symmetric:** The correlation between Gold and the US Dollar is a single number. It doesn't tell you if Gold is driving the Dollar or if the Dollar is driving Gold. It lacks **directionality**.
2.  **It's Linear:** It only captures linear relationships. It completely misses complex, non-linear interactions, which we know are rampant in financial markets, especially during crises.
3.  **It's Static:** A single correlation number for a whole period hides how relationships evolve over time.

The authors propose a more sophisticated, two-pronged approach to overcome these limitations. They aim to create a "unified framework" by combining:

*   **Transfer Entropy (TE):** An information-theoretic tool to capture **directed, non-linear information flow**.
*   **Kramers–Moyal (KM) Expansion:** A technique from stochastic calculus to derive the underlying **dynamic equations (the "rules of the game")** that govern the assets' movements.

The goal is to not just see *that* assets are connected, but to understand *how* they influence each other, in which direction, and how these rules change, particularly during market stress.

--

#### **Step 2: Deconstructing the Methodology**

This is the heart of the paper. Let's look at the two tools they use.

**Part A: Transfer Entropy (TE)**
Think of TE as a sophisticated measure of Granger causality. In simple terms, TE from asset X to asset Y measures the reduction in uncertainty about the future of Y when you know the past of X, over and above what you already knew from the past of Y alone.

*   **What it gives you:** A directed measure (`T_XY` is not necessarily equal to `T_YX`). It's model-free and can capture non-linear relationships.
*   **How they use it:** They calculate TE between all pairs of their four indices (Nasdaq, Oil, Gold, US Dollar) and use a sliding window to see how these information flows evolve over time. This allows them to create dynamic, directed networks.

**Part B: The N-dimensional Kramers–Moyal (KM) Expansion**
This is the more complex part. Imagine a stock price moving randomly. The KM expansion is a formal way to extract the underlying "forces" driving that movement directly from the time-series data. Any stochastic process can be described by its KM coefficients. The two most important are:

*   **Drift Coefficient (D⁽¹⁾):** The deterministic force. This is the "push" or "pull" on the asset's price at any given moment. It tells you the expected direction of the next move.
*   **Diffusion Coefficient (D⁽²⁾):** The stochastic force. This is the magnitude of the random noise or volatility.

The authors make a **critical simplification**: They focus only on the drift term (D⁽¹⁾) and, more importantly, they *assume it has a linear form*. They model the system's dynamics as a set of linear differential equations: `dx/dt = Ax(t)`.

*   **What this gives them:** The matrix `A` becomes a network of influences. The element `A_ij` represents the linear push that asset `j` exerts on asset `i`. A negative diagonal element `A_ii` implies mean-reversion (stability), while a positive one would imply explosive behavior.
*   **How they use it:** They estimate this `A` matrix for the whole period and also within a sliding window to see how these deterministic influences change over time.

--

#### **Step 3: Key Empirical Findings**

The authors apply these methods to daily data for the Nasdaq, WTI crude oil, gold, and the US Dollar Index from 2014 to 2024. Their results are quite intuitive.

1.  **Crises Intensify Information Flow:** The Transfer Entropy analysis (Figures 6 & 8) clearly shows that the total information flow in the network spikes dramatically during the COVID-19 pandemic (starting 2020) and the Russia-Ukraine conflict (starting 2022). This confirms our intuition that in times of panic, markets become more tightly coupled and information (or noise) transmits more rapidly.
2.  **Gold's Persistent Safe-Haven Role:** The Kramers-Moyal drift analysis (Figure 11) reveals a consistent, stable interaction between gold and the US dollar. This provides a dynamic, model-based confirmation of gold's role as a safe-haven asset that tends to move in opposition to the dollar.
3.  **Fragile Oil-Equity Link:** In contrast, the relationship between oil and the Nasdaq (a proxy for equities/tech sector) is shown to be less stable. The linkage appears to weaken or change character during crises and then re-form afterward, highlighting a "regime-dependent" dynamic that simple correlation would miss.

--

#### **Step 4: The Paper's Main Contribution**

The primary contribution here is the **synthesis of the two methods**.

*   TE tells you *if* and *in which direction* information is flowing.
*   The KM drift analysis provides a *mechanistic model* of that flow.

For example, TE might show a strong flow from the US Dollar to Gold. The KM analysis then adds color to this, showing that the drift coefficient is negative, meaning a stronger dollar exerts a downward "push" on the price of gold. Combining these gives a richer story than either one could tell alone. It's a step up from simply drawing lines on a network graph to trying to understand the physics of the system.

--

#### **Step 5: A Professor's Critique and Avenues for Future Work**

While the paper is a valuable proof-of-concept, if this were a student's dissertation proposal, I would raise a few critical points:

*   **The Linearity Assumption is a Major Caveat:** The biggest weakness is simplifying the Kramers-Moyal drift term to a linear model (`Ax(t)`). The whole point of using advanced methods like KM is to escape the confines of linearity. By making this assumption, their drift analysis becomes conceptually very similar to a standard Vector Autoregressive (VAR) model. A key area for future work would be to estimate the *non-linear* drift functions, which is computationally challenging but would unlock the true power of the KM approach.
*   **The "N-dimensional" Claim:** The paper calls this an "N-dimensional" KM approach. However, true non-parametric estimation of drift and diffusion in N-dimensions suffers heavily from the "curse of dimensionality." The linear assumption is what makes their N=4 case tractable. The methodology for estimating the matrix `A` from the data (Eq. 10) is essentially solving a system of linear equations, which sidesteps the harder non-parametric problem.
*   **Parameter Sensitivity:** The robustness of both TE and KM estimations depends heavily on parameter choices (e.g., embedding delays for TE, the time-step `dt` for KM, the sliding window size). The paper does not discuss the sensitivity of its results to these choices, which is essential for validating the findings.
*   **Stationarity:** The use of a sliding window is a pragmatic way to handle the non-stationarity of financial data. However, it implicitly assumes that the process is "locally stationary" within each window. This is a necessary evil in financial econometrics, but it's a tension worth acknowledging.

**In conclusion,** Khalilian et al. present a compelling and well-motivated study. They successfully demonstrate that combining information theory and stochastic modeling provides a more nuanced and dynamic picture of financial networks than traditional methods. The paper serves as an excellent illustration of how these advanced tools can be applied. Its main limitation—the linearity assumption in the KM analysis—is also its greatest opportunity for future research to build upon.

# Import Essential Modules

In [None]:
# ==========================================================================================================================================
#
#  Python Implementation of "Mapping Crisis-Driven Market Dynamics: A Transfer Entropy and KramersMoyal Approach to Financial Networks"
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "Mapping Crisis-Driven Market Dynamics:
#  A Transfer Entropy and Kramers-Moyal Approach to Financial Networks".
#
#  It includes a full suite of functions for:
#   - Rigorous data validation and preprocessing.
#   - Core calculation of Transfer Entropy and Kramers-Moyal drift coefficients.
#   - Time-resolved dynamic analysis via a sliding window procedure.
#   - Automated interpretation, crisis analysis, and network visualization.
#   - Advanced meta-analysis including robustness and error analysis.
#
#  The final output is a multi-layered "Digital Twin" of market interactions,
#  designed for advanced risk management and systemic analysis.
#
# ==========================================================================================================================================

# Standard Library Imports
import base64
import io
import itertools
from collections import Counter
from copy import deepcopy
from typing import (Any, Dict, Generator, List, Optional, Tuple, Union)

# Third-Party Library Imports
# Core Data Handling and Numerical Computation
import numpy as np
import pandas as pd
import scipy.stats
from numpy.linalg import LinAlgError

# Data Visualization
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import seaborn as sns

# Network Analysis
import networkx as nx

# Econometric and Statistical Modeling
from statsmodels.tsa.stattools import adfuller

# Utility for Progress Monitoring
from tqdm import tqdm


# Implementation

## Draft 1

### Inputs, Processes and Outputs (IPO) Analysis of Key Callables

### **Comprehensive Callable-by-Callable Analysis**

#### **Module 1: Input Validation**

**1.1. `_validate_dataframe`**
*   **Inputs:** `df` (a `pandas.DataFrame`).
*   **Processes:** Performs three specific checks on the input DataFrame: (1) verifies that the index is a `pandas.DatetimeIndex` and is monotonically increasing; (2) verifies that all data columns are of a numeric data type; (3) verifies that the DataFrame contains no `NaN` (missing) values.
*   **Outputs:** A `list` of `str`, where each string is a descriptive error message. An empty list signifies successful validation.
*   **Role in Research Pipeline:** This is a granular **Data Integrity Check**. It ensures the foundational data structure meets the absolute minimum requirements for time series analysis before any further processing.

**1.2. `_validate_study_config`**
*   **Inputs:** `config` (a `dict`).
*   **Processes:** (1) Recursively validates the input dictionary against a predefined schema to ensure all required keys and nested structures are present. (2) For all date-string values, it attempts to parse them into `pandas.Timestamp` objects to validate their format. (3) It performs logical checks, such as ensuring start dates precede end dates and that crisis periods fall within the main study date range. (4) It validates the data types and value ranges of key numerical parameters (e.g., `discretization_bins` must be a positive integer).
*   **Outputs:** A `tuple` containing: (1) a deep copy of the input `config` with date strings converted to `pd.Timestamp` objects, and (2) a `list` of `str` error messages.
*   **Role in Research Pipeline:** This is the **Parameter Validation** unit. It ensures that all user-defined parameters for the study are present, correctly formatted, and logically consistent.

**1.3. `_cross_validate_inputs`**
*   **Inputs:** `df` (a `pandas.DataFrame`), `config` (a `dict` where dates are `pd.Timestamp` objects).
*   **Processes:** Performs a temporal consistency check by comparing the minimum and maximum dates in the `df.index` with the `start` and `end` dates specified in the `config`.
*   **Outputs:** A `list` of `str` error messages if the DataFrame's date range does not fully encompass the required study range.
*   **Role in Research Pipeline:** This is the **Data-Parameter Compatibility Check**. It confirms that the provided data is sufficient to actually perform the analysis requested in the parameters.

**1.4. `validate_inputs`**
*   **Inputs:** `df` (a `pandas.DataFrame`), `config` (a `dict`).
*   **Processes:** Orchestrates the validation by calling `_validate_dataframe`, `_validate_study_config`, and `_cross_validate_inputs` in sequence. It aggregates all potential error messages from these helpers. If any errors are found, it raises a single, comprehensive `ValueError`.
*   **Outputs:** The validated and type-converted configuration `dict`.
*   **Role in Research Pipeline:** This is the master **Gatekeeper and Initializer** for the entire pipeline, ensuring all inputs are sound before computation begins.

#### **Module 2: Data Preprocessing**

**2.1. `preprocess_price_data`**
*   **Inputs:** `df` (a raw, validated `pandas.DataFrame`), `validated_config` (a `dict`).
*   **Processes:** Executes a strict, four-step data transformation pipeline: (1) **Filtering:** Slices the DataFrame to the study's date range. (2) **Cleaning:** Replaces infinities, then applies forward-fill and backward-fill to handle missing values. (3) **Transformation:** Checks for non-positive prices, then calculates log-returns using the formula $r_t = \log(P_t) - \log(P_{t-1})$. (4) **Finalization:** Drops any remaining `NaN` rows to ensure a perfectly aligned dataset.
*   **Outputs:** A `tuple` containing: (1) a clean, stationary `pandas.DataFrame` of log-returns, and (2) a `dict` of metadata documenting the process.
*   **Role in Research Pipeline:** This function is the **Data Transformation Engine**, converting raw, non-stationary price data into the clean, stationary log-return series required by the core analytical models.

#### **Module 3: Econometric Diagnostics**

**3.1. `_run_adf_test`**
*   **Inputs:** `series` (a `pandas.Series`), `significance_level` (a `float`).
*   **Processes:** Executes the Augmented Dickey-Fuller (ADF) test on the input series using `statsmodels.tsa.stattools.adfuller`. It uses a constant and trend (`ct`) regression and automatic lag selection (`AIC`). It interprets the resulting p-value to determine if the series is stationary.
*   **Outputs:** A `dict` containing the ADF statistic, p-value, lags used, and a boolean `is_stationary` flag with a human-readable interpretation.
*   **Role in Research Pipeline:** This is a granular **Statistical Test Unit**. It performs a single, well-configured stationarity test.

**3.2. `perform_stationarity_analysis`**
*   **Inputs:** `price_df` (a `pandas.DataFrame`), `log_return_df` (a `pandas.DataFrame`).
*   **Processes:** For each asset, it calls `_run_adf_test` on both its price series and its log-return series. It then compiles these paired results into a single summary table.
*   **Outputs:** A multi-indexed `pandas.DataFrame` that presents the stationarity test results for prices and returns side-by-side for each asset.
*   **Role in Research Pipeline:** This function performs the **Econometric Validation** of the data transformation, empirically confirming that the log-return series are stationary and suitable for analysis, while the original price series are not.

#### **Module 4: Core Analysis**

**4.1. `_discretize_data`**
*   **Inputs:** `log_return_df` (a `pandas.DataFrame`), `num_bins` (an `int`).
*   **Processes:** For each column in the input DataFrame, it applies `pandas.qcut` to perform equal-frequency (quantile) binning, converting the continuous data into discrete integer labels from 0 to `num_bins-1`.
*   **Outputs:** A `tuple` containing: (1) a `pandas.DataFrame` of the same shape with integer bin labels, and (2) a `dict` mapping each asset to its calculated bin edges.
*   **Role in Research Pipeline:** This is the **Data Discretization** step, a necessary prerequisite for any analysis based on probability mass functions, specifically Transfer Entropy.

**4.2. `calculate_transfer_entropy`**
*   **Inputs:** `log_return_df` (a `pandas.DataFrame`), `validated_config` (a `dict`).
*   **Processes:** Implements the direct calculation of Transfer Entropy as defined by **Equation 2** of the source paper. The process involves: (1) discretizing the data using `_discretize_data`; (2) for each pair of assets (j->i), constructing the 3D joint probability distribution $P(i_{t+1}, i_t, j_t)$; (3) calculating the required marginal and conditional probabilities with numerically stable division; (4) computing the log-ratio of the conditional probabilities; (5) calculating the final TE value as the expectation of this log-ratio over the joint distribution.
*   **Outputs:** An $N \times N$ `pandas.DataFrame` where the element at `[row_i, col_j]` is the Transfer Entropy from asset `j` to asset `i`.
*   **Role in Research Pipeline:** This is a core analytical function that implements the **Information-Theoretic Network Mapping**, quantifying non-linear, directed information flows.

**4.3. `calculate_kramers_moyal_drift_matrix`**
*   **Inputs:** `log_return_df` (a `pandas.DataFrame`).
*   **Processes:** Implements the estimation of the first Kramers-Moyal coefficient (the drift matrix). It (1) calculates the one-step increments of the log-return series; (2) computes the covariance matrix ($C_{xx}$) and the time-lagged cross-covariance matrix ($C_{yx}$); (3) checks the condition number of $C_{xx}$ for stability; (4) solves the linear matrix equation $A = C_{yx} C_{xx}^{-1}$ to find the drift matrix $A$. This solves the system derived from **Equation 9**.
*   **Outputs:** An $N \times N$ `pandas.DataFrame` representing the estimated linear drift matrix $A$.
*   **Role in Research Pipeline:** This is the second core analytical function, implementing the **Stochastic Dynamics Network Mapping**, providing a linear approximation of the directed forces within the system.

#### **Module 5: Dynamic Analysis and Interpretation**

**5.1. `perform_time_resolved_analysis`**
*   **Inputs:** `log_return_df` (a `pandas.DataFrame`), `validated_config` (a `dict`).
*   **Processes:** Implements the **Algorithm 3** sliding window procedure. It (1) generates sequential, overlapping windows of the data; (2) iteratively calls `calculate_transfer_entropy` and `calculate_kramers_moyal_drift_matrix` on each window; (3) robustly handles errors in individual windows; (4) stores the resulting sequence of matrices in dictionaries indexed by the window end-date.
*   **Outputs:** A `dict` containing the time series of TE and KM matrices.
*   **Role in Research Pipeline:** This function provides the **Dynamic Evolution** of the network, allowing for the study of how market structure changes over time.

**5.2. `analyze_crisis_periods`**
*   **Inputs:** `time_resolved_results` (a `dict`), `validated_config` (a `dict`).
*   **Processes:** (1) Partitions the time-resolved matrices into "crisis" and "baseline" sets based on dates in the configuration. (2) Aggregates the matrices in each set by computing the element-wise mean. (3) Calculates the percentage change of the average crisis matrix relative to the baseline.
*   **Outputs:** A nested `dict` containing the average matrices and percentage change matrices for each crisis period.
*   **Role in Research Pipeline:** This function performs the **Crisis Impact Quantification**, providing the evidence for claims about how crises intensify information flow.

**5.3. `_unstack_results_to_dataframe`**
*   **Inputs:** `time_resolved_results` (a `dict`), `analysis_type` (a `str`).
*   **Processes:** Transforms the 3D data structure (dictionary of matrices over time) into a 2D `pandas.DataFrame`. It "melts" the data so that the index is time and the columns are a `MultiIndex` representing each unique network link `(target, source)`.
*   **Outputs:** A 2D `pandas.DataFrame` where each column is the time series of a single network link's strength.
*   **Role in Research Pipeline:** This is a crucial **Data Reshaping Utility** that prepares the dynamic results for standard time series analysis.

**5.4. `interpret_dynamic_results`**
*   **Inputs:** `time_resolved_results` (a `dict`) and several threshold parameters.
*   **Processes:** Performs a two-part automated interpretation. (1) **Persistence Analysis:** It identifies links that have a high average strength (magnitude) and are consistently strong over time (consistency). (2) **Regime Shift Detection:** It uses a rolling z-score method to screen for dates where a link's short-term behavior deviates significantly from its long-term norm.
*   **Outputs:** A `dict` containing DataFrames of persistent links and a dictionary of detected regime shift events.
*   **Role in Research Pipeline:** This is the **Automated Interpretation Engine**, which sifts through the dynamic results to flag the most stable relationships and the most significant moments of change.

#### **Module 6: Visualization and Reporting**

**6.1. `plot_matrix_heatmap`**
*   **Inputs:** `matrix_df` (a `pandas.DataFrame`), `title` (a `str`), `validated_config` (a `dict`).
*   **Processes:** Uses `seaborn.heatmap` to create a graphical color-coded representation of the input matrix, with intelligent colormap selection and annotations.
*   **Outputs:** A `matplotlib.figure.Figure` object.
*   **Role in Research Pipeline:** A **Visualization Tool** for displaying matrix-based results, analogous to the heatmap in Figure 2 of the paper.

**6.2. `plot_network_graph`**
*   **Inputs:** `matrix_df` (a `pandas.DataFrame`), `title` (a `str`), `validated_config` (a `dict`).
*   **Processes:** Uses `networkx` to convert the matrix into a directed graph object. It then draws this graph, using data-driven visual properties (edge width, color, node size) and filters edges by a percentile threshold to improve clarity.
*   **Outputs:** A `matplotlib.figure.Figure` object.
*   **Role in Research Pipeline:** A **Visualization Tool** for displaying network structures, analogous to the network diagram in Figure 2 of the paper.

**6.3. `_figure_to_base64_str`**
*   **Inputs:** `fig` (a `matplotlib.figure.Figure` object).
*   **Processes:** Renders the figure to an in-memory binary buffer, encodes the binary data to Base64, and formats it as a self-contained data URI string.
*   **Outputs:** A `str` containing the data URI.
*   **Role in Research Pipeline:** A **Reporting Utility** that enables the embedding of plots directly into HTML documents.

**6.4. `_generate_html_report`**
*   **Inputs:** `master_results` (a `dict`).
*   **Processes:** Constructs a complete, self-contained HTML document as a string. It converts result DataFrames to styled HTML tables using `.to_html()` and embeds figures by calling `_figure_to_base64_str` and inserting the resulting data URI into `<img>` tags. It uses `<details>` tags for interactivity.
*   **Outputs:** A `str` containing the full HTML source of the report.
*   **Role in Research Pipeline:** The **Dynamic Report Generator**, creating a portable, interactive, and visually rich summary of the entire analysis.

#### **Module 7: Master Orchestration**

**7.1. `run_full_analysis_pipeline`**
*   **Inputs:** `raw_price_df` (a `pandas.DataFrame`), `study_config` (a `dict`).
*   **Processes:** Orchestrates the execution of Tasks 1 through 10 in a single, sequential workflow. It manages the flow of data artifacts between the component functions.
*   **Outputs:** A comprehensive `dict` containing all results from the main analysis pipeline.
*   **Role in Research Pipeline:** This is the **Core Analysis Orchestrator**, representing a single, complete run of the paper's methodology.

**7.2. `perform_robustness_analysis`**
*   **Inputs:** `raw_price_df` (a `pandas.DataFrame`), `base_config` (a `dict`), `param_grid` (a `dict`).
*   **Processes:** (1) Generates all possible hyperparameter combinations from the `param_grid`. (2) For each combination, it calls `run_full_analysis_pipeline`. (3) It aggregates the key findings (persistent links and regime shifts) from all runs. (4) It calculates a "Robustness Score" for each finding, representing the percentage of runs in which it was detected.
*   **Outputs:** A `dict` containing DataFrames that summarize the stability of the key findings.
*   **Role in Research Pipeline:** This is the **Sensitivity Analysis Engine**, which stress-tests the conclusions of the study against changes in its underlying assumptions.

**7.3. `perform_error_analysis`**
*   **Inputs:** `log_return_df` (a `pandas.DataFrame`), `validated_config` (a `dict`), `num_samples` (an `int`).
*   **Processes:** (1) Implements block bootstrapping to generate numerous resampled versions of the time series data. (2) For each resampled dataset, it re-calculates the static TE and KM matrices. (3) It uses the resulting distribution of matrix coefficients to compute percentile-based confidence intervals for each link.
*   **Outputs:** A `dict` containing the point estimates and the lower/upper confidence interval matrices.
*   **Role in Research Pipeline:** This is the **Statistical Inference Engine**, which quantifies the uncertainty of the model's estimates due to sampling noise.

**7.4. `run_master_pipeline`**
*   **Inputs:** `raw_price_df`, `study_config`, `param_grid`, and several boolean flags.
*   **Processes:** The ultimate orchestrator. It (1) calls `run_full_analysis_pipeline` to get the main results; (2) conditionally calls `perform_robustness_analysis` and `perform_error_analysis` based on user flags; (3) compiles all results into a single master bundle; (4) conditionally calls `_generate_html_report` (or `_generate_report_string`) to create a final summary document.
*   **Outputs:** A `tuple` containing: (1) the master results `dict`, and (2) the generated report `str` (or `None`).
*   **Role in Research Pipeline:** This is the **Master Conductor**, the single entry point that encapsulates the entire research project from raw data to final, documented, and stress-tested conclusions.

### Usage Illustration
### **Example: Executing the Digital Twin Master Pipeline**

This example demonstrates a complete, end-to-end workflow for analyzing the relationships between four major financial assets, as described in the source paper. We will:
1.  Load sample data.
2.  Define the comprehensive study configuration.
3.  Define the parameter grid for robustness checks.
4.  Execute the master pipeline function.
5.  Process and display the outputs.

--

#### **Step 1: Setup and Data Loading**

First, we import the necessary libraries and our master pipeline function. For this example, we will create a synthetic dataset that mimics the structure of real financial data. In a real-world scenario, this data would be loaded from a file or downloaded via an API.

```python
# Import the master pipeline function and necessary libraries
# In a real package, this would be: from digital_twin_pipeline import run_master_pipeline
# For this example, we assume all functions are in the same script.
import pandas as pd
import numpy as np

# --- Create Synthetic Raw Price Data ---
# In a real application, you would load your data here, e.g., from a CSV:
# raw_price_df = pd.read_csv("asset_prices.csv", index_col=0, parse_dates=True)

# For this example, we generate synthetic data that resembles the assets in the paper.
date_rng = pd.date_range(start='2014-08-01', end='2024-09-30', freq='B')
num_days = len(date_rng)
assets = ['Nasdaq', 'Crude-oil', 'Gold', 'US-dollar']

# Generate random walks to simulate price series
np.random.seed(42)
price_data = {
    asset: 100 * np.exp(np.cumsum(np.random.randn(num_days) * 0.01))
    for asset in assets
}
raw_price_df = pd.DataFrame(price_data, index=date_rng)

print("Sample Raw Price Data:")
print(raw_price_df.head())
```
**Discursive Text:** The code above sets up our environment. We create a `pandas.DataFrame` named `raw_price_df`. The index is a `DatetimeIndex` of business days, and the columns represent the different financial assets we want to analyze. This structure is the required input format for our pipeline.

--

#### **Step 2: Defining the Study Configuration**

Next, we define the two critical configuration dictionaries. These parameters control every aspect of the analysis, from the date ranges to the visualization styles.

```python
# --- Define the Main Study Configuration ---
# This dictionary controls the primary analysis run.
study_config = {
    'data_params': {
        'date_range': {'start': '2014-08-11', 'end': '2024-09-08'},
        'crisis_periods': {
            'covid_19': {
                'start': '2020-03-01',
                'end': '2020-06-30',
                'label': 'COVID-19 Pandemic'
            },
            'ukraine_war': {
                'start': '2022-02-24',
                'end': '2022-05-31',
                'label': 'Ukraine War'
            }
        }
    },
    'analysis_params': {
        'information_theory': {
            'discretization_bins': 10,
            'discretization_strategy': 'equal_frequency'
        },
        'sliding_window': {
            'window_size_days': 252, # Approx. 1 trading year
            'step_size_days': 21     # Approx. 1 trading month
        },
        'kramers_moyal': {
            'model_assumption': 'linear_drift'
        }
    },
    'visualization_params': {
        'general': {'figure_dpi': 300, 'font_size': 10},
        'network_graphs': {
            'layout_function': 'circular_layout',
            'node_size': 3000,
            'node_color': 'skyblue',
            'edge_width_multiplier': 5.0,
            'arrow_size': 20
        },
        'heatmaps': {
            'colormap_diverging': 'RdBu_r',
            'colormap_sequential': 'viridis',
            'crisis_line_color': 'black',
            'crisis_line_style': '--'
        }
    }
}

# --- Define the Parameter Grid for Robustness Checks ---
# This dictionary specifies which parameters to vary and what values to test.
param_grid = {
    'analysis_params.information_theory.discretization_bins': [8, 10, 12],
    'analysis_params.sliding_window.window_size_days': [189, 252] # Approx. 9 months and 12 months
}
```
**Discursive Text:** We have created two dictionaries. `study_config` defines our baseline analysis. `param_grid` tells the robustness analysis to re-run the entire pipeline for every combination of the specified parameters (in this case, 3 bin sizes * 2 window sizes = 6 total runs). The dot-notation in the keys of `param_grid` allows the function to navigate the nested `study_config` dictionary.

--

#### **Step 3: Executing the Master Pipeline**

With our data and configurations prepared, we can now execute the entire analysis with a single function call. We will enable all analyses and request the superior HTML report format.

```python
# --- Execute the Master Pipeline ---
# This single function call runs all the analysis, interpretation, and reporting.
# We set the optional flags to True to perform all available analyses.
# We choose 'html' for the richest report format.

# Note: This is a computationally intensive operation.
master_results, html_report = run_master_pipeline(
    raw_price_df=raw_price_df,
    study_config=study_config,
    param_grid=param_grid,
    report_format='html',
    run_robustness_analysis=True,
    run_error_analysis=True,
    bootstrap_samples=100 # A smaller number for a quick example; 500-1000 is better for rigor.
)
```
**Discursive Text:** The `run_master_pipeline` function is called with all necessary inputs. It will now perform every task we have defined, from validation to bootstrapping. The function will return two objects: `master_results`, a deeply nested dictionary containing every single data artifact and result, and `html_report`, a string containing the full source code of a self-contained HTML report.

--

#### **Step 4: Processing and Displaying the Outputs**

The final step is to use the artifacts returned by the pipeline. We can save the HTML report to a file and programmatically access the results bundle to display specific findings or figures.

```python
# --- Process and Display Outputs ---

# 1. Save the HTML report to a file
# This file can be opened in any web browser to view the full interactive report.
report_filename = "financial_network_analysis_report.html"
with open(report_filename, "w", encoding="utf-8") as f:
    f.write(html_report)
print(f"\nComprehensive HTML report saved to: {report_filename}")

# 2. Programmatically access and display a specific result
# For example, let's display the table of robustly identified persistent TE links.
print("\n--- Most Robust Persistent Transfer Entropy Links ---")
robust_te_links = master_results['robustness_analysis']['persistent_links']['transfer_entropy']
print(robust_te_links.head(10))

# 3. Display one of the generated figures in an interactive environment (e.g., Jupyter)
# This demonstrates how to access the matplotlib Figure objects.
print("\nDisplaying the static Kramers-Moyal network graph...")
km_network_fig = master_results['main_analysis']['visualizations']['static_km_graph']

# In a Jupyter Notebook, simply having `km_network_fig` as the last line of a cell
# would display the plot. Alternatively, you can explicitly show it or save it.
# km_network_fig.show() # This would open the plot in a new window.
km_network_fig.savefig("km_network_graph.png", dpi=300)
print("Saved 'km_network_graph.png'")

# Close all figures to free up memory
plt.close('all')
```
**Discursive Text:** This final block demonstrates the utility of the pipeline's outputs. We save the rich HTML report for easy sharing and viewing. We then show how to programmatically dive into the `master_results` dictionary to extract a specific piece of data—the table of robust TE links—for further analysis or display. Finally, we access one of the `matplotlib.Figure` objects and save it to a high-quality PNG file, suitable for inclusion in a presentation or academic paper. This showcases the dual nature of the output: a human-readable report and a machine-readable data bundle for deeper, customized exploration.

In [None]:
# Task 1: Parameter and Data Validation

def _validate_dataframe(df: pd.DataFrame) -> List[str]:
    """
    Validates the structure and integrity of the input price DataFrame.

    This helper function performs a series of checks to ensure the DataFrame
    is suitable for the subsequent financial analysis pipeline. It verifies
    the index type, data types of columns, and the presence of missing values.

    Args:
        df (pd.DataFrame): The input DataFrame, expected to have a
                           datetime index and numeric price columns.

    Returns:
        List[str]: A list of error messages. An empty list indicates
                   successful validation.
    """
    # Initialize a list to collect error messages.
    errors = []

    # Step 1.1: Verify the index is a pandas DatetimeIndex.
    # This is crucial for all time series operations.
    if not isinstance(df.index, pd.DatetimeIndex):
        # If the index is not a DatetimeIndex, append a specific error message.
        errors.append("DataFrame index is not a DatetimeIndex. Please convert using pd.to_datetime().")
        # Return immediately as other checks depend on a valid DatetimeIndex.
        return errors

    # Step 1.2: Verify the index is sorted chronologically.
    # A non-monotonic index can lead to incorrect shift/diff operations.
    if not df.index.is_monotonic_increasing:
        # If the index is not sorted, append a specific error message.
        errors.append("DataFrame index is not monotonically increasing. Please sort the index.")

    # Step 1.3: Check that all columns contain numeric data.
    # Non-numeric data would cause failures in mathematical operations like log-returns.
    non_numeric_cols = df.select_dtypes(exclude=np.number).columns
    # Check if there are any non-numeric columns.
    if not non_numeric_cols.empty:
        # If non-numeric columns are found, format a detailed error message.
        errors.append(
            f"DataFrame contains non-numeric columns: {list(non_numeric_cols)}. "
            "All columns must be of a numeric type (e.g., float64, int64)."
        )

    # Step 1.4: Ensure there are no missing values (NaNs) in the DataFrame.
    # Missing values must be handled explicitly in the preprocessing stage, not passed in.
    if df.isnull().sum().sum() > 0:
        # If any NaN values are present, identify which columns contain them.
        nan_counts = df.isnull().sum()
        cols_with_nan = nan_counts[nan_counts > 0].index.tolist()
        # Append a detailed error message.
        errors.append(f"DataFrame contains missing values (NaNs) in columns: {cols_with_nan}.")

    # Return the list of all found errors.
    return errors

def _validate_study_config(config: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]:
    """
    Validates the structure, types, and values of the configuration dictionary.

    This function recursively checks the configuration against a predefined
    schema. It validates data types, value ranges, and converts date strings
    to pd.Timestamp objects for downstream consistency.

    Args:
        config (Dict[str, Any]): The study configuration dictionary.

    Returns:
        Tuple[Dict[str, Any], List[str]]: A tuple containing the validated and
                                         type-converted configuration dictionary
                                         and a list of error messages.
    """
    # Create a deep copy of the configuration to modify (e.g., convert dates).
    validated_config = deepcopy(config)
    # Initialize a list to collect error messages.
    errors = []

    # Define the expected schema for validation. This is the "golden" structure.
    schema = {
        'data_params': {
            'date_range': {'start': str, 'end': str},
            'crisis_periods': dict
        },
        'analysis_params': {
            'information_theory': {
                'discretization_bins': int,
                'discretization_strategy': str
            },
            'sliding_window': {
                'window_size_days': int,
                'step_size_days': int
            },
            'kramers_moyal': {
                'model_assumption': str
            }
        },
        'visualization_params': dict # Further checks are not schema-based
    }

    # Helper function to recursively check the schema.
    def check_schema(conf_level, schema_level, path):
        # Check if the current configuration level is a dictionary.
        if not isinstance(conf_level, dict):
            errors.append(f"Configuration path '{' -> '.join(path)}' must be a dictionary.")
            return

        # Iterate through the keys in the schema for the current level.
        for key, expected_type in schema_level.items():
            # Check if a required key is missing.
            if key not in conf_level:
                errors.append(f"Missing required key '{key}' in path '{' -> '.join(path)}'.")
                continue

            # If the expected type is a dictionary, recurse.
            if expected_type == dict:
                check_schema(conf_level[key], schema[key], path + [key])
            # Otherwise, check if the value has the correct type.
            elif not isinstance(conf_level[key], expected_type):
                errors.append(
                    f"Invalid type for key '{key}' in path '{' -> '.join(path)}'. "
                    f"Expected {expected_type.__name__}, got {type(conf_level[key]).__name__}."
                )

    # Start the recursive schema check from the top level.
    check_schema(validated_config, schema, [])

    # If schema validation fails, return early.
    if errors:
        return validated_config, errors

    # --- Value-specific validations ---

    # Validate 'data_params'
    try:
        # Convert date range strings to Timestamp objects.
        start_date_str = validated_config['data_params']['date_range']['start']
        end_date_str = validated_config['data_params']['date_range']['end']
        start_date = pd.to_datetime(start_date_str)
        end_date = pd.to_datetime(end_date_str)

        # Check if the start date is before the end date.
        if start_date >= end_date:
            errors.append("In 'date_range', 'start' date must be before 'end' date.")

        # Update the config with Timestamp objects.
        validated_config['data_params']['date_range']['start'] = start_date
        validated_config['data_params']['date_range']['end'] = end_date

        # Validate crisis periods.
        for name, period in validated_config['data_params']['crisis_periods'].items():
            # Convert crisis period dates to Timestamps.
            crisis_start = pd.to_datetime(period['start'])
            crisis_end = pd.to_datetime(period['end'])

            # Check for valid temporal order.
            if crisis_start >= crisis_end:
                errors.append(f"In crisis period '{name}', 'start' date must be before 'end' date.")

            # Check if the crisis period is within the main study date range.
            if not (start_date <= crisis_start and crisis_end <= end_date):
                errors.append(f"Crisis period '{name}' is outside the main study 'date_range'.")

            # Update the config with Timestamp objects.
            period['start'] = crisis_start
            period['end'] = crisis_end

    except (ValueError, TypeError) as e:
        # Catch errors from pd.to_datetime if format is wrong.
        errors.append(f"Invalid date format in 'data_params'. Use 'YYYY-MM-DD'. Error: {e}")

    # Validate 'analysis_params'
    # Check discretization_bins.
    bins = validated_config['analysis_params']['information_theory']['discretization_bins']
    if not (isinstance(bins, int) and bins > 0):
        errors.append("'discretization_bins' must be a positive integer.")

    # Check sliding window parameters.
    window_size = validated_config['analysis_params']['sliding_window']['window_size_days']
    step_size = validated_config['analysis_params']['sliding_window']['step_size_days']
    if not (isinstance(window_size, int) and window_size > 0):
        errors.append("'window_size_days' must be a positive integer.")
    if not (isinstance(step_size, int) and step_size > 0):
        errors.append("'step_size_days' must be a positive integer.")
    if window_size < step_size:
        errors.append("'window_size_days' should be greater than or equal to 'step_size_days'.")

    # Return the validated config and any errors found.
    return validated_config, errors

def _cross_validate_inputs(df: pd.DataFrame, config: Dict[str, Any]) -> List[str]:
    """
    Performs cross-validation between the DataFrame and the configuration.

    This function ensures that the provided data is sufficient to perform the
    analysis specified in the configuration, primarily by checking if the
    DataFrame's date range covers the study's required date range.

    Args:
        df (pd.DataFrame): The validated input DataFrame.
        config (Dict[str, Any]): The validated and type-converted configuration
                                 dictionary.

    Returns:
        List[str]: A list of error messages. An empty list indicates
                   successful validation.
    """
    # Initialize a list to collect error messages.
    errors = []

    # Extract the required date range from the configuration.
    required_start = config['data_params']['date_range']['start']
    required_end = config['data_params']['date_range']['end']

    # Extract the available date range from the DataFrame's index.
    available_start = df.index.min()
    available_end = df.index.max()

    # Check if the DataFrame's start date is later than the required start date.
    if available_start > required_start:
        errors.append(
            f"DataFrame data starts on {available_start.date()}, which is after the "
            f"required study start date of {required_start.date()}."
        )

    # Check if the DataFrame's end date is earlier than the required end date.
    if available_end < required_end:
        errors.append(
            f"DataFrame data ends on {available_end.date()}, which is before the "
            f"required study end date of {required_end.date()}."
        )

    # Return the list of all found errors.
    return errors

def validate_inputs(
    df: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the validation of all inputs for the financial network analysis.

    This master function validates the input DataFrame and the study
    configuration dictionary by calling specialized helper functions. It ensures
    that all inputs are structurally sound, internally consistent, and
    compatible with each other. If validation is successful, it returns a
    validated and type-converted configuration dictionary. Otherwise, it raises
    a single comprehensive ValueError with all identified issues.

    Args:
        df (pd.DataFrame): A datetime-indexed DataFrame with daily closing
                           prices of multiple assets in its columns.
        config (Dict[str, Any]): A dictionary with study configuration
                                 parameters.

    Returns:
        Dict[str, Any]: The validated configuration dictionary, with date
                        strings converted to pd.Timestamp objects.

    Raises:
        ValueError: If any validation check fails, this error is raised with
                    a detailed list of all identified problems.
        TypeError: If the inputs are not of the expected type.
    """
    # --- Initial Type Checking ---
    # Ensure df is a pandas DataFrame.
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    # Ensure config is a dictionary.
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Validate DataFrame Structure ---
    # Call the helper function to validate the DataFrame.
    df_errors = _validate_dataframe(df)

    # --- Step 2: Validate Configuration Dictionary ---
    # Call the helper function to validate the configuration.
    # This also returns a config with date strings converted to Timestamps.
    validated_config, config_errors = _validate_study_config(config)

    # --- Step 3: Cross-Validate DataFrame and Configuration ---
    # Initialize cross-validation errors list.
    cross_errors = []
    # Only perform cross-validation if the individual inputs are valid.
    if not df_errors and not config_errors:
        cross_errors = _cross_validate_inputs(df, validated_config)

    # --- Aggregate and Report Errors ---
    # Combine all error messages from all validation steps.
    all_errors = df_errors + config_errors + cross_errors

    # Check if any errors were found.
    if all_errors:
        # If there are errors, compile them into a single, readable error message.
        error_message = "Input validation failed with the following errors:\n"
        # Enumerate each error for clarity.
        for i, error in enumerate(all_errors, 1):
            error_message += f"  {i}. {error}\n"
        # Raise a single ValueError with the comprehensive report.
        raise ValueError(error_message)

    # If no errors were found, print a success message.
    print("Input validation successful. All checks passed.")

    # Return the validated and type-converted configuration dictionary.
    return validated_config


In [None]:
# Task 2: Data Preprocessing

def preprocess_price_data(
    df: pd.DataFrame,
    validated_config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Performs a rigorous, sequential preprocessing of raw financial price data.

    This function transforms a raw, datetime-indexed price DataFrame into a
    clean, stationary DataFrame of log-returns, ready for advanced econometric
    and information-theoretic analysis. The process is executed in a strict,
    non-negotiable order:
    1.  Temporal Filtering: Slices the DataFrame to the exact date range
        specified in the configuration.
    2.  Data Cleaning: Replaces infinities and systematically fills missing
        values using forward-fill, then backward-fill.
    3.  Log-Return Calculation: Transforms the non-stationary price series into
        a more stationary log-return series, checking for data integrity
        (non-positive prices) before transformation.
    4.  Finalization: Removes any remaining NaN values to produce a perfectly
        clean and aligned final dataset.

    Args:
        df (pd.DataFrame): A validated, datetime-indexed DataFrame with daily
                           closing prices of multiple assets in its columns.
        validated_config (Dict[str, Any]): The validated configuration dictionary
                                           returned from the `validate_inputs`
                                           function.

    Returns:
        Tuple[pd.DataFrame, Dict[str, Any]]:
        - A pandas DataFrame containing the clean, stationary log-returns.
        - A metadata dictionary detailing the preprocessing steps, including
          observation counts at each stage.

    Raises:
        ValueError: If non-positive prices are detected, which makes log-return
                    calculation impossible.
        ValueError: If any column consists entirely of NaN values after cleaning,
                    indicating a severe data quality issue.
    """
    # --- Input Validation ---
    # Ensure df is a pandas DataFrame.
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    # Ensure validated_config is a dictionary.
    if not isinstance(validated_config, dict):
        raise TypeError("Input 'validated_config' must be a dictionary.")

    # Initialize a dictionary to store metadata about the preprocessing steps.
    metadata = {}
    # Record the initial number of observations.
    metadata['initial_observations'] = len(df)

    # --- Step 1: Temporal Filtering ---
    # Extract the required start and end dates from the validated configuration.
    start_date = validated_config['data_params']['date_range']['start']
    end_date = validated_config['data_params']['date_range']['end']

    # Create a copy of the sliced DataFrame to avoid SettingWithCopyWarning.
    # .loc is used for precise, label-based slicing on the DatetimeIndex.
    processed_df = df.loc[start_date:end_date].copy()

    # Record the number of observations after filtering to the study's date range.
    metadata['observations_after_filtering'] = len(processed_df)
    metadata['filtered_date_range'] = {'start': processed_df.index.min(), 'end': processed_df.index.max()}

    # --- Step 2: Comprehensive Data Cleaning ---
    # The cleaning sequence is critical and must be followed precisely.

    # First, replace any infinite values with NaN. Infinite values are data errors.
    processed_df.replace([np.inf, -np.inf], np.nan, inplace=True)

    # Second, forward-fill NaN values. This carries the last valid observation
    # forward, which is standard practice for handling market holidays and weekends.
    processed_df.fillna(method='ffill', inplace=True)

    # Third, backward-fill remaining NaN values. This handles any missing data
    # at the very beginning of the series that forward-fill could not resolve.
    # Issue: It may introduce a look-ahead bias to the data so it has been commented out.
    # processed_df.fillna(method='bfill', inplace=True)

    # After filling, check if any column is still entirely NaN.
    if processed_df.isnull().all().any():
        # This indicates a severe data issue (e.g., an asset with no data in the range).
        bad_cols = processed_df.columns[processed_df.isnull().all()].tolist()
        raise ValueError(
            f"Data cleaning failed. The following columns consist entirely of "
            f"missing values within the specified date range: {bad_cols}"
        )

    # --- Step 3: Log-Return Calculation ---
    # Before taking the logarithm, rigorously check for non-positive prices.
    # The logarithm is undefined for zero or negative values.
    if (processed_df <= 0).any().any():
        # If found, raise a descriptive error to halt execution.
        raise ValueError(
            "Preprocessing failed: DataFrame contains zero or negative prices, "
            "which makes log-return calculation impossible. Please clean the source data."
        )

    # Calculate log-returns using the numerically stable formula: r_t = log(P_t / P_{t-1})
    # This is equivalent to log(P_t) - log(P_{t-1})
    log_returns_df = np.log(processed_df / processed_df.shift(1))

    # --- Step 4: Final Data Structure Creation ---
    # The first row of the log-returns DataFrame will be NaN due to the .shift(1) operation.
    # .dropna() removes this row and any other rows with NaNs that might arise from
    # misaligned trading calendars, ensuring a perfectly rectangular dataset.
    final_df = log_returns_df.dropna()

    # Record the final number of observations after all processing.
    metadata['final_observations'] = len(final_df)
    metadata['observations_dropped'] = metadata['initial_observations'] - len(final_df)

    # Final sanity check: ensure the resulting DataFrame is completely free of non-finite values.
    # This is a critical assertion for a production-grade system.
    if not np.isfinite(final_df).all().all():
        # This error should theoretically never be reached if the logic is correct,
        # but serves as a final, critical guardrail.
        raise RuntimeError(
            "A critical error occurred: the final preprocessed DataFrame "
            "contains non-finite values (NaN or infinity)."
        )

    # Print a success message with key metadata.
    print(
        f"Data preprocessing successful. Final dataset contains {metadata['final_observations']} "
        f"observations from {final_df.index.min().date()} to {final_df.index.max().date()}."
    )

    # Return the final, clean DataFrame of log-returns and the metadata dictionary.
    return final_df, metadata


In [None]:
# Task 3: Stationarity Check

def _run_adf_test(
    series: pd.Series,
    significance_level: float = 0.05
) -> Dict[str, Union[float, int, bool, str]]:
    """
    Executes a single Augmented Dickey-Fuller (ADF) test on a time series.

    This helper function provides a standardized, rigorous wrapper around the
    statsmodels `adfuller` implementation. It uses a conservative regression
    setting ('ct' - constant and trend) and data-driven lag selection ('AIC')
    to ensure robust testing for financial time series. The results are
    packaged into a dictionary with clear, human-readable interpretations.

    Args:
        series (pd.Series): The time series data to be tested for stationarity.
        significance_level (float): The p-value threshold for rejecting the
                                    null hypothesis. Defaults to 0.05.

    Returns:
        Dict[str, Union[float, int, bool, str]]: A dictionary containing the
        raw test outputs and a clear interpretation, including:
        - 'adf_statistic': The computed test statistic.
        - 'p_value': The p-value of the test.
        - 'lags_used': The number of lags selected for the regression.
        - 'is_stationary': A boolean indicating if the series is stationary
                           at the given significance level.
        - 'interpretation': A human-readable string explaining the result.
    """
    # --- Input Validation ---
    # Ensure the input is a pandas Series.
    if not isinstance(series, pd.Series):
        raise TypeError("Input 'series' must be a pandas Series.")
    # Ensure the series is not empty.
    if series.empty:
        raise ValueError("Input 'series' cannot be empty.")

    # Execute the Augmented Dickey-Fuller test.
    # regression='ct': Include constant and trend in the regression. This is a
    #                  conservative choice suitable for financial price series
    #                  which often exhibit drift and trend.
    # autolag='AIC': Automatically select the optimal number of lags based on
    #                the Akaike Information Criterion, making the test adaptive.
    try:
        adf_result = adfuller(series, regression='ct', autolag='AIC')
    except Exception as e:
        # Catch potential errors from the statsmodels library, e.g., with very short series.
        raise RuntimeError(f"ADF test failed for series '{series.name}' with error: {e}")

    # Extract the test statistic and p-value from the results tuple.
    adf_statistic = adf_result[0]
    p_value = adf_result[1]
    lags_used = adf_result[2]

    # Interpret the result based on the p-value.
    # The null hypothesis (H0) of the ADF test is that the series has a unit root (is non-stationary).
    # We reject H0 if the p-value is less than the significance level.
    if p_value < significance_level:
        # If p-value is low, we reject H0 and conclude the series is stationary.
        is_stationary = True
        interpretation = f"Reject H0. Series is likely stationary (p-value: {p_value:.4f})."
    else:
        # If p-value is high, we fail to reject H0 and conclude the series is non-stationary.
        is_stationary = False
        interpretation = f"Fail to reject H0. Series is likely non-stationary (p-value: {p_value:.4f})."

    # Compile the results into a structured dictionary for clear output.
    results_dict = {
        'adf_statistic': adf_statistic,
        'p_value': p_value,
        'lags_used': lags_used,
        'is_stationary': is_stationary,
        'interpretation': interpretation
    }

    return results_dict

def perform_stationarity_analysis(
    price_df: pd.DataFrame,
    log_return_df: pd.DataFrame,
    significance_level: float = 0.05
) -> pd.DataFrame:
    """
    Performs and summarizes stationarity tests for price and log-return series.

    This function systematically applies the Augmented Dickey-Fuller (ADF) test
    to each asset's raw price series and its corresponding log-return series.
    It serves to empirically validate the core econometric assumption that
    prices are non-stationary while log-returns are stationary. The results
    are compiled into a single, easily interpretable DataFrame.

    Args:
        price_df (pd.DataFrame): A clean, datetime-indexed DataFrame of asset
                                 prices. Should align with the log_return_df.
        log_return_df (pd.DataFrame): The DataFrame of log-returns generated
                                      from the price_df.
        significance_level (float): The statistical significance level for the
                                    ADF test. Defaults to 0.05.

    Returns:
        pd.DataFrame: A multi-index DataFrame summarizing the ADF test results
                      for both price and log-return series for each asset.

    Raises:
        TypeError: If inputs are not pandas DataFrames.
        ValueError: If input DataFrames are empty or have misaligned columns.
    """
    # --- Input Validation ---
    if not isinstance(price_df, pd.DataFrame) or not isinstance(log_return_df, pd.DataFrame):
        raise TypeError("Inputs 'price_df' and 'log_return_df' must be pandas DataFrames.")
    if price_df.empty or log_return_df.empty:
        raise ValueError("Input DataFrames cannot be empty.")
    if not all(price_df.columns == log_return_df.columns):
        raise ValueError("Columns of 'price_df' and 'log_return_df' must be identical and in the same order.")

    # Initialize a list to store the results for each asset.
    all_results = []
    # Get the list of assets from the DataFrame columns.
    assets = price_df.columns

    # Iterate through each asset to perform the paired tests.
    for asset in assets:
        # Announce which asset is being tested.
        print(f"--- Running Stationarity Tests for: {asset} ---")

        # --- Test 1: Price Series ---
        # Extract the price series for the current asset.
        price_series = price_df[asset].dropna()
        # Run the ADF test on the price series.
        price_test_results = _run_adf_test(price_series, significance_level)
        print(f"  Price Series: {price_test_results['interpretation']}")

        # --- Test 2: Log-Return Series ---
        # Extract the log-return series for the current asset.
        return_series = log_return_df[asset].dropna()
        # Run the ADF test on the log-return series.
        return_test_results = _run_adf_test(return_series, significance_level)
        print(f"  Log-Return Series: {return_test_results['interpretation']}")

        # Combine the results for the current asset into a single dictionary.
        # This flattened structure is ideal for creating a DataFrame later.
        combined_results = {
            ('Price', 'ADF Statistic'): price_test_results['adf_statistic'],
            ('Price', 'P-Value'): price_test_results['p_value'],
            ('Price', 'Is Stationary'): price_test_results['is_stationary'],
            ('Log-Return', 'ADF Statistic'): return_test_results['adf_statistic'],
            ('Log-Return', 'P-Value'): return_test_results['p_value'],
            ('Log-Return', 'Is Stationary'): return_test_results['is_stationary'],
        }
        # Add the asset name for indexing.
        combined_results['Asset'] = asset
        # Append the asset's results to the master list.
        all_results.append(combined_results)

    # --- Compile Final Report ---
    # Convert the list of dictionaries into a pandas DataFrame.
    results_df = pd.DataFrame(all_results)
    # Set the 'Asset' column as the DataFrame index.
    results_df.set_index('Asset', inplace=True)
    # Convert the flattened column names into a MultiIndex for clarity.
    results_df.columns = pd.MultiIndex.from_tuples(results_df.columns)

    # --- Final Conclusion Summary ---
    # Programmatically verify the expected outcome for all assets.
    prices_are_non_stationary = not results_df[('Price', 'Is Stationary')].any()
    returns_are_stationary = results_df[('Log-Return', 'Is Stationary')].all()

    print("\n--- Stationarity Analysis Summary ---")
    if prices_are_non_stationary and returns_are_stationary:
        # This is the desired outcome, confirming the validity of the log-return transformation.
        print("SUCCESS: As expected, all price series were found to be non-stationary,")
        print("         and all log-return series were found to be stationary.")
        print("         The data is suitable for subsequent analysis.")
    else:
        # This indicates a potential issue with the data or an unexpected market behavior.
        print("WARNING: The stationarity analysis produced unexpected results.")
        if not prices_are_non_stationary:
            print("  - One or more price series were unexpectedly found to be stationary.")
        if not returns_are_stationary:
            print("  - One or more log-return series were unexpectedly found to be non-stationary.")
        print("         Proceed with caution and review the data and results carefully.")

    # Return the final, compiled DataFrame.
    return results_df


In [None]:
# Task 4: Descriptive Statistics

def generate_descriptive_statistics(
    log_return_df: pd.DataFrame,
    validated_config: Dict[str, Any]
) -> Tuple[pd.DataFrame, plt.Figure]:
    """
    Calculates descriptive statistics and visualizes the log-return series.

    This function performs two primary operations on the clean log-return data:
    1.  It computes the first four statistical moments (Mean, Standard
        Deviation, Skewness, and Excess Kurtosis) with high precision, using
        unbiased estimators to ensure statistical rigor and comparability with
        academic literature.
    2.  It generates a publication-quality time series plot for each asset,
        with key crisis periods annotated, providing a clear visual summary of
        the data's behavior over time.

    Args:
        log_return_df (pd.DataFrame): A clean, stationary, datetime-indexed
                                      DataFrame of log-returns.
        validated_config (Dict[str, Any]): The validated configuration dictionary
                                           containing parameters for analysis
                                           and visualization.

    Returns:
        Tuple[pd.DataFrame, plt.Figure]:
        - A pandas DataFrame containing the four calculated statistical
          moments for each asset.
        - A matplotlib Figure object containing the time series subplots for
          each asset's log-returns.

    Raises:
        TypeError: If inputs are not of the expected types.
        ValueError: If the log_return_df is empty.
    """
    # --- Input Validation ---
    if not isinstance(log_return_df, pd.DataFrame):
        raise TypeError("Input 'log_return_df' must be a pandas DataFrame.")
    if not isinstance(validated_config, dict):
        raise TypeError("Input 'validated_config' must be a dictionary.")
    if log_return_df.empty:
        raise ValueError("Input 'log_return_df' cannot be empty.")

    # --- Step 1: Four Moments Calculation ---
    # Use a dictionary to build the statistics DataFrame column by column.
    stats_data = {
        # Calculate the mean return for each asset.
        'Mean': log_return_df.mean(),
        # Calculate the sample standard deviation (volatility). ddof=1 is crucial for sample std.
        'Standard Deviation': log_return_df.std(ddof=1),
        # Calculate the unbiased sample skewness using scipy.stats for precision.
        # Formula: g1 = k3 / k2^(3/2)
        'Skewness': scipy.stats.skew(log_return_df, bias=False),
        # Calculate the unbiased sample excess kurtosis (Fisher's definition).
        # Formula: g2 = k4 / k2^2
        # 'fisher=True' subtracts 3 to measure kurtosis relative to a normal distribution.
        'Excess Kurtosis': scipy.stats.kurtosis(log_return_df, bias=False, fisher=True)
    }
    # Create the final statistics DataFrame, transposing it for conventional format (moments as rows).
    stats_df = pd.DataFrame(stats_data).T

    # --- Step 2 & 3: Time Series Visualization ---
    # Extract assets and visualization parameters from the configuration.
    assets = log_return_df.columns
    num_assets = len(assets)
    viz_params = validated_config['visualization_params']

    # Programmatically determine the subplot grid layout for optimal viewing.
    # Aims for a layout that is as close to square as possible.
    num_cols = int(np.ceil(np.sqrt(num_assets)))
    num_rows = int(np.ceil(num_assets / num_cols))

    # Create the figure and axes objects with specified DPI and size.
    fig, axes = plt.subplots(
        nrows=num_rows,
        ncols=num_cols,
        figsize=(5 * num_cols, 4 * num_rows),
        dpi=viz_params['general']['figure_dpi'],
        sharex=True # Share the x-axis for easier comparison.
    )
    # Flatten the axes array for easy iteration, regardless of grid shape.
    axes = axes.flatten()

    # Iterate through each asset to create its subplot.
    for i, asset in enumerate(assets):
        ax = axes[i]
        # Plot the log-return series for the current asset.
        ax.plot(log_return_df.index, log_return_df[asset], linewidth=0.7, label=asset)

        # Set the title for the subplot with the asset's name.
        ax.set_title(asset, fontsize=viz_params['general']['font_size'] + 2)
        # Set the y-axis label.
        ax.set_ylabel("Log-Return", fontsize=viz_params['general']['font_size'])
        # Add a grid for better readability.
        ax.grid(True, linestyle='--', alpha=0.6)

        # Annotate crisis periods on the subplot.
        for period_name, period_details in validated_config['data_params']['crisis_periods'].items():
            # Use axvspan to create a shaded vertical region for the crisis period.
            ax.axvspan(
                period_details['start'],
                period_details['end'],
                color=viz_params['heatmaps']['crisis_line_color'],
                alpha=0.15, # Use transparency to avoid obscuring the data.
                label=f"{period_details['label']}" if i == 0 else "" # Label only on the first plot.
            )

    # Clean up empty subplots if the number of assets is not a perfect square.
    for j in range(num_assets, len(axes)):
        axes[j].set_visible(False)

    # Add a single legend for the entire figure to avoid redundancy.
    # Handles are collected from the first subplot.
    handles, labels = axes[0].get_legend_handles_labels()
    # Use a dictionary to get unique labels for the legend.
    by_label = dict(zip(labels, handles))
    fig.legend(by_label.values(), by_label.keys(), loc='upper right', fontsize=viz_params['general']['font_size'])

    # Apply a tight layout to optimize spacing between subplots.
    fig.tight_layout(rect=[0, 0, 0.9, 1]) # Adjust rect to make space for the figure legend.

    # Set a main title for the entire figure.
    fig.suptitle("Daily Log-Returns of Financial Assets", fontsize=viz_params['general']['font_size'] + 4, y=0.99)

    # Print a confirmation message.
    print("Descriptive statistics calculated and visualization generated successfully.")

    # Return the statistics DataFrame and the matplotlib Figure object.
    return stats_df, fig


In [None]:
# Task 5: Transfer Entropy (TE) Analysis

def _discretize_data(
    log_return_df: pd.DataFrame,
    num_bins: int
) -> Tuple[pd.DataFrame, Dict[str, np.ndarray]]:
    """
    Discretizes continuous log-return data into a specified number of bins.

    This helper function uses equal-frequency binning (quantiles) to transform
    continuous time series data into discrete integer labels. This is a
    necessary prerequisite for estimating probability distributions for
    Transfer Entropy calculation.

    Args:
        log_return_df (pd.DataFrame): A clean, stationary DataFrame of log-returns.
        num_bins (int): The number of discrete bins to create.

    Returns:
        Tuple[pd.DataFrame, Dict[str, np.ndarray]]:
        - A pandas DataFrame of the same shape as the input, containing
          integer bin labels (from 0 to num_bins-1).
        - A dictionary mapping each asset name to the computed bin edges.

    Raises:
        ValueError: If the number of unique values in any series is less
                    than the number of bins, making quantile-based
                    discretization impossible.
    """
    # Initialize a DataFrame to store the binned data.
    binned_df = pd.DataFrame(index=log_return_df.index)
    # Initialize a dictionary to store the bin edges for each asset.
    bin_edges = {}

    # Iterate over each asset column in the log-return DataFrame.
    for col in log_return_df.columns:
        # Pre-validation: Ensure meaningful discretization is possible.
        if log_return_df[col].nunique() < num_bins:
            raise ValueError(
                f"Cannot create {num_bins} unique bins for asset '{col}' as it "
                f"only has {log_return_df[col].nunique()} unique values. "
                "Consider reducing the number of bins."
            )

        # Use pandas.qcut for equal-frequency binning.
        # `labels=False` returns integer bin identifiers.
        # `duplicates='drop'` handles non-unique bin edges by merging them.
        binned_data, edges = pd.qcut(
            log_return_df[col],
            q=num_bins,
            labels=False,
            duplicates='drop',
            retbins=True # Return the computed bin edges.
        )
        # Store the resulting binned series in the new DataFrame.
        binned_df[col] = binned_data
        # Store the bin edges for reproducibility and interpretation.
        bin_edges[col] = edges

    # The result of qcut can be float if there are NaNs, so cast to a nullable integer type.
    return binned_df.astype('Int64'), bin_edges

def calculate_transfer_entropy(
    log_return_df: pd.DataFrame,
    validated_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Calculates the Transfer Entropy (TE) matrix with a direct formula implementation.

    This function implements the complete TE pipeline, quantifying the directed
    flow of information between all pairs of assets. This revised version
    adheres strictly to the defining formula for TE (Eq. 2 in the source paper)
    for maximum clarity and methodological fidelity.

    The process is as follows:
    1.  Discretizes the continuous log-return data into integer-labeled bins.
    2.  Constructs the 3D joint probability distribution P(i_t+1, i_t, j_t).
    3.  Explicitly calculates all required marginal probabilities.
    4.  Calculates the conditional probabilities P(i_t+1|i_t, j_t) and P(i_t+1|i_t)
        using numerically stable division.
    5.  Computes the log-ratio of these conditional probabilities.
    6.  Calculates the final TE value as the expectation of this log-ratio.

    Args:
        log_return_df (pd.DataFrame): A clean, stationary, datetime-indexed
                                      DataFrame of log-returns.
        validated_config (Dict[str, Any]): The validated configuration dictionary.

    Returns:
        pd.DataFrame: An N x N DataFrame representing the Transfer Entropy
                      matrix, where N is the number of assets. The element
                      at (row_i, col_j) is the TE from asset j to asset i.
    """
    # --- Input Validation ---
    if not isinstance(log_return_df, pd.DataFrame):
        raise TypeError("Input 'log_return_df' must be a pandas DataFrame.")
    if log_return_df.empty:
        raise ValueError("Input 'log_return_df' cannot be empty.")

    # --- Step 1: Discretization ---
    # Extract the number of bins from the configuration.
    num_bins = validated_config['analysis_params']['information_theory']['discretization_bins']
    # Call the helper function to discretize the data.
    binned_df, _ = _discretize_data(log_return_df, num_bins)

    # Get the list of assets and the number of assets.
    assets = log_return_df.columns
    num_assets = len(assets)
    # Define a small constant for numerical stability.
    epsilon = 1e-15

    # Initialize the N x N Transfer Entropy matrix with zeros.
    te_matrix = pd.DataFrame(np.zeros((num_assets, num_assets)), index=assets, columns=assets)

    # Iterate over every ordered pair of assets (i, j) to calculate TE from j to i.
    for i_idx, i_asset in enumerate(assets):  # i is the target series
        for j_idx, j_asset in enumerate(assets):  # j is the source series
            # TE from a series to itself is not calculated.
            if i_asset == j_asset:
                continue

            print(f"Calculating Transfer Entropy from {j_asset} -> {i_asset}...")

            # Prepare the data vectors for i_{t+1}, i_t, and j_t.
            i_future = binned_df[i_asset].values[1:]
            i_present = binned_df[i_asset].values[:-1]
            j_present = binned_df[j_asset].values[:-1]
            sample_data = np.stack([i_future, i_present, j_present], axis=1)

            # --- Step 2: Estimate 3D Joint Probability Distribution ---
            # P(i_{t+1}, i_t, j_t)
            joint_counts, _ = np.histogramdd(
                sample_data, bins=num_bins, range=[(-0.5, num_bins - 0.5)] * 3
            )
            p_joint_3d = joint_counts / np.sum(joint_counts)

            # --- Step 3: Explicitly Calculate All Required Marginal Probabilities ---
            # P(i_t, j_t) - sum over the i_{t+1} axis (axis 0)
            p_ip_jp = p_joint_3d.sum(axis=0)
            # P(i_{t+1}, i_t) - sum over the j_t axis (axis 2)
            p_if_ip = p_joint_3d.sum(axis=2)
            # P(i_t) - sum over the j_t axis (axis 1) of P(i_t, j_t)
            p_ip = p_ip_jp.sum(axis=1)

            # --- Step 4: Calculate Conditional Probabilities with Numerical Stability ---
            # P(i_{t+1} | i_t, j_t) = P(i_{t+1}, i_t, j_t) / P(i_t, j_t)
            # Reshape denominator for broadcasting: (B, B) -> (1, B, B)
            p_ip_jp_reshaped = p_ip_jp[np.newaxis, :, :]
            p_cond_if_given_ip_jp = np.divide(
                p_joint_3d, p_ip_jp_reshaped,
                out=np.zeros_like(p_joint_3d), where=(p_ip_jp_reshaped > epsilon)
            )

            # P(i_{t+1} | i_t) = P(i_{t+1}, i_t) / P(i_t)
            # Reshape denominator for broadcasting: (B,) -> (1, B)
            p_ip_reshaped = p_ip[:, np.newaxis]
            p_cond_if_given_ip = np.divide(
                p_if_ip, p_ip_reshaped,
                out=np.zeros_like(p_if_ip), where=(p_ip_reshaped > epsilon)
            )

            # --- Step 5: Calculate the Log-Ratio of Conditional Probabilities ---
            # log2[ P(i_{t+1}|i_t,j_t) / P(i_{t+1}|i_t) ]
            # Reshape P(i_{t+1}|i_t) for broadcasting: (B, B) -> (B, B, 1)
            p_cond_if_given_ip_reshaped = p_cond_if_given_ip[:, :, np.newaxis]

            # Calculate the ratio of conditional probabilities.
            ratio_of_conds = np.divide(
                p_cond_if_given_ip_jp, p_cond_if_given_ip_reshaped,
                out=np.zeros_like(p_cond_if_given_ip_jp), where=(p_cond_if_given_ip_reshaped > epsilon)
            )

            # Calculate the log2 of the ratio.
            log_ratio = np.log2(
                ratio_of_conds,
                out=np.zeros_like(ratio_of_conds), where=(ratio_of_conds > epsilon)
            )

            # --- Step 6: Compute the Final Summation (Expectation) ---
            # TE = sum( P(i_{t+1}, i_t, j_t) * log_ratio )
            te_value = np.sum(p_joint_3d * log_ratio)

            # Store the calculated TE value in the matrix.
            te_matrix.iloc[i_idx, j_idx] = te_value

    print("\nTransfer Entropy matrix calculation complete.")
    return te_matrix

In [None]:
# Task 6: Kramers-Moyal (KM) Expansion

def calculate_kramers_moyal_drift_matrix(
    log_return_df: pd.DataFrame,
    validated_config: Dict[str, Any] # Retained for future compatibility, not used here.
) -> pd.DataFrame:
    """
    Estimates the linear drift matrix using the Kramers-Moyal expansion.

    This function approximates the deterministic component of the financial
    system's dynamics by calculating the first Kramers-Moyal coefficient,
    the drift matrix A. It assumes a linear relationship where the expected
    change in an asset's return is a linear combination of the current returns
    of all assets in the system.

    The estimation follows the methodology derived from the paper's moment
    conditions (Eq. 7-9), which results in solving the linear matrix equation:
    A = C_yx * C_xx^(-1)
    where:
    - A is the N x N drift coefficient matrix.
    - C_yx is the time-lagged cross-covariance matrix between the increments
      of the returns and the returns themselves: <(x(t+1)-x(t)) * x(t)^T>.
    - C_xx is the standard covariance matrix of the returns: <x(t) * x(t)^T>.

    Args:
        log_return_df (pd.DataFrame): A clean, stationary, datetime-indexed
                                      DataFrame of log-returns.
        validated_config (Dict[str, Any]): The validated configuration dictionary.
                                           (Currently unused but included for
                                           API consistency).

    Returns:
        pd.DataFrame: An N x N DataFrame representing the estimated drift
                      matrix A. The element at (row_i, col_j) represents the
                      linear influence of asset j's return on the expected
                      change in asset i's return.

    Raises:
        TypeError: If the input is not a pandas DataFrame.
        ValueError: If the input DataFrame is empty or has too few rows.
        LinAlgError: If the covariance matrix is singular or ill-conditioned,
                     making a reliable estimation of the drift matrix impossible.
    """
    # --- Input Validation ---
    if not isinstance(log_return_df, pd.DataFrame):
        raise TypeError("Input 'log_return_df' must be a pandas DataFrame.")
    if log_return_df.shape[0] < log_return_df.shape[1] * 2:
        raise ValueError(
            "Input DataFrame has too few observations to reliably estimate "
            "the covariance matrix. Need at least 2 * num_assets rows."
        )

    print("Calculating Kramers-Moyal drift matrix...")

    # --- Step 1: Increment Variable Calculation and Data Alignment ---
    # Let x(t) be the log-return series.
    x_df = log_return_df

    # Calculate the one-step increment series y(t) = x(t+1) - x(t).
    # .diff() calculates x(t) - x(t-1).
    # .shift(-1) aligns this to represent x(t+1) - x(t).
    y_df = x_df.diff().shift(-1)

    # The last row of y_df will be NaN. We must drop the last row from both
    # DataFrames to ensure they are perfectly aligned for moment calculations.
    x_aligned = x_df.iloc[:-1]
    y_aligned = y_df.iloc[:-1]

    # --- Step 2: Moment Matrix Construction ---
    # Convert to numpy arrays for efficient linear algebra.
    x = x_aligned.values
    y = y_aligned.values

    # Get the number of observations (T) and number of assets (N).
    T, N = x.shape

    # Calculate the covariance matrix of the log-returns, C_xx = <x * x^T>.
    # We use the direct formula (1/T) * x.T @ x for consistency with the paper's derivation.
    # This is equivalent to pandas .cov() with ddof=0, but more explicit.
    c_xx = (x.T @ x) / T

    # Calculate the time-lagged cross-covariance matrix, C_yx = <y * x^T>.
    c_yx = (y.T @ x) / T

    # --- Step 3 & 4: Linear System Solution and Drift Matrix Assembly ---
    # Before solving, check if the covariance matrix C_xx is well-conditioned.
    # A high condition number indicates multicollinearity, making the inverse unstable.
    condition_number = np.linalg.cond(c_xx)
    # Define a threshold for an ill-conditioned matrix.
    # sys.float_info.epsilon is the machine epsilon for float precision.
    ill_condition_threshold = 1 / np.finfo(float).eps

    if condition_number > ill_condition_threshold:
        # If the matrix is ill-conditioned, raise a linear algebra error.
        raise LinAlgError(
            f"Covariance matrix is singular or ill-conditioned "
            f"(condition number: {condition_number:.2e}). The drift matrix "
            "cannot be reliably estimated due to high asset correlation."
        )

    # Solve the matrix equation A = C_yx * C_xx^(-1) for the drift matrix A.
    # Using np.linalg.solve(C_xx.T, C_yx.T).T is a robust way to compute this.
    # It solves A * C_xx = C_yx, which is equivalent to A = C_yx * C_xx_inv.
    drift_matrix_values = np.linalg.solve(c_xx.T, c_yx.T).T

    # Convert the resulting numpy array back to a labeled pandas DataFrame.
    drift_matrix_df = pd.DataFrame(
        drift_matrix_values,
        index=log_return_df.columns,
        columns=log_return_df.columns
    )

    print("Kramers-Moyal drift matrix calculation complete.")

    # Return the final, labeled drift matrix.
    return drift_matrix_df


In [None]:
# Task 7: Time-Resolved Analysis

def perform_time_resolved_analysis(
    log_return_df: pd.DataFrame,
    validated_config: Dict[str, Any]
) -> Dict[str, Dict[pd.Timestamp, pd.DataFrame]]:
    """
    Performs time-resolved TE and KM analysis using a sliding window approach.

    This function brings a dynamic dimension to the analysis by repeatedly
    applying the static Transfer Entropy and Kramers-Moyal calculations on
    overlapping windows of the data. This process generates a time series of
    network matrices, revealing the evolution of market structure and
    interdependencies over time.

    The process is as follows:
    1.  A sliding window is defined by size and step parameters from the config.
    2.  The window moves sequentially through the log-return time series.
    3.  For each window, it robustly attempts to calculate both the TE and KM
        drift matrices using the previously defined functions.
    4.  Failures within a single window (e.g., due to ill-conditioned data)
        are caught and logged, allowing the analysis to continue.
    5.  Results are stored in a dictionary, indexed by the end-date of each
        window, creating a time-stamped history of network snapshots.

    Args:
        log_return_df (pd.DataFrame): A clean, stationary, datetime-indexed
                                      DataFrame of log-returns.
        validated_config (Dict[str, Any]): The validated configuration dictionary
                                           containing sliding window parameters.

    Returns:
        Dict[str, Dict[pd.Timestamp, pd.DataFrame]]: A dictionary with two keys,
        'transfer_entropy' and 'kramers_moyal'. Each key maps to another
        dictionary where keys are timestamps (window end dates) and values
        are the corresponding N x N network matrices (as pd.DataFrames).
        A value may be None if the calculation for that window failed.
    """
    # --- Input Validation ---
    if not isinstance(log_return_df, pd.DataFrame):
        raise TypeError("Input 'log_return_df' must be a pandas DataFrame.")
    if not isinstance(validated_config, dict):
        raise TypeError("Input 'validated_config' must be a dictionary.")

    # --- Step 1: Sliding Window Framework Setup ---
    # Extract sliding window parameters from the configuration.
    window_params = validated_config['analysis_params']['sliding_window']
    window_size = window_params['window_size_days']
    step_size = window_params['step_size_days']

    # Get the total number of observations available.
    total_observations = len(log_return_df)

    # Validate that the window size is not larger than the total dataset.
    if window_size > total_observations:
        raise ValueError(
            f"Window size ({window_size}) is larger than the total number of "
            f"observations ({total_observations})."
        )

    # Initialize dictionaries to store the time-resolved results.
    # The keys will be the timestamp of the window's end date.
    time_resolved_results = {
        'transfer_entropy': {},
        'kramers_moyal': {}
    }

    # --- Step 2 & 3: Iterative Computation and Temporal Storage ---
    # Define the start and end points for the sliding window loop.
    start_indices = range(0, total_observations - window_size + 1, step_size)

    print(f"Starting time-resolved analysis with window size {window_size} and step size {step_size}...")
    # Use tqdm for a user-friendly progress bar during the long computation.
    for i in tqdm(start_indices, desc="Processing Sliding Windows"):
        # Define the slice for the current window using integer location.
        window_start_idx = i
        window_end_idx = i + window_size
        window_df = log_return_df.iloc[window_start_idx:window_end_idx]

        # The timestamp for this window's results is its end date.
        window_timestamp = window_df.index[-1]

        # --- Calculate Transfer Entropy for the current window ---
        try:
            # Call the previously defined function on the window's data.
            te_matrix = calculate_transfer_entropy(window_df, validated_config)
            # Store the resulting matrix with its corresponding timestamp.
            time_resolved_results['transfer_entropy'][window_timestamp] = te_matrix
        except Exception as e:
            # If any error occurs (e.g., not enough unique values for binning),
            # log the error and store None for this window.
            print(f"\nWarning: TE calculation failed for window ending {window_timestamp.date()}: {e}")
            time_resolved_results['transfer_entropy'][window_timestamp] = None

        # --- Calculate Kramers-Moyal Drift Matrix for the current window ---
        try:
            # Call the previously defined function on the window's data.
            km_matrix = calculate_kramers_moyal_drift_matrix(window_df, validated_config)
            # Store the resulting matrix with its corresponding timestamp.
            time_resolved_results['kramers_moyal'][window_timestamp] = km_matrix
        except LinAlgError as e:
            # Specifically catch linear algebra errors (e.g., singular matrix).
            # This is a common issue in windows with low volatility or high correlation.
            print(f"\nWarning: KM calculation failed for window ending {window_timestamp.date()}: {e}")
            time_resolved_results['kramers_moyal'][window_timestamp] = None
        except Exception as e:
            # Catch any other unexpected errors.
            print(f"\nWarning: An unexpected error occurred in KM calculation for window ending {window_timestamp.date()}: {e}")
            time_resolved_results['kramers_moyal'][window_timestamp] = None

    # --- Step 4: Temporal Dynamics Extraction (Implicit) ---
    # The returned dictionary of matrices is the primary 3D data structure.
    # Further transformation into a 2D DataFrame of time series can be done
    # as a separate post-processing step, providing flexibility.
    # This function's core responsibility is generating the time-stamped matrices.

    print("\nTime-resolved analysis complete.")

    # Return the final dictionary containing the time series of network matrices.
    return time_resolved_results


In [None]:
# Task 8: Crisis Period Analysis

def analyze_crisis_periods(
    time_resolved_results: Dict[str, Dict[pd.Timestamp, pd.DataFrame]],
    validated_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Analyzes and quantifies the impact of specified crisis periods on network structures.

    This function ingests the time-resolved network matrices and systematically
    compares the network's average behavior during defined crisis periods against
    a "normal" baseline period. It performs three key steps:
    1.  Partitions the time-stamped matrices into distinct sets for each crisis
        period and a baseline (non-crisis) period.
    2.  Aggregates the matrices within each period by calculating the element-wise
        mean, yielding an average network snapshot for each state.
    3.  Calculates the percentage change of the crisis-period averages relative
        to the baseline average, quantifying the magnitude of the crisis impact.

    Args:
        time_resolved_results (Dict[str, Dict[pd.Timestamp, pd.DataFrame]]):
            The output from `perform_time_resolved_analysis`, containing the
            time series of TE and KM matrices.
        validated_config (Dict[str, Any]): The validated configuration dictionary,
                                           containing crisis period definitions.

    Returns:
        Dict[str, Any]: A nested dictionary containing the analysis results.
        Structure:
        {
            'transfer_entropy': {
                'baseline': {'avg_matrix': pd.DataFrame, 'num_windows': int},
                'crisis_period_1': {'avg_matrix': pd.DataFrame, 'pct_change': pd.DataFrame, 'num_windows': int},
                ...
            },
            'kramers_moyal': { ... }
        }

    Raises:
        ValueError: If no valid windows are found for the baseline or any
                    defined crisis period.
    """
    # --- Input Validation ---
    if not isinstance(time_resolved_results, dict) or not all(k in time_resolved_results for k in ['transfer_entropy', 'kramers_moyal']):
        raise TypeError("time_resolved_results must be a dictionary with keys 'transfer_entropy' and 'kramers_moyal'.")
    if not isinstance(validated_config, dict):
        raise TypeError("validated_config must be a dictionary.")

    print("Starting crisis period analysis...")
    # The main dictionary to store all analysis results.
    analysis_output = {}
    # Define a small epsilon for safe division when calculating percentage change.
    epsilon = 1e-9

    # Get the crisis period definitions from the configuration.
    crisis_periods_config = validated_config['data_params']['crisis_periods']

    # Iterate over each type of analysis (TE and KM).
    for analysis_type, results_dict in time_resolved_results.items():
        print(f"--- Analyzing: {analysis_type.replace('_', ' ').title()} ---")

        # Initialize storage for this analysis type.
        analysis_output[analysis_type] = {}

        # --- Step 1: Partition Windows into Crisis and Baseline Sets ---
        # First, identify all timestamps that belong to any crisis period.
        all_crisis_timestamps = set()
        for period_details in crisis_periods_config.values():
            for ts in results_dict.keys():
                if period_details['start'] <= ts <= period_details['end']:
                    all_crisis_timestamps.add(ts)

        # The baseline consists of all timestamps NOT in any crisis period.
        baseline_timestamps = [ts for ts in results_dict.keys() if ts not in all_crisis_timestamps]

        # Collect the actual matrix data, filtering out any None values from failed window calculations.
        baseline_matrices = [results_dict[ts] for ts in baseline_timestamps if results_dict[ts] is not None]

        # --- Step 2: Aggregate Baseline Period ---
        if not baseline_matrices:
            raise ValueError(f"No valid data windows found for the baseline period in '{analysis_type}' analysis.")

        # Calculate the average baseline matrix by taking the mean along the temporal axis (axis=0).
        avg_baseline_matrix = np.mean(np.stack(baseline_matrices, axis=0), axis=0)
        # Convert the numpy array back to a labeled DataFrame.
        avg_baseline_df = pd.DataFrame(avg_baseline_matrix, index=baseline_matrices[0].index, columns=baseline_matrices[0].columns)

        # Store the baseline results.
        analysis_output[analysis_type]['baseline'] = {
            'avg_matrix': avg_baseline_df,
            'num_windows': len(baseline_matrices)
        }
        print(f"  Baseline: Calculated average from {len(baseline_matrices)} windows.")

        # --- Step 2 & 3: Aggregate Crisis Periods and Calculate Percentage Change ---
        # Now, process each defined crisis period individually.
        for period_name, period_details in crisis_periods_config.items():
            # Identify timestamps for this specific crisis period.
            crisis_timestamps = [
                ts for ts in results_dict.keys()
                if period_details['start'] <= ts <= period_details['end']
            ]
            # Collect the matrices, again filtering out Nones.
            crisis_matrices = [results_dict[ts] for ts in crisis_timestamps if results_dict[ts] is not None]

            if not crisis_matrices:
                print(f"  Warning: No valid data windows found for crisis period '{period_name}'. Skipping analysis for this period.")
                continue

            # Calculate the average crisis matrix.
            avg_crisis_matrix = np.mean(np.stack(crisis_matrices, axis=0), axis=0)
            avg_crisis_df = pd.DataFrame(avg_crisis_matrix, index=crisis_matrices[0].index, columns=crisis_matrices[0].columns)

            # Calculate percentage change: 100 * (crisis - baseline) / baseline
            # Use robust division to handle cases where the baseline value is near zero.
            numerator = 100 * (avg_crisis_matrix - avg_baseline_matrix)
            denominator = avg_baseline_matrix

            # Create a mask to perform division only where the denominator is not close to zero.
            where_safe = np.abs(denominator) > epsilon
            # Initialize output array for percentage change.
            pct_change_matrix = np.zeros_like(numerator)
            # Perform the division only on the safe elements.
            np.divide(numerator, denominator, out=pct_change_matrix, where=where_safe)

            # Convert to a labeled DataFrame.
            pct_change_df = pd.DataFrame(pct_change_matrix, index=avg_baseline_df.index, columns=avg_baseline_df.columns)

            # Store all results for this crisis period.
            analysis_output[analysis_type][period_name] = {
                'avg_matrix': avg_crisis_df,
                'pct_change_vs_baseline': pct_change_df,
                'num_windows': len(crisis_matrices)
            }
            print(f"  Crisis '{period_details['label']}': Calculated average from {len(crisis_matrices)} windows.")

    print("\nCrisis period analysis complete.")
    return analysis_output


In [None]:
# Task 9: Network Visualization

def plot_matrix_heatmap(
    matrix_df: pd.DataFrame,
    title: str,
    validated_config: Dict[str, Any],
    is_diverging: bool = False
) -> plt.Figure:
    """
    Generates a professional, publication-quality heatmap from a matrix.

    This function creates a heatmap visualization of an N x N matrix, with
    features designed for maximum clarity and analytical insight:
    -   Intelligently selects a sequential or diverging colormap.
    -   Clamps the color bar range to percentiles to prevent outliers from
        dominating the color scale.
    -   Annotates each cell with its numerical value, ensuring text is readable
        against any background color.
    -   All styling (DPI, fonts, colors) is driven by the configuration dict.

    Args:
        matrix_df (pd.DataFrame): The N x N matrix to plot. Index and columns
                                  should be asset names.
        title (str): The title for the heatmap plot.
        validated_config (Dict[str, Any]): The validated configuration dictionary.
        is_diverging (bool): If True, uses a diverging colormap centered at 0
                             (for data like KM coefficients). If False, uses a
                             sequential colormap (for data like TE).

    Returns:
        plt.Figure: The matplotlib Figure object containing the heatmap.
    """
    # --- Input Validation ---
    if not isinstance(matrix_df, pd.DataFrame) or matrix_df.shape[0] != matrix_df.shape[1]:
        raise TypeError("Input 'matrix_df' must be a square pandas DataFrame.")

    # Extract visualization parameters from the configuration.
    viz_params = validated_config['visualization_params']

    # Create the figure and axes object.
    fig, ax = plt.subplots(
        figsize=(10, 8),
        dpi=viz_params['general']['figure_dpi']
    )

    # Determine colormap and color bar normalization based on data type.
    if is_diverging:
        # For diverging data (e.g., KM matrix with +/- values), center the colormap at zero.
        cmap = viz_params['heatmaps']['colormap_diverging']
        # Find the maximum absolute value to create a symmetric color scale.
        abs_max = np.abs(matrix_df.values).max()
        vmin, vmax = -abs_max, abs_max
    else:
        # For sequential data (e.g., TE matrix with only positive values), use a sequential map.
        cmap = viz_params['heatmaps']['colormap_sequential']
        # Use percentiles to prevent extreme outliers from skewing the color bar.
        vmin = np.percentile(matrix_df.values, 5)
        vmax = np.percentile(matrix_df.values, 95)

    # Generate the heatmap using seaborn.
    sns.heatmap(
        matrix_df,
        ax=ax,
        annot=True,          # Annotate cells with their values.
        fmt=".3f",           # Format annotations to 3 decimal places.
        cmap=cmap,           # Apply the selected colormap.
        linewidths=.5,       # Add lines between cells.
        vmin=vmin,           # Set the min value for the color bar.
        vmax=vmax,           # Set the max value for the color bar.
        cbar_kws={'label': 'Magnitude'} # Add a label to the color bar.
    )

    # Set plot title and labels.
    ax.set_title(title, fontsize=viz_params['general']['font_size'] + 4, pad=20)
    ax.tick_params(axis='x', rotation=45)
    ax.tick_params(axis='y', rotation=0)

    fig.tight_layout()
    return fig

def plot_network_graph(
    matrix_df: pd.DataFrame,
    title: str,
    validated_config: Dict[str, Any],
    edge_threshold_percentile: float = 75.0
) -> plt.Figure:
    """
    Generates a directed network graph from an adjacency matrix.

    This function visualizes the relationships in an N x N matrix as a directed
    graph, with features designed for analytical clarity:
    -   Creates a directed graph where nodes are assets and edges represent
        interactions (e.g., information flow, drift influence).
    -   Filters edges based on a percentile threshold to reduce clutter and
        focus on the most significant connections.
    -   Encodes information visually: edge width maps to interaction strength,
        and edge color can represent the nature of the link (positive/negative).
    -   Node size is proportional to its influence (weighted degree).

    Args:
        matrix_df (pd.DataFrame): The N x N adjacency matrix. For a directed
                                  graph, matrix[i, j] represents the edge
                                  from node j to node i.
        title (str): The title for the network plot.
        validated_config (Dict[str, Any]): The validated configuration dictionary.
        edge_threshold_percentile (float): The percentile of edge weights above
                                           which edges will be drawn. E.g., 75.0
                                           means only the top 25% of edges are shown.

    Returns:
        plt.Figure: The matplotlib Figure object containing the network graph.
    """
    # --- Input Validation ---
    if not isinstance(matrix_df, pd.DataFrame) or matrix_df.shape[0] != matrix_df.shape[1]:
        raise TypeError("Input 'matrix_df' must be a square pandas DataFrame.")

    # Extract visualization parameters.
    viz_params = validated_config['visualization_params']
    net_params = viz_params['network_graphs']

    # Create a directed graph from the pandas DataFrame.
    # Note: nx.from_pandas_adjacency creates an edge from col `j` to row `i`.
    G = nx.from_pandas_adjacency(matrix_df, create_using=nx.DiGraph())

    # --- Edge Filtering and Styling ---
    # Get all edge weights and determine the threshold for filtering.
    all_weights = [abs(G.edges[edge]['weight']) for edge in G.edges()]
    if not all_weights:
        print(f"Warning: No edges in the graph for '{title}'. Cannot plot.")
        # Return an empty figure if there are no edges.
        fig, ax = plt.subplots(figsize=(10, 10), dpi=viz_params['general']['figure_dpi'])
        ax.set_title(title)
        ax.text(0.5, 0.5, "No significant connections to display.", ha='center', va='center')
        return fig

    threshold = np.percentile(all_weights, edge_threshold_percentile)

    # Prepare lists for edges and their visual properties.
    edges_to_draw = []
    edge_widths = []
    edge_colors = []

    for u, v, data in G.edges(data=True):
        weight = data['weight']
        # Only include edges that are above the significance threshold.
        if abs(weight) >= threshold:
            edges_to_draw.append((u, v))
            # Scale edge width by weight magnitude.
            edge_widths.append(abs(weight) * net_params['edge_width_multiplier'])
            # Color edges based on sign (for KM) or use a default (for TE).
            edge_colors.append('royalblue' if weight > 0 else 'firebrick')

    # --- Node Styling ---
    # Calculate node size based on weighted degree (a measure of influence).
    # Use the filtered edges for the calculation.
    temp_graph = nx.DiGraph()
    temp_graph.add_weighted_edges_from([(u, v, w) for (u,v),w in zip(edges_to_draw, edge_widths)])
    degrees = dict(temp_graph.degree(weight='weight'))
    node_sizes = [degrees.get(node, 0) * 50 + net_params['node_size'] for node in G.nodes()]

    # --- Drawing ---
    fig, ax = plt.subplots(figsize=(10, 10), dpi=viz_params['general']['figure_dpi'])

    # Use the specified layout function (e.g., circular).
    pos = getattr(nx, net_params['layout_function'])(G)

    # Draw the network components.
    nx.draw_networkx_nodes(G, pos, ax=ax, node_size=node_sizes, node_color=net_params['node_color'])
    nx.draw_networkx_labels(G, pos, ax=ax, font_size=viz_params['general']['font_size'], font_weight='bold')
    nx.draw_networkx_edges(
        G, pos, ax=ax,
        edgelist=edges_to_draw,
        width=edge_widths,
        edge_color=edge_colors,
        arrowstyle='-|>',
        arrows=True,
        arrowsize=net_params['arrow_size'],
        node_size=node_sizes, # Helps with arrow positioning.
        connectionstyle='arc3,rad=0.1' # Use curved edges to see bidirectional flows.
    )

    ax.set_title(title, fontsize=viz_params['general']['font_size'] + 6)
    plt.tight_layout()

    return fig


In [None]:
# Task 10: Results Interpretation

def _unstack_results_to_dataframe(
    time_resolved_results: Dict[pd.Timestamp, pd.DataFrame],
    analysis_type: str
) -> pd.DataFrame:
    """
    Transforms a dictionary of time-stamped matrices into a 2D DataFrame.

    This helper function "unstacks" or "melts" the 3D time-resolved data
    (time, source, target) into a 2D DataFrame where the index is time and
    the columns represent each unique network link (source, target). This
    format is ideal for performing time series analysis on individual links.

    Args:
        time_resolved_results (Dict[pd.Timestamp, pd.DataFrame]):
            A dictionary where keys are timestamps and values are N x N matrices.
        analysis_type (str): A string label for the analysis type (e.g., 'TE').

    Returns:
        pd.DataFrame: A DataFrame with a DatetimeIndex and a MultiIndex for
                      columns of the form (analysis_type, source, target).
    """
    # Handle the case of empty results.
    if not time_resolved_results:
        return pd.DataFrame()

    # Use a list comprehension to build a list of records for DataFrame creation.
    # This is more efficient than appending to a DataFrame in a loop.
    records = []
    for timestamp, matrix in time_resolved_results.items():
        # Skip any windows where the calculation might have failed.
        if matrix is None:
            continue
        # Stack the matrix to turn it into a Series with a MultiIndex (target, source).
        record = matrix.stack()
        # Add the timestamp to the record.
        record.name = timestamp
        records.append(record)

    # Concatenate all records into a single DataFrame.
    if not records:
        return pd.DataFrame()

    unstacked_df = pd.concat(records, axis=1).T
    # Create a new MultiIndex for the columns for clarity.
    unstacked_df.columns = pd.MultiIndex.from_tuples(
        [(analysis_type, target, source) for target, source in unstacked_df.columns],
        names=['analysis', 'target', 'source']
    )
    return unstacked_df

def interpret_dynamic_results(
    time_resolved_results: Dict[str, Dict[pd.Timestamp, pd.DataFrame]],
    magnitude_z_score_threshold: float = 2.0,
    consistency_quantile_threshold: float = 0.75,
    short_window_roll: int = 6,
    long_window_roll: int = 24,
    shift_z_score_threshold: float = 3.0
) -> Dict[str, Any]:
    """
    Automates interpretation of time-resolved network analysis results.

    This function provides a two-pronged interpretation of dynamic network data:
    1.  Persistent Interaction Identification: It identifies network links that
        are both strong on average (high magnitude) and consistently present
        over time (high consistency).
    2.  Regime Shift Detection: It screens for structural breaks by identifying
        moments where a link's short-term behavior deviates significantly
        from its long-term historical norm, using a rolling z-score method.

    Args:
        time_resolved_results (Dict[str, Dict[pd.Timestamp, pd.DataFrame]]):
            The output from `perform_time_resolved_analysis`.
        magnitude_z_score_threshold (float): Z-score threshold for a link's
            average strength to be considered high magnitude.
        consistency_quantile_threshold (float): Quantile of a link's own
            history used as a threshold for the consistency calculation.
        short_window_roll (int): The lookback period for the short-term
            rolling mean in regime shift detection.
        long_window_roll (int): The lookback period for the long-term
            rolling mean and standard deviation in regime shift detection.
        shift_z_score_threshold (float): The z-score threshold for flagging a
            potential regime shift.

    Returns:
        Dict[str, Any]: A dictionary containing DataFrames and dictionaries that
                        summarize the interpretation for both TE and KM analyses.
                        Includes persistent links and detected regime shifts.
    """
    # --- Input Validation ---
    if not isinstance(time_resolved_results, dict):
        raise TypeError("time_resolved_results must be a dictionary.")

    print("Interpreting dynamic results for persistent interactions and regime shifts...")
    interpretation_summary = {}
    epsilon = 1e-15 # For numerical stability

    # Iterate over each analysis type (TE and KM).
    for analysis_type, results_dict in time_resolved_results.items():
        # --- Step 1: Data Transformation ---
        links_df = _unstack_results_to_dataframe(results_dict, analysis_type)

        if links_df.empty:
            print(f"  No data to interpret for {analysis_type}.")
            interpretation_summary[analysis_type] = {
                'all_links_summary': pd.DataFrame(),
                'persistent_links': pd.DataFrame(),
                'regime_shifts': {}
            }
            continue

        # --- Step 2: Persistent Interaction Identification ---
        # (This section is retained from the original implementation)
        if analysis_type == 'kramers_moyal':
            mean_strengths = links_df.abs().mean(axis=0)
        else:
            mean_strengths = links_df.mean(axis=0)

        grand_mean = mean_strengths.mean()
        grand_std = mean_strengths.std()
        magnitude_z_scores = (mean_strengths - grand_mean) / grand_std

        def consistency_score(series: pd.Series) -> float:
            threshold = series.quantile(consistency_quantile_threshold)
            return (series > threshold).mean() if not series.empty else 0.0

        consistency_scores = links_df.apply(consistency_score, axis=0)

        summary_df = pd.DataFrame({
            'Mean Strength': mean_strengths,
            'Magnitude Z-Score': magnitude_z_scores,
            'Consistency Score': consistency_scores
        })

        is_persistent = (summary_df['Magnitude Z-Score'] > magnitude_z_score_threshold) & \
                        (summary_df['Consistency Score'] > consistency_quantile_threshold)
        summary_df['Is Persistent'] = is_persistent
        persistent_links_df = summary_df[summary_df['Is Persistent']].sort_values(
            by='Magnitude Z-Score', ascending=False
        )
        print(f"  Identified {len(persistent_links_df)} persistent interactions for {analysis_type}.")

        # --- Step 3: Implement Regime Shift Detection Logic ---
        print(f"  Detecting regime shifts for {analysis_type}...")
        # Calculate short-term and long-term rolling means.
        short_rolling_mean = links_df.rolling(window=short_window_roll, min_periods=short_window_roll).mean()
        long_rolling_mean = links_df.rolling(window=long_window_roll, min_periods=long_window_roll).mean()

        # Calculate the rolling standard deviation of the difference.
        rolling_std = (short_rolling_mean - long_rolling_mean).rolling(window=long_window_roll).std()

        # Calculate the rolling z-score of the deviation.
        z_score_df = (short_rolling_mean - long_rolling_mean) / (rolling_std + epsilon)

        # Identify points where the z-score exceeds the threshold.
        is_shift = z_score_df.abs() > shift_z_score_threshold

        # --- Step 4: Structure and Store the Regime Shift Results ---
        # Convert the sparse boolean DataFrame into a more useful dictionary format.
        regime_shifts = {}
        # Get timestamps where at least one shift occurred.
        shift_timestamps = is_shift.index[is_shift.any(axis=1)]

        for ts in shift_timestamps:
            # Get the links (columns) that shifted at this timestamp.
            shifted_links = is_shift.columns[is_shift.loc[ts]].tolist()
            # Store the list of shifted links for this timestamp.
            regime_shifts[ts] = shifted_links

        print(f"  Identified {len(regime_shifts)} timestamps with potential regime shifts.")

        # --- Step 5: Update the Return Value ---
        # Store all interpretation results for the current analysis type.
        interpretation_summary[analysis_type] = {
            'all_links_summary': summary_df,
            'persistent_links': persistent_links_df,
            'regime_shifts': regime_shifts
        }

    print("\nResults interpretation complete.")
    return interpretation_summary


In [None]:
# Task 11: Robustness Checks

def run_full_analysis_pipeline(
    raw_price_df: pd.DataFrame,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Executes the complete, end-to-end financial network analysis pipeline.

    This master orchestrator function serves as the main entry point for a
    single, comprehensive analysis run. It sequentially executes all the
    required analytical steps, from data validation and preprocessing to
    dynamic analysis and interpretation, ensuring a rigorous and reproducible
    workflow.

    The pipeline proceeds as follows:
    1.  Validates all inputs (data and configuration).
    2.  Preprocesses raw prices into clean log-returns.
    3.  Performs stationarity checks on prices and returns.
    4.  Generates descriptive statistics and visualizations.
    5.  Calculates static (full-period) TE and KM matrices.
    6.  Executes the time-resolved (sliding window) analysis for TE and KM.
    7.  Analyzes the impact of pre-defined crisis periods.
    8.  Interprets the dynamic results to identify persistent interactions.

    Args:
        raw_price_df (pd.DataFrame): The raw, datetime-indexed DataFrame of
                                     asset closing prices.
        study_config (Dict[str, Any]): The configuration dictionary defining
                                       all parameters for the study.

    Returns:
        Dict[str, Any]: A comprehensive dictionary containing all artifacts
                        generated during the analysis, including processed data,
                        statistical results, static and dynamic matrices,
                        crisis analysis, interpretations, and figures.
    """
    # Initialize the main dictionary to store all results.
    results_bundle = {}
    print("="*80)
    print("STARTING FULL ANALYSIS PIPELINE")
    print("="*80)

    try:
        # --- Task 1: Parameter and Data Validation ---
        validated_config = validate_inputs(raw_price_df, study_config)
        results_bundle['validated_config'] = validated_config

        # --- Task 2: Data Preprocessing ---
        log_return_df, preprocessing_metadata = preprocess_price_data(
            raw_price_df, validated_config
        )
        results_bundle['preprocessed_data'] = {
            'log_returns': log_return_df,
            'metadata': preprocessing_metadata
        }

        # --- Task 3: Stationarity Check ---
        # We need the price data aligned with the log-return data for a fair comparison.
        aligned_price_df = raw_price_df.loc[log_return_df.index]
        stationarity_results = perform_stationarity_analysis(
            aligned_price_df, log_return_df
        )
        results_bundle['stationarity_analysis'] = stationarity_results

        # --- Task 4: Descriptive Statistics ---
        stats_df, returns_fig = generate_descriptive_statistics(
            log_return_df, validated_config
        )
        results_bundle['descriptive_statistics'] = {
            'statistics_table': stats_df,
            'log_returns_figure': returns_fig
        }

        # --- Task 5 & 6: Static (Full-Period) Analysis ---
        print("\n--- Performing Static (Full-Period) Analysis ---")
        static_te_matrix = calculate_transfer_entropy(log_return_df, validated_config)
        static_km_matrix = calculate_kramers_moyal_drift_matrix(log_return_df, validated_config)
        results_bundle['static_analysis'] = {
            'transfer_entropy_matrix': static_te_matrix,
            'kramers_moyal_matrix': static_km_matrix
        }

        # --- Task 7: Time-Resolved Analysis ---
        time_resolved_results = perform_time_resolved_analysis(
            log_return_df, validated_config
        )
        results_bundle['time_resolved_analysis'] = time_resolved_results

        # --- Task 8: Crisis Period Analysis ---
        crisis_analysis_results = analyze_crisis_periods(
            time_resolved_results, validated_config
        )
        results_bundle['crisis_period_analysis'] = crisis_analysis_results

        # --- Task 9: Network Visualization (on static results for demonstration) ---
        # Visualizations are generated but stored in a dedicated key.
        print("\n--- Generating Visualizations for Static Analysis ---")
        static_te_heatmap = plot_matrix_heatmap(
            static_te_matrix, "Static Transfer Entropy", validated_config, is_diverging=False
        )
        static_km_heatmap = plot_matrix_heatmap(
            static_km_matrix, "Static Kramers-Moyal Drift", validated_config, is_diverging=True
        )
        static_te_graph = plot_network_graph(
            static_te_matrix, "Static TE Network (Top 25% Links)", validated_config
        )
        static_km_graph = plot_network_graph(
            static_km_matrix, "Static KM Network (Top 25% Links)", validated_config
        )
        results_bundle['visualizations'] = {
            'static_te_heatmap': static_te_heatmap,
            'static_km_heatmap': static_km_heatmap,
            'static_te_graph': static_te_graph,
            'static_km_graph': static_km_graph
        }

        # --- Task 10: Results Interpretation ---
        interpretation_results = interpret_dynamic_results(time_resolved_results)
        results_bundle['interpretation'] = interpretation_results

        print("\n" + "="*80)
        print("FULL ANALYSIS PIPELINE COMPLETED SUCCESSFULLY")
        print("="*80)

    except Exception as e:
        # Catch any exception from the pipeline, print it, and re-raise.
        print("\n" + "!"*80)
        print(f"AN ERROR OCCURRED IN THE ANALYSIS PIPELINE: {e}")
        print("!"*80)
        # Re-raise the exception to halt execution and allow for debugging.
        raise

    # Return the comprehensive bundle of all results.
    return results_bundle

def perform_robustness_analysis(
    raw_price_df: pd.DataFrame,
    base_config: Dict[str, Any],
    param_grid: Dict[str, List[Any]]
) -> Dict[str, Any]:
    """
    Performs a multi-faceted robustness analysis by running the full pipeline
    across a grid of hyperparameters.

    This high-level function assesses the stability of key analytical findings,
    including both persistent network links and detected regime shifts. It
    systematically varies crucial parameters, quantifies how often each key
    finding appears, and reports a "Robustness Score" for each.

    Args:
        raw_price_df (pd.DataFrame): The raw, datetime-indexed DataFrame of
                                     asset closing prices.
        base_config (Dict[str, Any]): The baseline configuration dictionary.
        param_grid (Dict[str, List[Any]]): A dictionary where keys are parameter
            paths (e.g., 'analysis_params.information_theory.discretization_bins')
            and values are lists of values to test for that parameter.

    Returns:
        Dict[str, Any]: A dictionary containing the robustness summary, including
                        DataFrames that list persistent links and regime shifts
                        and the percentage of runs in which they were identified.
    """
    print("="*80)
    print("STARTING COMPREHENSIVE ROBUSTNESS ANALYSIS")
    print("WARNING: This process is computationally intensive and may take a long time.")
    print("="*80)

    # --- Step 1: Generate all parameter combinations ---
    keys, values = zip(*param_grid.items())
    run_combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]
    num_runs = len(run_combinations)
    print(f"Will perform {num_runs} full analysis runs with different parameter configurations.")

    # Initialize lists to store the key findings from each run.
    all_persistent_te_links = []
    all_persistent_km_links = []
    all_te_regime_shifts = []
    all_km_regime_shifts = []

    # --- Step 2: Iterate through combinations and run the pipeline ---
    for i, params in enumerate(run_combinations):
        print(f"\n--- Running Analysis {i+1}/{num_runs} with params: {params} ---")
        run_config = deepcopy(base_config)

        for key_path, value in params.items():
            keys_list = key_path.split('.')
            d = run_config
            for key in keys_list[:-1]:
                d = d[key]
            d[keys_list[-1]] = value

        try:
            results_bundle = run_full_analysis_pipeline(raw_price_df, run_config)

            # --- Step 1 (cont.): Update Data Aggregation Logic ---
            # Extract persistent links
            te_links = results_bundle['interpretation']['transfer_entropy']['persistent_links']
            km_links = results_bundle['interpretation']['kramers_moyal']['persistent_links']
            all_persistent_te_links.extend(list(te_links.index))
            all_persistent_km_links.extend(list(km_links.index))

            # Extract and flatten regime shifts
            te_shifts = results_bundle['interpretation']['transfer_entropy']['regime_shifts']
            for ts, links in te_shifts.items():
                for link_tuple in links:
                    # link_tuple is ('analysis', 'target', 'source')
                    all_te_regime_shifts.append((ts, link_tuple[1], link_tuple[2]))

            km_shifts = results_bundle['interpretation']['kramers_moyal']['regime_shifts']
            for ts, links in km_shifts.items():
                for link_tuple in links:
                    all_km_regime_shifts.append((ts, link_tuple[1], link_tuple[2]))

        except Exception as e:
            print(f"  WARNING: Run {i+1} failed with error: {e}. Skipping this configuration.")
            continue

    # --- Step 3: Aggregate and Summarize Robustness ---
    print("\n--- Aggregating Robustness Results ---")

    # Helper function to create robustness summary DataFrames.
    def create_robustness_df(link_counts: Counter, total_runs: int, index_names: List[str]) -> pd.DataFrame:
        if not link_counts:
            return pd.DataFrame(columns=['Count', 'Robustness Score (%)'])

        df = pd.DataFrame.from_dict(link_counts, orient='index', columns=['Count'])
        df['Robustness Score (%)'] = (df['Count'] / total_runs) * 100
        df.index = pd.MultiIndex.from_tuples(df.index, names=index_names)
        return df.sort_values(by='Robustness Score (%)', ascending=False)

    # Process persistent links
    te_persistent_counts = Counter(all_persistent_te_links)
    km_persistent_counts = Counter(all_persistent_km_links)
    te_persistent_robustness_df = create_robustness_df(te_persistent_counts, num_runs, ['analysis', 'target', 'source'])
    km_persistent_robustness_df = create_robustness_df(km_persistent_counts, num_runs, ['analysis', 'target', 'source'])

    # Process regime shifts
    te_shift_counts = Counter(all_te_regime_shifts)
    km_shift_counts = Counter(all_km_regime_shifts)
    te_shift_robustness_df = create_robustness_df(te_shift_counts, num_runs, ['timestamp', 'target', 'source'])
    km_shift_robustness_df = create_robustness_df(km_shift_counts, num_runs, ['timestamp', 'target', 'source'])

    # Assemble the final, comprehensive robustness summary.
    robustness_summary = {
        'run_configurations': run_combinations,
        'total_runs': num_runs,
        'persistent_links': {
            'transfer_entropy': te_persistent_robustness_df,
            'kramers_moyal': km_persistent_robustness_df
        },
        'regime_shifts': {
            'transfer_entropy': te_shift_robustness_df,
            'kramers_moyal': km_shift_robustness_df
        }
    }

    print("\n" + "="*80)
    print("ROBUSTNESS ANALYSIS COMPLETE")
    print("="*80)

    return robustness_summary


In [None]:
# Task 12: Error Analysis

def _block_bootstrap_generator(
    df: pd.DataFrame,
    num_samples: int
) -> Generator[pd.DataFrame, None, None]:
    """
    A memory-efficient generator for block bootstrap resampling of time series.

    This generator creates bootstrap samples by sampling blocks of consecutive
    rows with replacement. This method preserves the autocorrelation structure
    inherent in time series data, which would be destroyed by simple i.i.d.
    resampling.

    Args:
        df (pd.DataFrame): The time series DataFrame to be resampled.
        num_samples (int): The total number of bootstrap samples to generate.

    Yields:
        Generator[pd.DataFrame, None, None]: A generator that yields a new
            bootstrapped DataFrame in each iteration.
    """
    # Get the total number of observations and define the block size.
    n_obs = len(df)
    # A common heuristic for block size is the square root of the series length.
    block_size = int(np.sqrt(n_obs))

    # Calculate the number of blocks needed to approximate the original length.
    num_blocks = int(np.ceil(n_obs / block_size))
    # Define the possible starting indices for the blocks.
    possible_starts = np.arange(0, n_obs - block_size + 1)

    # Generate the requested number of bootstrap samples.
    for _ in range(num_samples):
        # Randomly sample the starting indices of the blocks with replacement.
        block_start_indices = np.random.choice(possible_starts, size=num_blocks, replace=True)

        # Create the new series by concatenating the chosen blocks.
        # This is done efficiently using a list comprehension and pd.concat.
        bootstrap_sample_df = pd.concat(
            [df.iloc[i : i + block_size] for i in block_start_indices],
            axis=0
        )

        # Truncate the sample to the original length and reset the index.
        # The index is reset because the concatenated index is not meaningful.
        yield bootstrap_sample_df.iloc[:n_obs].reset_index(drop=True)

def perform_error_analysis(
    log_return_df: pd.DataFrame,
    validated_config: Dict[str, Any],
    num_samples: int = 500,
    alpha: float = 0.05
) -> Dict[str, Any]:
    """
    Performs error analysis using block bootstrapping to estimate confidence intervals.

    This function quantifies the uncertainty around the static TE and KM matrix
    estimates. It generates numerous resampled time series using a block
    bootstrap method (to preserve autocorrelation) and re-calculates the
    matrices for each sample. The distribution of these bootstrapped results
    is then used to construct confidence intervals for each coefficient.

    Args:
        log_return_df (pd.DataFrame): The clean, stationary log-return DataFrame.
        validated_config (Dict[str, Any]): The validated configuration dictionary.
        num_samples (int): The number of bootstrap samples to generate.
                           Warning: High numbers are computationally expensive.
        alpha (float): The significance level. E.g., 0.05 for a 95% CI.

    Returns:
        Dict[str, Any]: A dictionary containing the error analysis results, with
                        keys 'transfer_entropy' and 'kramers_moyal'. Each value
                        is another dictionary containing:
                        - 'point_estimate': The matrix calculated on original data.
                        - 'lower_ci': The matrix of lower bounds of the CI.
                        - 'upper_ci': The matrix of upper bounds of the CI.
    """
    print("="*80)
    print("STARTING ERROR ANALYSIS (BOOTSTRAPPING)")
    print(f"WARNING: This is computationally intensive. Running {num_samples} bootstrap samples.")
    print("="*80)

    # --- Calculate the original point estimates first ---
    print("Calculating original point estimates...")
    point_estimate_te = calculate_transfer_entropy(log_return_df, validated_config)
    point_estimate_km = calculate_kramers_moyal_drift_matrix(log_return_df, validated_config)

    # Initialize lists to store the results from each bootstrap iteration.
    bootstrap_te_results = []
    bootstrap_km_results = []

    # --- Create and iterate through the bootstrap samples ---
    bootstrap_generator = _block_bootstrap_generator(log_return_df, num_samples)

    for i, resampled_df in tqdm(enumerate(bootstrap_generator), total=num_samples, desc="Bootstrap Iterations"):
        # The resampled df has a generic integer index, which is fine for the
        # calculation functions as they don't rely on the datetime values.

        # --- Calculate TE and KM on the resampled data ---
        try:
            # Calculate TE matrix for the current bootstrap sample.
            te_matrix = calculate_transfer_entropy(resampled_df, validated_config)
            bootstrap_te_results.append(te_matrix.values)
        except Exception as e:
            # If a calculation fails, print a warning and skip this iteration.
            print(f"\nWarning: TE calculation failed on bootstrap sample {i+1}: {e}")

        try:
            # Calculate KM matrix for the current bootstrap sample.
            km_matrix = calculate_kramers_moyal_drift_matrix(resampled_df, validated_config)
            bootstrap_km_results.append(km_matrix.values)
        except Exception as e:
            print(f"\nWarning: KM calculation failed on bootstrap sample {i+1}: {e}")

    # --- Calculate Confidence Intervals from the bootstrap distributions ---
    print("\nCalculating confidence intervals from bootstrap distributions...")

    # Define the lower and upper percentile bounds for the CI.
    lower_percentile = (alpha / 2.0) * 100
    upper_percentile = (1.0 - (alpha / 2.0)) * 100

    # Get asset names for labeling the final DataFrames.
    asset_names = log_return_df.columns

    # --- Process TE results ---
    te_stack = np.stack(bootstrap_te_results, axis=0)
    te_lower_ci = np.percentile(te_stack, lower_percentile, axis=0)
    te_upper_ci = np.percentile(te_stack, upper_percentile, axis=0)

    # --- Process KM results ---
    km_stack = np.stack(bootstrap_km_results, axis=0)
    km_lower_ci = np.percentile(km_stack, lower_percentile, axis=0)
    km_upper_ci = np.percentile(km_stack, upper_percentile, axis=0)

    # --- Assemble the final results dictionary ---
    error_analysis_results = {
        'transfer_entropy': {
            'point_estimate': point_estimate_te,
            'lower_ci': pd.DataFrame(te_lower_ci, index=asset_names, columns=asset_names),
            'upper_ci': pd.DataFrame(te_upper_ci, index=asset_names, columns=asset_names)
        },
        'kramers_moyal': {
            'point_estimate': point_estimate_km,
            'lower_ci': pd.DataFrame(km_lower_ci, index=asset_names, columns=asset_names),
            'upper_ci': pd.DataFrame(km_upper_ci, index=asset_names, columns=asset_names)
        }
    }

    print("\n" + "="*80)
    print("ERROR ANALYSIS COMPLETE")
    print("="*80)

    return error_analysis_results


In [None]:
# Master Pipeline

def _generate_report_string(master_results: Dict[str, Any]) -> str:
    """
    Generates a comprehensive, human-readable report from the master results bundle.

    This helper function synthesizes the outputs of the entire analysis into a
    structured Markdown string. This revised version includes new sections for
    the regime shift analysis, providing a complete overview of both stable
    (persistent) and unstable (shifting) network dynamics.

    Args:
        master_results (Dict[str, Any]): The master dictionary containing all
                                         analysis results.

    Returns:
        str: A formatted Markdown string summarizing the entire analysis.
    """
    # --- Report Header ---
    report_parts = [
        "# Digital Twin of Market Dynamics: Analysis Report",
        "This report summarizes the findings from the end-to-end analysis pipeline."
    ]

    # --- Methodology Section ---
    config = master_results['main_analysis']['validated_config']
    report_parts.append("\n## 1. Methodology and Configuration")
    report_parts.append(f"- **Date Range:** {config['data_params']['date_range']['start'].date()} to {config['data_params']['date_range']['end'].date()}")
    report_parts.append(f"- **Sliding Window:** Size = {config['analysis_params']['sliding_window']['window_size_days']} days, Step = {config['analysis_params']['sliding_window']['step_size_days']} days")
    report_parts.append(f"- **Transfer Entropy:** Bins = {config['analysis_params']['information_theory']['discretization_bins']}")
    report_parts.append(f"- **Preprocessing:** {master_results['main_analysis']['preprocessed_data']['metadata']['final_observations']} observations used after cleaning.")

    # --- Key Findings Section ---
    report_parts.append("\n## 2. Key Findings: Dynamic Interpretation Summary")

    # --- 2.1 Persistent Interactions ---
    report_parts.append("\n### 2.1. Persistent Transfer Entropy Links")
    persistent_te = master_results['main_analysis']['interpretation']['transfer_entropy']['persistent_links']
    if not persistent_te.empty:
        report_parts.append(persistent_te.to_markdown(floatfmt=".4f"))
    else:
        report_parts.append("No persistent TE links were identified based on the specified criteria.")

    report_parts.append("\n### 2.2. Persistent Kramers-Moyal Links")
    persistent_km = master_results['main_analysis']['interpretation']['kramers_moyal']['persistent_links']
    if not persistent_km.empty:
        report_parts.append(persistent_km.to_markdown(floatfmt=".4f"))
    else:
        report_parts.append("No persistent KM links were identified based on the specified criteria.")

    # --- 2.3 Regime Shift Events ---
    report_parts.append("\n### 2.3. Notable Regime Shift Events")
    report_parts.append("The following dates were identified as having the largest number of simultaneous link shifts, indicating potential market-wide structural breaks.")

    # Helper to format the shift summary
    def format_shift_summary(shifts_dict: Dict, top_n: int = 5) -> str:
        if not shifts_dict:
            return "No significant regime shifts were detected."
        # Sort events by the number of links that shifted
        sorted_events = sorted(shifts_dict.items(), key=lambda item: len(item[1]), reverse=True)
        summary_lines = []
        for ts, links in sorted_events[:top_n]:
            # Format links for readability: e.g., (TE, Gold, Nasdaq) -> "Gold -> Nasdaq"
            formatted_links = [f"{link[2]} -> {link[1]}" for link in links]
            summary_lines.append(f"- **{ts.date()}** ({len(links)} links shifted): {', '.join(formatted_links)}")
        return "\n".join(summary_lines)

    te_shifts = master_results['main_analysis']['interpretation']['transfer_entropy']['regime_shifts']
    report_parts.append("\n**Top Transfer Entropy Shift Events:**")
    report_parts.append(format_shift_summary(te_shifts))

    km_shifts = master_results['main_analysis']['interpretation']['kramers_moyal']['regime_shifts']
    report_parts.append("\n**Top Kramers-Moyal Shift Events:**")
    report_parts.append(format_shift_summary(km_shifts))

    # --- Detailed Tables Section ---
    report_parts.append("\n## 3. Detailed Results Tables")
    report_parts.append("\n### 3.1. Descriptive Statistics (Log-Returns)")
    report_parts.append(master_results['main_analysis']['descriptive_statistics']['statistics_table'].to_markdown(floatfmt=".4f"))
    report_parts.append("\n### 3.2. Stationarity Analysis (ADF Test)")
    report_parts.append(master_results['main_analysis']['stationarity_analysis'].to_markdown(floatfmt=".4f"))

    # --- Robustness Analysis Section ---
    if 'robustness_analysis' in master_results:
        report_parts.append("\n## 4. Robustness Analysis Summary")

        report_parts.append("\n### 4.1. Robust Persistent Links (TE)")
        report_parts.append(master_results['robustness_analysis']['persistent_links']['transfer_entropy'].head(10).to_markdown(floatfmt=".2f"))

        report_parts.append("\n### 4.2. Robust Regime Shifts (TE)")
        report_parts.append("The following shift events were most consistently detected across different parameter settings.")
        report_parts.append(master_results['robustness_analysis']['regime_shifts']['transfer_entropy'].head(10).to_markdown(floatfmt=".2f"))

    # --- Figures Section ---
    report_parts.append("\n## 5. Generated Figures")
    report_parts.append("The following figures are available in the 'main_analysis' -> 'visualizations' key of the results bundle.")
    for key in master_results['main_analysis']['visualizations'].keys():
        report_parts.append(f"- `{key}`")

    return "\n".join(report_parts)

def _figure_to_base64_str(fig: plt.Figure) -> str:
    """
    Converts a matplotlib Figure object into a Base64-encoded data URI string.

    This utility function renders a figure into an in-memory binary buffer,
    encodes the binary data into Base64, and formats it as a data URI
    that can be directly embedded into an HTML <img> tag.

    Args:
        fig (plt.Figure): The matplotlib Figure object to convert.

    Returns:
        str: A data URI string for the figure (e.g., "data:image/png;base64,...").
    """
    # --- Step 1: Render to an In-Memory Buffer ---
    # Create a binary buffer in memory to hold the image data.
    buf = io.BytesIO()
    # Save the figure to the buffer in PNG format.
    # Using a moderate DPI is good for web display without creating huge files.
    fig.savefig(buf, format='png', dpi=150, bbox_inches='tight')
    # The buffer's cursor is at the end, so we need to seek back to the start.
    buf.seek(0)

    # --- Step 2: Encode to Base64 ---
    # Read the binary data from the buffer.
    image_bytes = buf.read()
    # Encode the binary data using the base64 standard library.
    encoded_bytes = base64.b64encode(image_bytes)
    # Close the buffer to free memory.
    buf.close()

    # --- Step 3: Decode to a String ---
    # Decode the Base64 bytes into a standard UTF-8 string.
    encoded_string = encoded_bytes.decode('utf-8')

    # --- Step 4: Return the Data URI ---
    # Format the final data URI string for use in an HTML src attribute.
    return f"data:image/png;base64,{encoded_string}"

def _generate_html_report(master_results: Dict[str, Any]) -> str:
    """
    Generates a comprehensive, self-contained, and robust HTML report.

    This function synthesizes all analytical outputs into a single, dynamic HTML
    file. This revised version includes robust checks for missing figures,
    ensuring that the report generation never fails even if an upstream
    visualization step encounters an error.

    Args:
        master_results (Dict[str, Any]): The master dictionary containing all
                                         analysis results.

    Returns:
        str: A string containing the full HTML source code of the report.
    """
    # Initialize a list to hold the parts of the HTML document.
    html_parts = []

    # --- Step 1: Define HTML Structure and CSS Styling ---
    # This CSS provides a clean, professional, and readable style for the report.
    html_css = """
    <style>
        body { font-family: 'Helvetica Neue', Helvetica, Arial, sans-serif; line-height: 1.6; color: #333; margin: 2em; max-width: 1200px; margin-left: auto; margin-right: auto; }
        h1, h2, h3, h4 { color: #2c3e50; border-bottom: 2px solid #ecf0f1; padding-bottom: 10px; }
        h1 { font-size: 2.5em; text-align: center; }
        h2 { font-size: 2em; }
        h3 { font-size: 1.5em; }
        p.error-msg { color: #c0392b; font-style: italic; }
        table.styled-table { border-collapse: collapse; margin: 25px 0; font-size: 0.9em; width: 100%; box-shadow: 0 0 20px rgba(0, 0, 0, 0.15); }
        table.styled-table thead tr { background-color: #009879; color: #ffffff; text-align: left; }
        table.styled-table th, table.styled-table td { padding: 12px 15px; }
        table.styled-table tbody tr { border-bottom: 1px solid #dddddd; }
        table.styled-table tbody tr:nth-of-type(even) { background-color: #f3f3f3; }
        table.styled-table tbody tr:last-of-type { border-bottom: 2px solid #009879; }
        img { max-width: 100%; height: auto; display: block; margin-left: auto; margin-right: auto; border: 1px solid #ddd; border-radius: 4px; padding: 5px; margin-top: 20px; margin-bottom: 20px; }
        details { border: 1px solid #aaa; border-radius: 4px; padding: .5em .5em 0; margin-top: 1em; }
        summary { font-weight: bold; margin: -.5em -.5em 0; padding: .5em; cursor: pointer; background-color: #f9f9f9; }
        details[open] { padding: .5em; background-color: #fff; }
        details[open] summary { border-bottom: 1px solid #aaa; margin-bottom: .5em; }
    </style>
    """
    # HTML boilerplate
    html_parts.append("<!DOCTYPE html><html lang='en'><head><meta charset='UTF-8'><title>Digital Twin Analysis Report</title>")
    html_parts.append(html_css)
    html_parts.append("</head><body>")
    html_parts.append("<h1>Digital Twin of Market Dynamics: Analysis Report</h1>")

    # --- Step 2: Generate Methodology and Table Sections ---
    config = master_results['main_analysis']['validated_config']
    html_parts.append("<h2>1. Methodology and Configuration</h2>")
    html_parts.append(f"<ul><li><b>Date Range:</b> {config['data_params']['date_range']['start'].date()} to {config['data_params']['date_range']['end'].date()}</li>")
    html_parts.append(f"<li><b>Sliding Window:</b> Size = {config['analysis_params']['sliding_window']['window_size_days']} days, Step = {config['analysis_params']['sliding_window']['step_size_days']} days</li>")
    html_parts.append(f"<li><b>Transfer Entropy:</b> Bins = {config['analysis_params']['information_theory']['discretization_bins']}</li></ul>")

    html_parts.append("<h2>2. Data Diagnostics</h2>")
    html_parts.append("<h3>Descriptive Statistics (Log-Returns)</h3>")
    stats_table = master_results['main_analysis']['descriptive_statistics']['statistics_table']
    html_parts.append(stats_table.to_html(classes='styled-table', float_format='{:.4f}'.format))

    html_parts.append("<h3>Stationarity Analysis (ADF Test)</h3>")
    stationarity_table = master_results['main_analysis']['stationarity_analysis']
    html_parts.append(stationarity_table.to_html(classes='styled-table', float_format='{:.4f}'.format))

    # --- Step 3: Generate and Embed Figures with Robust Checks ---
    html_parts.append("<h2>3. Visualizations</h2>")
    visualizations = master_results['main_analysis'].get('visualizations', {})

    # Helper function to safely embed a figure or report an error.
    def embed_figure(fig_key: str, title: str):
        # Use .get() for safe access to the figure object.
        figure_object = visualizations.get(fig_key)
        # Check if the figure object exists and is not None.
        if figure_object is not None:
            # If it exists, embed it.
            html_parts.append(f"<h4>{title}</h4>")
            html_parts.append(f'<img src="{_figure_to_base64_str(figure_object)}" alt="{title}">')
            # Close the figure to release memory, a critical best practice.
            plt.close(figure_object)
        else:
            # If it doesn't exist, report its absence gracefully.
            html_parts.append(f"<h4>{title}</h4>")
            html_parts.append(f'<p class="error-msg"><i>[Figure: {title}] could not be generated due to an error in the visualization stage.</i></p>')

    # Embed all key figures using the safe helper function.
    embed_figure('log_returns_figure', 'Log-Return Time Series')
    embed_figure('static_te_heatmap', 'Static Transfer Entropy Heatmap')
    embed_figure('static_km_heatmap', 'Static Kramers-Moyal Drift Heatmap')
    embed_figure('static_te_graph', 'Static TE Network (Top 25% Links)')
    embed_figure('static_km_graph', 'Static KM Network (Top 25% Links)')

    # --- Step 4: Add Interactivity for Detailed Tables ---
    html_parts.append("<h2>4. Dynamic Analysis Summaries</h2>")

    # Use .get() for safe access to nested dictionaries.
    interpretation = master_results['main_analysis'].get('interpretation', {})

    html_parts.append("<details><summary>Click to view Persistent Interaction Analysis</summary>")
    persistent_te = interpretation.get('transfer_entropy', {}).get('persistent_links')
    html_parts.append("<h4>Persistent Transfer Entropy Links</h4>")
    if persistent_te is not None and not persistent_te.empty:
        html_parts.append(persistent_te.to_html(classes='styled-table', float_format='{:.4f}'.format))
    else:
        html_parts.append("<p>No persistent TE links were identified.</p>")
    html_parts.append("</details>")

    # --- Step 5: Assemble and Return ---
    html_parts.append("</body></html>")
    return "".join(html_parts)

def run_master_pipeline(
    raw_price_df: pd.DataFrame,
    study_config: Dict[str, Any],
    param_grid: Dict[str, List[Any]],
    report_format: str = 'html',
    run_robustness_analysis: bool = True,
    run_error_analysis: bool = True,
    bootstrap_samples: int = 100
) -> Tuple[Dict[str, Any], Optional[str]]:
    """
    The master orchestrator for the end-to-end Digital Twin analysis pipeline.

    This function serves as the single entry point for a complete analysis. It
    sequentially executes all required analytical stages, from data validation
    and preprocessing to dynamic analysis, interpretation, robustness checks,
    and error analysis. Finally, it compiles all generated artifacts into a
    comprehensive results bundle and synthesizes the findings into a user-
    specified report format.

    Args:
        raw_price_df (pd.DataFrame): The raw, datetime-indexed DataFrame of
                                     asset closing prices.
        study_config (Dict[str, Any]): The baseline configuration dictionary.
        param_grid (Dict[str, List[Any]]): The parameter grid for the
                                           robustness analysis.
        report_format (str): The desired format for the output report.
                             Options: 'html', 'markdown', 'none'.
                             Defaults to 'html'.
        run_robustness_analysis (bool): Flag to enable/disable the computationally
                                        expensive robustness analysis.
        run_error_analysis (bool): Flag to enable/disable the computationally
                                   expensive error analysis.
        bootstrap_samples (int): The number of bootstrap samples for error analysis.

    Returns:
        Tuple[Dict[str, Any], Optional[str]]:
        - A master dictionary containing the complete results of all analyses.
        - A formatted report string in the specified format, or None if
          report_format is 'none'.

    Raises:
        ValueError: If an unsupported report_format is provided.
    """
    # --- Input Validation for this function's parameters ---
    # Define the supported report formats.
    supported_formats = ['html', 'markdown', 'none']
    # Check if the user-provided format is valid.
    if report_format not in supported_formats:
        raise ValueError(f"Unsupported report_format '{report_format}'. "
                         f"Please choose from {supported_formats}.")

    # Initialize the master dictionary to hold all results.
    master_results_bundle = {}
    print("="*80)
    print("STARTING MASTER ANALYSIS PIPELINE")
    print("="*80)

    try:
        # --- Execute the Main Analysis Pipeline ---
        # This is the core sequence of analysis steps.
        main_analysis_results = run_full_analysis_pipeline(raw_price_df, study_config)
        master_results_bundle['main_analysis'] = main_analysis_results

        # Extract key intermediate results needed for other pipelines.
        log_return_df = main_analysis_results['preprocessed_data']['log_returns']
        validated_config = main_analysis_results['validated_config']

        # --- Conditionally Execute Robustness Analysis ---
        if run_robustness_analysis:
            robustness_results = perform_robustness_analysis(
                raw_price_df, study_config, param_grid
            )
            master_results_bundle['robustness_analysis'] = robustness_results
        else:
            print("\nSkipping robustness analysis as per configuration.")

        # --- Conditionally Execute Error Analysis ---
        if run_error_analysis:
            error_analysis_results = perform_error_analysis(
                log_return_df, validated_config, num_samples=bootstrap_samples
            )
            master_results_bundle['error_analysis'] = error_analysis_results
        else:
            print("\nSkipping error analysis as per configuration.")

        # --- Conditionally Generate the Final Report ---
        # Initialize the report string as None.
        report_output = None
        # Check the user's desired format and call the appropriate generator.
        if report_format == 'html':
            print("\n--- Generating Final HTML Report ---")
            report_output = _generate_html_report(master_results_bundle)
            print("HTML report generation complete.")
        elif report_format == 'markdown':
            print("\n--- Generating Final Markdown Report ---")
            report_output = _generate_report_string(master_results_bundle)
            print("Markdown report generation complete.")
        else: # report_format == 'none'
            print("\nSkipping report generation as per configuration.")

        print("\n" + "="*80)
        print("MASTER ANALYSIS PIPELINE COMPLETED SUCCESSFULLY")
        print("="*80)

    except Exception as e:
        # Catch any exception from the pipeline, print it, and re-raise.
        print("\n" + "!"*80)
        print(f"A CRITICAL ERROR OCCURRED IN THE MASTER PIPELINE: {e}")
        print("!"*80)
        # Re-raise the exception to halt execution and allow for debugging.
        raise

    # --- Return the final artifacts ---
    # Return the comprehensive bundle and the generated report (or None).
    return master_results_bundle, report_output

