# `README.md`


# Regime Changes and Real-Financial Cycles: Searching Minsky's Hypothesis in a Nonlinear Setting

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2511.04348-b31b1b.svg)](https://arxiv.org/abs/2511.04348)
[![Journal](https://img.shields.io/badge/Journal-arXiv%20Preprint-003366)](https://arxiv.org/abs/2511.04348)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/regime_changes_real_financial_cycles)
[![Discipline](https://img.shields.io/badge/Discipline-Macroeconomics%20%7C%20Financial%20Economics-00529B)](https://github.com/chirindaopensource/regime_changes_real_financial_cycles)
[![Data Sources](https://img.shields.io/badge/Data-OECD%20Statistics-lightgrey)](https://stats.oecd.org/)
[![Data Sources](https://img.shields.io/badge/Data-BIS%20Credit%20Statistics-lightgrey)](https://www.bis.org/statistics/totcredit.htm)
[![Data Sources](https://img.shields.io/badge/Data-FRED%20Economic%20Data-lightgrey)](https://fred.stlouisfed.org/)
[![Core Method](https://img.shields.io/badge/Method-MS--VAR%20%7C%20EM%20Algorithm%20%7C%20HP%20Filter-orange)](https://github.com/chirindaopensource/regime_changes_real_financial_cycles)
[![Analysis](https://img.shields.io/badge/Analysis-Nonlinear%20Dynamics%20%7C%20Minsky%20Cycles-red)](https://github.com/chirindaopensource/regime_changes_real_financial_cycles)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![Matplotlib](https://img.shields.io/badge/matplotlib-%2311557c.svg?style=flat&logo=matplotlib&logoColor=white)](https://matplotlib.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%230C55A5.svg?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![Statsmodels](https://img.shields.io/badge/statsmodels-blue.svg)](https://www.statsmodels.org/)
[![PyYAML](https://img.shields.io/badge/PyYAML-gray?logo=yaml&logoColor=white)](https://pyyaml.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)

**Repository:** `https://github.com/chirindaopensource/regime_changes_real_financial_cycles`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Regime Changes and Real-Financial Cycles: Searching Minsky's Hypothesis in a Nonlinear Setting"** by:

*   Domenico Delli Gatti
*   Filippo Gusella
*   Giorgio Ricchiuti

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from rigorous data validation and spectral decomposition to the core nonlinear Markov-Switching Vector Autoregression (MS-VAR) estimation, Minsky regime classification, and Monte Carlo robustness analysis.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `execute_minsky_research_project`](#key-callable-execute_minsky_research_project)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the analytical framework presented in Delli Gatti, Gusella, and Ricchiuti (2025). The core of this repository is the iPython Notebook `regime_changes_real_financial_cycles_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings. The pipeline is designed to be a generalizable toolkit for detecting endogenous financial fragility and nonlinear regime shifts in macroeconomic data.

The paper investigates Minsky's Financial Instability Hypothesis (FIH) by extending linear models to a nonlinear setting. It captures local real-financial endogenous cycles where stability breeds instability. This codebase operationalizes the paper's framework, allowing users to:
-   Rigorously validate and manage the entire experimental configuration via a single `config.yaml` file.
-   Process raw macroeconomic time series, applying Hodrick-Prescott (HP) filtering to extract cyclical components.
-   Estimate a bivariate **Markov-Switching Vector Autoregression (MS-VAR)** model to distinguish between "Minsky" (interaction) and "No Minsky" (independence) regimes.
-   Empirically verify the mathematical conditions for endogenous oscillations (complex eigenvalues) and Minskyan feedback loops.
-   Trace the evolution of financial fragility over time using filtered and smoothed regime probabilities.
-   Validate the robustness of findings via parametric bootstrap Monte Carlo simulations.
-   Automatically generate all key tables and figures from the paper.

## Theoretical Background

The implemented methods are grounded in nonlinear dynamic macroeconomic modeling and time series econometrics.

**1. The Minsky Cycle Mechanism:**
The core hypothesis is that financial fragility builds endogenously during expansions. This is captured by a dynamic interaction between a real variable ($y_t$, e.g., GDP) and a financial variable ($f_t$, e.g., debt).
$$
\begin{bmatrix} y_t \\ f_t \end{bmatrix} = \mathbf{A}(s_t) \begin{bmatrix} y_{t-1} \\ f_{t-1} \end{bmatrix} + \boldsymbol{\epsilon}_t
$$
A "Minsky Regime" ($s_t=1$) is characterized by a specific sign pattern in the interaction matrix $\mathbf{A}_1$:
-   $\beta_1 > 0$: Economic expansion leads to rising debt/interest rates (pro-cyclical leverage).
-   $\alpha_2 < 0$: Rising financial fragility drags down real activity.
-   **Oscillation Condition:** The discriminant $\Delta = (\alpha_1 - \beta_2)^2 + 4\alpha_2\beta_1 < 0$, implying complex eigenvalues and endogenous cycles.

**2. Regime Switching:**
The economy transitions between the interaction regime and an independence regime ($s_t=2$) where real and financial variables decouple ($\mathbf{A}_2$ is diagonal). This switching is governed by a first-order Markov chain with transition probabilities $p_{ij}$.

**3. Estimation Strategy:**
The model is estimated using the **Expectation-Maximization (EM) Algorithm**, which iteratively maximizes the likelihood function. The **Hamilton Filter** is used for the Expectation step to infer the latent state probabilities, and the **Kim Smoother** provides the optimal inference using the full sample.

Below is a diagram which sums up the Inputs-Processes-Outputs of the proposed approach:

<div align="center">
  <img src="https://github.com/chirindaopensource/regime_changes_real_financial_cycles/blob/main/regime_changes_real_financial_cycles_inputs_processes_outputs.png" alt="Minsky MS-VAR Process Flow" width="100%">
</div>


## Features

The provided iPython Notebook (`regime_changes_real_financial_cycles_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Task Architecture:** The entire pipeline is broken down into 24 distinct, modular tasks, each with its own orchestrator function.
-   **Configuration-Driven Design:** All study parameters are managed in an external `config.yaml` file.
-   **Rigorous Data Validation:** A multi-stage validation process checks the schema, temporal consistency, and domain constraints of all input data.
-   **Advanced Signal Extraction:** Implements a sparse-matrix version of the Hodrick-Prescott filter for efficient cycle extraction.
-   **Robust Econometric Engine:** A custom EM algorithm implementation with numerical safeguards (regularization, log-sum-exp) for stable MS-VAR estimation.
-   **Comprehensive Diagnostics:** Includes Ljung-Box residual tests and rigorous eigenvalue analysis for Minsky condition verification.
-   **Monte Carlo Robustness:** A fully integrated parametric bootstrap module to assess the stability of parameter estimates and regime classifications.
-   **Automated Reporting:** Generates publication-ready LaTeX tables and technical documentation.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Validation & Preprocessing (Tasks 1-4):** Ingests raw data, validates schemas, cleanses missing values via interpolation/truncation, and applies logarithmic transformations.
2.  **Signal Extraction (Task 5):** Applies the HP filter ($\lambda=100$) to extract stationary cyclical components from annual data.
3.  **Model Setup (Tasks 6-10):** Constructs bivariate systems, verifies stationarity via ADF tests, and initializes EM parameters using baseline VAR estimates.
4.  **Estimation (Tasks 11-15):** Executes the EM algorithm, utilizing the Hamilton Filter and Kim Smoother to estimate regime-dependent parameters and state probabilities.
5.  **Diagnostics & Analysis (Tasks 16-18):** Validates residuals, checks mathematical conditions for Minsky cycles (eigenvalues, signs), and analyzes regime dominance over time.
6.  **Robustness & Synthesis (Tasks 19-24):** Runs Monte Carlo simulations, cross-validates against paper benchmarks, and compiles the final report.

## Core Components (Notebook Structure)

The `regime_changes_real_financial_cycles_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the 24 major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `execute_minsky_research_project`

The project is designed around a single, top-level user-facing interface function:

-   **`execute_minsky_research_project`:** This master orchestrator function, located in the final section of the notebook, runs the entire automated research pipeline from end-to-end. A single call to this function reproduces the entire computational portion of the project.

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `pyyaml`, `scipy`, `statsmodels`, `matplotlib`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/regime_changes_real_financial_cycles.git
    cd regime_changes_real_financial_cycles
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy pyyaml scipy statsmodels matplotlib
    ```

## Input Data Structure

The pipeline requires a dictionary of DataFrames (`raw_datasets`) where keys are country names and values are DataFrames with a `DatetimeIndex` (annual frequency) and the following columns:
1.  **`real_gdp`**: Real Gross Domestic Product.
2.  **`nfcd`**: Non-financial Corporate Debt.
3.  **`household_debt`**: Household Debt.
4.  **`stir`**: Short-Term Interest Rate.

All other parameters are controlled by the `config.yaml` file.

## Usage

The `regime_changes_real_financial_cycles_draft.ipynb` notebook provides a complete, step-by-step guide. The primary workflow is to execute the final cell of the notebook, which demonstrates how to use the top-level `execute_minsky_research_project` orchestrator:

```python
# Final cell of the notebook

# This block serves as the main entry point for the entire project.
if __name__ == '__main__':
    # 1. Load the master configuration from the YAML file.
    with open('config.yaml', 'r') as f:
        study_config = yaml.safe_load(f)
    
    # 2. Load or generate raw datasets (Example using synthetic generator)
    # In production, load from CSV/Parquet: pd.read_csv(...)
    raw_datasets = load_country_data()
    
    # 3. Execute the entire replication study.
    final_results = execute_minsky_research_project(
        study_config=study_config,
        raw_datasets=raw_datasets
    )
    
    # 4. Access results
    print(final_results["master_results"]["analysis"]["minsky_conditions"])
```

## Output Structure

The pipeline returns a comprehensive dictionary containing all analytical artifacts, structured as follows:
-   **`master_results`**: The aggregated dictionary of all outputs.
    -   **`estimation`**: Contains estimated parameters ($A, \Sigma, P$) for all models.
    -   **`diagnostics`**: Ljung-Box test results.
    -   **`analysis`**: Minsky condition verification and regime probabilities.
    -   **`robustness`**: Monte Carlo simulation statistics.
    -   **`validation`**: Cross-validation report against paper benchmarks.
-   **`latex_tables`**: Ready-to-compile LaTeX code for parameter estimates and classification tables.
-   **`technical_appendix`**: A markdown summary of implementation choices.

## Project Structure

```
regime_changes_real_financial_cycles/
│
├── regime_changes_real_financial_cycles_draft.ipynb  # Main implementation notebook
├── config.yaml                                       # Master configuration file
├── requirements.txt                                  # Python package dependencies
│
├── data/                                             # Directory for raw data (optional)
│   ├── usa_macro_data.csv
│   └── ...
│
├── LICENSE                                           # MIT Project License File
└── README.md                                         # This file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can modify study parameters such as the HP filter lambda, convergence tolerances, Monte Carlo iterations, and statistical thresholds without altering the core Python code.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Alternative Filtering:** Implementing the Hamilton regression filter or Baxter-King filter as alternatives to HP.
-   **Model Selection:** Adding information criteria (AIC/BIC) to select the optimal number of regimes or lags.
-   **Time-Varying Transition Probabilities:** Extending the MS-VAR to allow transition probabilities to depend on exogenous variables (TVTP-MS-VAR).

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{delligatti2025regime,
  title={Regime Changes and Real-Financial Cycles: Searching Minsky's Hypothesis in a Nonlinear Setting},
  author={Delli Gatti, Domenico and Gusella, Filippo and Ricchiuti, Giorgio},
  journal={arXiv preprint arXiv:2511.04348},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). Regime Changes and Real-Financial Cycles: An Open Source Implementation.
GitHub repository: https://github.com/chirindaopensource/regime_changes_real_financial_cycles
```

## Acknowledgments

-   Credit to **Domenico Delli Gatti, Filippo Gusella, and Giorgio Ricchiuti** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, SciPy, Statsmodels, and Matplotlib**.

--

*This README was generated based on the structure and content of the `regime_changes_real_financial_cycles_draft.ipynb` notebook and follows best practices for research software documentation.*
```

# Paper

Title: "*Regime Changes and Real-Financial Cycles: Searching Minsky's Hypothesis in a Nonlinear Setting*"

Authors: Domenico delli Gatti, Filippo Gusella, Giorgio Ricchiuti

E-Journal Submission Date: 6 November 2025

Link: https://arxiv.org/abs/2511.04348

Abstract:

This paper investigates Minsky's cycles by extending the paper of stockhammer et al. (2019) with a nonlinear model to capture possible local real-financial endogenous cycles. We trace nonlinear regime changes and check the presence of Minsky cycles from the 1970s to 2020 for the USA, France, Germany, Canada, Australia, and the UK, linking the GDP with corporate debt, interest rate, and household debt. When considering corporate debt, the results reveal real-financial endogenous cycles in all countries, except Australia, and across all countries when interest rates are included. We find evidence for an interaction mechanism between household debt and GDP only for the USA and the UK. These findings underscore the importance of nonlinear regime transitions in empirically assessing Minsky's theory.


# Summary

### The Theoretical Premise & Problem Statement
The paper addresses a fundamental gap in the empirical validation of Hyman Minsky’s **Financial Instability Hypothesis (FIH)**.
*   **The Core Hypothesis:** Minsky argued that "stability is destabilizing." During economic expansions, agents increase leverage, transitioning from "hedge" to "speculative" and finally "Ponzi" financing positions. This endogenously creates financial fragility, leading to a crisis.
*   **The Limitation of Previous Work:** Existing empirical literature (e.g., Stockhammer et al., 2019) largely utilizes linear Vector Autoregression (VAR) models. A linear approach assumes constant parameters over time, failing to capture the *episodic* nature of financial fragility. Minskyan cycles are likely not permanent states but specific "regimes" that the economy enters and exits.
*   **The Objective:** To detect **local** endogenous real-financial cycles by employing a **Markov-Switching Vector Autoregressive (MS-VAR)** model. This allows the system to switch between a "Minsky Regime" (endogenous instability) and a "No Minsky Regime" (independence between real and financial variables).

### The Mathematical Framework


The authors construct a bivariate dynamical system linking a real variable ($y_t$, GDP) and a financial variable ($f_t$, Debt or Interest Rate).

**2.1 The Linear Condition for Cycles**
The foundational dynamics are governed by the transition matrix $A$:
$$
\begin{bmatrix} y_t \\ f_t \end{bmatrix} = A \begin{bmatrix} y_{t-1} \\ f_{t-1} \end{bmatrix}, \quad \text{where } A = \begin{bmatrix} \alpha_1 & \alpha_2 \\ \beta_1 & \beta_2 \end{bmatrix}
$$
For endogenous cycles (complex eigenvalues) to exist, the discriminant of the characteristic equation must be negative ($\Delta < 0$). Furthermore, the **Minsky condition** requires specific signs in the interaction terms:
1.  **$\beta_1 > 0$:** An increase in GDP leads to higher debt/rates (optimism breeds leverage).
2.  **$\alpha_2 < 0$:** An increase in debt/rates drags down GDP (debt service burden).

**2.2 The Nonlinear Extension (MS-VAR)**
The authors introduce a latent state variable $s_t$ governed by a first-order Markov chain. The system switches between two matrices:
*   **Regime 1 (Minsky):** Full interaction matrix allowed.
    $$ y_t = A_1(s_t)y_{t-1} + \epsilon_t $$
*   **Regime 2 (No Minsky):** A diagonal matrix where interaction terms are zero ($\alpha_2 = \beta_1 = 0$). This implies the real and financial sectors evolve independently (random walks or autoregressive processes without feedback).

### Data and Pre-processing
*   **Scope:** 1970s–2020.
*   **Countries:** USA, UK, France, Germany, Canada, and Australia.
*   **Variables:**
    *   **Real:** Real GDP ($y$).
    *   **Financial:** Non-financial Corporate Debt (NFCD), Household Debt (HD), and Short-Term Interest Rates (STIR).
*   **Frequency:** Yearly. (Note: The authors justify low frequency to avoid the noise of high-frequency data and to preserve degrees of freedom in a nonlinear estimation, though this is a constraint on the granularity of the cycle detection).
*   **Filtering:** The Hodrick-Prescott (HP) filter is applied to extract the cyclical components, rendering the series stationary before feeding them into the MS-VAR.

### Empirical Estimation Results
The authors estimate the model separately for the three financial variables.

**Corporate Debt (GDP-NFCD Interaction)**
*   **Finding:** Evidence of endogenous Minsky cycles is strong in **USA, UK, France, Germany, and Canada**.
*   **Coefficients:** In Regime 1, the coefficients $\alpha_2$ are negative and $\beta_1$ are positive (statistically significant), satisfying the mathematical condition for limit cycles.
*   **The Outlier:** **Australia** does not satisfy the condition; the interaction parameters are insignificant or have the wrong signs.
*   **Dynamics:** The "Minsky Regime" dominates in the 1970s and periods leading up to the 2001 and 2008 crises.

**Household Debt (GDP-HD Interaction)**
*   **Finding:** Results are much more heterogeneous.
*   **USA & UK:** Strong evidence of Minsky cycles. The interaction between household leverage and GDP is a key driver of instability in these financialized economies.
*   **Continental Europe & Australia:** Germany, France, Canada, and Australia show **no evidence** of Minsky cycles driven by household debt. The mathematical conditions for complex eigenvalues are not met.
*   **Implication:** The "house price-wealth effect" mechanism is institutionally specific (Anglo-Saxon model).

**Interest Rates (GDP-STIR Interaction)**
*   **Finding:** This interaction is the most robust. **All six countries** (including Australia) exhibit Regime 1 dynamics consistent with Minskyan cycles.
*   **Mechanism:** Growth triggers rate hikes (monetary tightening), which subsequently dampen growth. The authors interpret this as validating the central bank's role in generating (or reacting to) endogenous cycles.

### Regime Dynamics & Robustness
*   **Transition Matrices:**
    *   The USA and UK show high persistence in Regime 1 (Minsky), meaning once the economy enters a fragile state, it tends to stay there.
    *   France and Canada exhibit more volatility, switching frequently between regimes.
*   **Filtered Probabilities:** The plots of $P(s_t=1)$ show that the "Minsky Regime" is time-dependent. It is not a permanent state of capitalism but a phase. For example, in the US, the probability of being in the Minsky regime spiked before the 2008 crisis.
*   **Robustness:** The authors performed 100 Monte Carlo simulations to validate the small-sample properties of their estimators, confirming that the detected cycles are not artifacts of noise.

### Conclusion and Academic Contribution
The paper concludes that Minsky’s hypothesis is empirically valid but **conditional**:
1.  **Nonlinearity is key:** Linear models mask the reality that financial-real interactions are episodic.
2.  **Sector Specificity:** Corporate debt and Interest rates are universal drivers of cycles. Household debt is a driver only in specific institutional contexts (USA/UK).
3.  **Policy Implication:** Financial instability is endogenous to the system (Regime 1). Policy requires monitoring the *transition* into this regime, rather than assuming the economy is always stable or always unstable.

# Import Essential Modules

In [None]:
#!/usr/bin/env python3
# ====================================================================================================#
#
#  Regime Changes and Real-Financial Cycles: Searching Minsky's Hypothesis in a Nonlinear Setting
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "Regime Changes and Real-Financial Cycles:
#  Searching Minsky's Hypothesis in a Nonlinear Setting" by Domenico Delli Gatti,
#  Filippo Gusella, and Giorgio Ricchiuti (2025). It delivers a computationally
#  tractable system for the dynamic quantification of endogenous systemic fragility,
#  enabling robust, regime-dependent analysis of real-financial interactions
#  through a nonlinear Markov-Switching Vector Autoregressive (MS-VAR) framework.
#
#  Core Methodological Components:
#  • Bivariate Markov-Switching Vector Autoregression (MS-VAR) estimation via EM Algorithm
#  • Endogenous regime classification: "Minsky Regime" (Interaction) vs. "No Minsky Regime" (Independence)
#  • Spectral decomposition via Hodrick-Prescott (HP) filter for cyclical component extraction
#  • Eigenvalue analysis for detecting local endogenous oscillations (complex roots)
#  • Parametric bootstrap Monte Carlo simulation for robustness assessment
#  • Ljung-Box residual diagnostics for model validation
#
#  Technical Implementation Features:
#  • Hamilton Forward Filter for recursive state inference
#  • Kim Backward Smoother for optimal regime probability estimation
#  • Numerical Hessian approximation for standard error computation
#  • Sparse matrix algebra for efficient HP filtering
#  • Robust numerical optimization with regularization for covariance matrices
#  • Comprehensive visualization of regime probabilities and historical crisis markers
#
#  Paper Reference:
#  Delli Gatti, D., Gusella, F., & Ricchiuti, G. (2025). Regime Changes and Real-Financial
#  Cycles: Searching Minsky's Hypothesis in a Nonlinear Setting. arXiv preprint arXiv:2511.04348.
#  https://arxiv.org/abs/2511.04348
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ====================================================================================================#

import json
import logging
import pickle
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Set, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import sparse
from scipy.sparse.linalg import spsolve
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.stattools import adfuller

# Configure logging for the cleansing process
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# Implementation

## Draft 1

### **Discussion of Key Callables**

### **Task 1: Validate Study Configuration Object Structure**

**Callable:** `validate_study_config`

*   **Inputs:** `study_config` (Dict[str, Any]): The master configuration dictionary.
*   **Processes:**
    1.  Calls `validate_top_level_architecture` to ensure the dictionary contains exactly `input_data_architecture` and `model_physics`.
    2.  Calls `validate_input_data_architecture` to verify temporal settings (1970-2020, Annual) and dataset schemas (6 countries, 4 variables).
    3.  Calls `validate_model_physics` to enforce mathematical parameters like $\lambda=100$ for the HP filter and the structural masks for the MS-VAR regimes.
*   **Outputs:** `bool`: Returns `True` if valid; raises `ConfigurationError` otherwise.
*   **Transformation:** No data transformation; purely a validation gate.
*   **Research Role:** Implements the **Model Specification** phase. It ensures the computational environment matches the paper's setup, specifically enforcing the regime structure where Regime 1 allows interaction ($A_1$ full) and Regime 2 enforces independence ($A_2$ diagonal), as defined in Section 2.2:
    $$ \mathbf{y}_t = \begin{cases} \mathbf{A}_1 \mathbf{y}_{t-1} + \epsilon_t & \text{if } s_t = 1 \\ \mathbf{A}_2 \mathbf{y}_{t-1} + \epsilon_t & \text{if } s_t = 2 \end{cases} $$

### **Task 2: Validate Raw Datasets Dictionary Structure**

**Callable:** `validate_raw_datasets`

*   **Inputs:** `datasets` (Dict[str, pd.DataFrame]): Raw country data.
*   **Processes:**
    1.  Validates container structure and country keys via `validate_datasets_container`.
    2.  Validates temporal structure (monotonicity, equidistance) via `validate_datetime_index`.
    3.  Validates column presence and domain constraints via `validate_columns_and_domains`.
*   **Outputs:** `bool`: Returns `True` if valid; raises `DataValidationError` otherwise.
*   **Transformation:** No transformation; validation only.
*   **Research Role:** Implements **Data Integrity Verification**. It ensures the input data adheres to the requirements for time series analysis, specifically the equidistant annual frequency required for the HP filter penalty term $\lambda \sum [(\tau_{t+1} - \tau_t) - (\tau_t - \tau_{t-1})]^2$.

### **Task 3: Data Cleansing and Temporal Alignment**

**Callable:** `cleanse_and_align_datasets`

*   **Inputs:** `datasets` (Dict[str, pd.DataFrame]): Raw validated datasets.
*   **Processes:**
    1.  `handle_missing_and_invalid_values`: Interpolates isolated single-year gaps using linear interpolation: $x_t = x_{t-1} + (x_{t+1} - x_{t-1})/2$.
    2.  `enforce_temporal_alignment`: Truncates data to the maximal contiguous block where all variables are valid.
    3.  `validate_and_freeze_data`: Computes summary statistics.
*   **Outputs:** `Dict[str, Any]`: Contains cleansed DataFrames and summary stats.
*   **Transformation:** Raw time series with gaps $\rightarrow$ Continuous, aligned time series.
*   **Research Role:** Implements **Data Preprocessing**. It prepares the raw data for spectral decomposition, ensuring that the vector $\mathbf{y}_t$ is defined for a contiguous set of time points $t=1, \dots, T$, a prerequisite for VAR estimation.

### **Task 4: Apply Logarithmic Transformation to Real GDP**

**Callable:** `apply_log_transform`

*   **Inputs:** `datasets` (Dict[str, pd.DataFrame]): Cleansed datasets.
*   **Processes:**
    1.  `execute_log_transformation`: Applies natural logarithm to Real GDP: $y_t = \ln(\text{GDP}_t)$.
    2.  `document_transformation_metadata`: Records transformation details.
*   **Outputs:** `Tuple[Dict, Dict]`: Transformed datasets and metadata.
*   **Transformation:** Level GDP $\rightarrow$ Log-Level GDP.
*   **Research Role:** Implements **Variable Transformation**. As stated in Section 3, "The real variable $y$ is proxied by... real gross domestic product (GDP)... transformed into logarithmic levels." This prepares the real variable for cycle extraction.

### **Task 5: Implement Hodrick-Prescott Filter for Cycle Extraction**

**Callable:** `extract_hp_cycles`

*   **Inputs:** `datasets` (Dict[str, pd.DataFrame]), `study_config` (Dict).
*   **Processes:**
    1.  Retrieves $\lambda=100$.
    2.  `apply_hp_filter_to_datasets`: Solves the linear system $(I + \lambda D^T D) \tau = x$ using sparse matrix algebra (`hp_filter_sparse`) to separate trend $\tau$ and cycle $c = x - \tau$.
*   **Outputs:** `Dict[str, pd.DataFrame]`: Datasets enriched with cyclical components (e.g., `gdp_cycle`).
*   **Transformation:** Log-levels/Ratios $\rightarrow$ Stationary Cyclical Components.
*   **Research Role:** Implements **Spectral Decomposition**. It isolates the business cycle component $c_t$ from the secular trend, as described in Section 3: "we focus on cyclical phenomena by first extracting cycles from the time series using the Hodrick-Prescott filter."

### **Task 6: Construct Bivariate Cyclical State Vectors**

**Callable:** `construct_bivariate_systems`

*   **Inputs:** `datasets` (Dict[str, pd.DataFrame]): Datasets with cycles.
*   **Processes:**
    1.  `build_bivariate_systems`: Stacks the real cycle and financial cycle into $(T, 2)$ arrays for three specific systems: GDP/NFCD, GDP/HD, GDP/STIR.
    2.  `validate_cyclical_properties`: Checks for zero mean and finite values.
*   **Outputs:** `Dict[str, Dict[str, Any]]`: Structured dictionary of bivariate systems.
*   **Transformation:** Individual Series $\rightarrow$ Bivariate State Vectors $\mathbf{y}_t = [y_t, f_t]'$.
*   **Research Role:** Implements **Model Setup**. It constructs the state vector $\mathbf{y}_t$ defined in Eq. (1):
    $$ \begin{bmatrix} y_t \\ f_t \end{bmatrix} $$
    This vector is the fundamental input for the MS-VAR estimation.

### **Task 7: Augmented Dickey-Fuller (ADF) Test Specification**

**Callable:** `run_adf_test`

*   **Inputs:** `series` (pd.Series), `study_config` (Dict).
*   **Processes:**
    1.  `execute_adf_test`: Estimates the ADF regression $\Delta z_t = \mu + \gamma z_{t-1} + \sum \phi_j \Delta z_{t-j} + u_t$.
    2.  `evaluate_stationarity`: Compares the t-statistic of $\gamma$ against the critical value $-1.94$.
*   **Outputs:** `Dict[str, Any]`: Test results and stationarity decision.
*   **Transformation:** Time Series $\rightarrow$ Statistical Test Metric.
*   **Research Role:** Implements **Stationarity Testing**. It verifies the assumption that the cyclical components are $I(0)$, which is required for the validity of the VAR estimation. This corresponds to the results reported in Appendix A.

### **Task 8: Execute ADF Tests for All Cyclical Series**

**Callable:** `execute_all_adf_tests`

*   **Inputs:** `bivariate_systems` (Dict), `study_config` (Dict).
*   **Processes:**
    1.  `test_all_cycles`: Iterates through all unique series in the systems and runs `run_adf_test`.
    2.  `validate_stationarity_results`: Aggregates results and checks for global stationarity.
*   **Outputs:** `Dict[str, Any]`: Summary of stationarity tests.
*   **Transformation:** Collection of Series $\rightarrow$ Validation Report.
*   **Research Role:** Implements **Diagnostic Validation**. It confirms the empirical validity of the inputs used in the paper, ensuring that "The results in Appendix A confirm the stationarity of the series for all the cases considered."

### **Task 9: Define the Two-Regime MS-VAR(1) Model**

**Callable:** `instantiate_msvar_model`

*   **Inputs:** `country` (str), `system_name` (str), `data` (np.ndarray), `study_config` (Dict).
*   **Processes:**
    1.  Initializes the `MSVARModel` class.
    2.  Loads structural masks for $A_1$ (ones) and $A_2$ (diagonal) from config.
*   **Outputs:** `MSVARModel`: Initialized model object.
*   **Transformation:** Raw Data + Config $\rightarrow$ Model Object.
*   **Research Role:** Implements **Model Definition**. It encapsulates the mathematical structure of the two-regime model defined in Section 2.2, specifically the distinction between the interaction regime ($s_t=1$) and the independence regime ($s_t=2$).

### **Task 10: Initialize EM Algorithm Parameters**

**Callable:** `initialize_em_parameters`

*   **Inputs:** `bivariate_systems` (Dict).
*   **Processes:**
    1.  `estimate_baseline_var`: Estimates a single-regime VAR(1) via OLS: $\hat{A} = (\sum y_t y_{t-1}')(\sum y_{t-1} y_{t-1}')^{-1}$.
    2.  `construct_initial_parameters`: Maps $\hat{A}$ to $A_1^{(0)}$ and $\text{diag}(\hat{A})$ to $A_2^{(0)}$. Perturbs $\Sigma_2$ to break symmetry.
*   **Outputs:** `Dict`: Initial parameters for all models.
*   **Transformation:** Data $\rightarrow$ Initial Parameter Guesses $\Theta^{(0)}$.
*   **Research Role:** Implements **Algorithm Initialization**. It provides the starting point for the Expectation-Maximization algorithm, ensuring the optimization begins in a reasonable region of the parameter space.

### **Task 11: Implement Hamilton Forward Filter**

**Callable:** `run_hamilton_filter`

*   **Inputs:** `data` (np.ndarray), `params` (Dict).
*   **Processes:**
    1.  `compute_conditional_log_densities`: Computes $\log f(y_t | s_t=j)$.
    2.  Executes the forward recursion:
        *   Prediction: $\xi_{t|t-1} = \xi_{t-1|t-1} P$.
        *   Update: $\xi_{t|t} \propto \xi_{t|t-1} \odot \eta_t$.
    3.  Computes log-likelihood via log-sum-exp.
*   **Outputs:** `Dict`: Filtered probabilities and log-likelihood.
*   **Transformation:** Data + Parameters $\rightarrow$ Filtered State Probabilities.
*   **Research Role:** Implements the **E-Step (Filtering)**. It infers the probability of being in the "Minsky Regime" at time $t$ given information up to time $t$, which is central to tracing regime changes over time.

### **Task 12: Implement Kim Backward Smoother**

**Callable:** `run_kim_smoother`

*   **Inputs:** `filter_results` (Dict), `params` (Dict).
*   **Processes:**
    1.  `compute_smoothed_probabilities`: Executes backward recursion $\xi_{t|T} = \xi_{t|t} \odot (P (\xi_{t+1|T} \oslash \xi_{t+1|t}))$.
    2.  `compute_joint_smoothed_probabilities`: Computes $P(s_t=i, s_{t+1}=j | \mathcal{F}_T)$.
*   **Outputs:** `Dict`: Smoothed probabilities.
*   **Transformation:** Filtered Probabilities $\rightarrow$ Smoothed Probabilities.
*   **Research Role:** Implements the **E-Step (Smoothing)**. It provides the optimal inference of the regime at time $t$ using the *entire* sample, which is used for the final regime classification and parameter updates.

### **Task 13: Implement EM M-Step Parameter Updates**

**Callable:** `perform_m_step`

*   **Inputs:** `data` (np.ndarray), `smoother_results` (Dict), `masks` (Dict).
*   **Processes:**
    1.  `update_transition_matrix`: Updates $P$ using joint smoothed probabilities.
    2.  `update_coefficients`: Updates $A_s$ using weighted OLS: $\hat{A}_s = (\sum \xi_t(s) y_t y_{t-1}') (\sum \xi_t(s) y_{t-1} y_{t-1}')^{-1}$. Applies structural masks.
    3.  `update_covariances`: Updates $\Sigma_s$ using weighted residuals.
*   **Outputs:** `Dict`: Updated parameters $\Theta^{(k+1)}$.
*   **Transformation:** Smoothed Probabilities + Data $\rightarrow$ New Parameters.
*   **Research Role:** Implements the **M-Step (Maximization)**. It computes the parameter values that maximize the expected log-likelihood, refining the estimates of the Minsky cycle coefficients ($\alpha, \beta$).

### **Task 14: Implement EM Convergence Logic**

**Callable:** `finalize_estimation`

*   **Inputs:** `current_params`, `current_loglik`, `previous_loglik`, `iteration`, `config`, `data`, `likelihood_evaluator`.
*   **Processes:**
    1.  `check_convergence`: Checks absolute/relative change in log-likelihood against tolerance ($10^{-6}$).
    2.  `compute_standard_errors`: If converged, computes numerical Hessian of log-likelihood and inverts it to get standard errors.
*   **Outputs:** `Tuple[bool, Dict]`: Convergence status and final results.
*   **Transformation:** Iteration State $\rightarrow$ Convergence Decision & SEs.
*   **Research Role:** Implements **Optimization Control**. It determines when the EM algorithm has found the maximum likelihood estimates and quantifies the uncertainty (standard errors) needed for hypothesis testing.

### **Task 15: Estimate MS-VAR for All Country-System Combinations**

**Callable:** `estimate_all_msvar_models`

*   **Inputs:** `bivariate_systems` (Dict), `initial_params_dict` (Dict), `study_config` (Dict).
*   **Processes:**
    1.  Iterates over all 18 models.
    2.  `estimate_single_model`: Runs the EM loop (Tasks 11-14) for each model.
*   **Outputs:** `Dict`: Estimation results for all models.
*   **Transformation:** All Data $\rightarrow$ All Model Estimates.
*   **Research Role:** Implements **Model Estimation**. It produces the empirical results presented in Tables 1, 2, and 3 of the paper.

### **Task 16: Compute Ljung-Box Residual Diagnostics**

**Callable:** `compute_ljung_box_diagnostics`

*   **Inputs:** `msvar_estimates` (Dict), `bivariate_systems` (Dict), `study_config` (Dict).
*   **Processes:**
    1.  `extract_model_residuals`: Computes residuals $u_t = y_t - A_{\hat{s}_t} y_{t-1}$ based on the most probable regime.
    2.  `compute_ljung_box_stats`: Calculates Q-statistics for lag $h=3$.
*   **Outputs:** `Dict`: Diagnostic results.
*   **Transformation:** Estimates $\rightarrow$ Diagnostic Metrics.
*   **Research Role:** Implements **Model Validation**. It checks for serial correlation in the residuals to ensure the MS-VAR(1) specification adequately captures the data dynamics, corresponding to Appendix B.

### **Task 17: Compute Eigenvalues and Minsky Oscillation Conditions**

**Callable:** `analyze_minsky_conditions`

*   **Inputs:** `msvar_estimates` (Dict).
*   **Processes:**
    1.  `evaluate_eigenvalues_and_discriminant`: Computes $\Delta' = (\alpha_1 - \beta_2)^2 + 4 \alpha_2 \beta_1$.
    2.  `classify_minsky_regime`: Checks if $\Delta' < 0$, $\beta_1 > 0$, $\alpha_2 < 0$, and coefficients are significant.
*   **Outputs:** `Dict`: Minsky classification results.
*   **Transformation:** Parameters $\rightarrow$ Economic Interpretation.
*   **Research Role:** Implements **Hypothesis Testing**. It empirically tests the mathematical condition for endogenous Minsky cycles derived in Section 2.1:
    $$ (\alpha_1 - \beta_2)^2 + 4 \alpha_2 \beta_1 < 0 $$
    and the necessary condition $\alpha_2 \beta_1 < 0$.

### **Task 18: Extract and Visualize Regime Probabilities**

**Callable:** `extract_regime_probabilities`

*   **Inputs:** `msvar_estimates` (Dict), `bivariate_systems` (Dict).
*   **Processes:**
    1.  `compute_regime_metrics`: Calculates fraction of time in Regime 1 and expected durations.
    2.  `plot_regime_probabilities`: Generates plots of $\xi_t(1)$ over time.
*   **Outputs:** `Dict`: Regime probabilities and metrics.
*   **Transformation:** Smoothed Probabilities $\rightarrow$ Time Series Analysis.
*   **Research Role:** Implements **Regime Analysis**. It traces the historical evolution of financial fragility, identifying periods where the "Minsky Regime" was dominant (e.g., pre-2008), as shown in Figures 1-13.

### **Task 19: Design End-to-End Orchestrator Function**

**Callable:** `run_minsky_msvar_pipeline`

*   **Inputs:** `study_config` (Dict), `raw_datasets` (Dict).
*   **Processes:**
    1.  Sequentially calls orchestrators for Validation, Data Engineering, Setup, Estimation, Diagnostics, and Analysis.
*   **Outputs:** `Dict`: Comprehensive pipeline results.
*   **Transformation:** Raw Inputs $\rightarrow$ Final Analytical Results.
*   **Research Role:** Implements the **Primary Research Workflow**. It automates the generation of the paper's main empirical findings.

### **Task 20: Design Monte Carlo Robustness Orchestrator**

**Callable:** `run_monte_carlo_robustness`

*   **Inputs:** `msvar_estimates` (Dict), `study_config` (Dict).
*   **Processes:**
    1.  Iterates $N=100$ times per model.
    2.  Calls `simulate_msvar_path` (Task 21) to generate synthetic data.
    3.  Calls `estimate_single_model` to re-estimate parameters.
    4.  Calls `aggregate_mc_results` (Task 22) to compute stats.
*   **Outputs:** `Dict`: Monte Carlo statistics.
*   **Transformation:** Estimates $\rightarrow$ Robustness Metrics.
*   **Research Role:** Implements **Robustness Analysis**. It validates the stability of the parameter estimates and Minsky classifications against small-sample bias, as discussed in Section 3 and Appendix C.

### **Task 21: Implement Monte Carlo Data Generation**

**Callable:** `simulate_msvar_path`

*   **Inputs:** `T` (int), `params` (Dict).
*   **Processes:**
    1.  `simulate_regime_path`: Simulates Markov chain using $P$.
    2.  `generate_var_observables`: Simulates VAR process conditional on regimes.
*   **Outputs:** `Tuple[np.ndarray, np.ndarray]`: Simulated data and regimes.
*   **Transformation:** Parameters $\rightarrow$ Synthetic Data.
*   **Research Role:** Implements the **Data Generating Process (DGP)** for the parametric bootstrap, creating synthetic histories consistent with the estimated model.

### **Task 22: Aggregate Monte Carlo Results**

**Callable:** `aggregate_mc_results`

*   **Inputs:** `replications` (List[Dict]).
*   **Processes:**
    1.  `compute_parameter_stats`: Computes mean, std, CI.
    2.  `assess_minsky_robustness`: Computes fraction of replications satisfying Minsky conditions.
*   **Outputs:** `Dict`: Aggregated stats.
*   **Transformation:** List of Estimates $\rightarrow$ Summary Statistics.
*   **Research Role:** Implements **Statistical Inference**. It provides the confidence intervals and robustness checks reported in Appendix C.

### **Task 23: Cross-Validate Results Against Paper Tables**

**Callable:** `cross_validate_results`

*   **Inputs:** `msvar_estimates` (Dict).
*   **Processes:**
    1.  `load_paper_benchmarks`: Loads hardcoded values from the paper.
    2.  `compute_relative_errors`: Calculates deviation between replication and paper.
    3.  `generate_validation_report`: Summarizes discrepancies.
*   **Outputs:** `Dict`: Validation report.
*   **Transformation:** Estimates + Benchmarks $\rightarrow$ Validation Metrics.
*   **Research Role:** Implements **Replication Verification**. It quantifies the fidelity of the replication against the published results.

### **Task 24: Final Synthesis and Documentation**

**Callable:** `synthesize_final_report`

*   **Inputs:** All pipeline outputs.
*   **Processes:**
    1.  `compile_master_results`: Aggregates everything into one dictionary.
    2.  `generate_latex_tables`: Formats results into LaTeX.
    3.  `generate_technical_appendix`: Creates documentation.
*   **Outputs:** `Dict`: Final report package.
*   **Transformation:** Raw Results $\rightarrow$ Publication Artifacts.
*   **Research Role:** Implements **Reporting**. It generates the final tables and figures presented in the paper.

### **Top-Level Orchestrator**

**Callable:** `execute_minsky_research_project`

*   **Inputs:** `study_config` (Dict), `raw_datasets` (Dict).
*   **Processes:**
    1.  Calls `run_minsky_msvar_pipeline` (Task 19).
    2.  Calls `run_monte_carlo_robustness` (Task 20).
    3.  Calls `cross_validate_results` (Task 23).
    4.  Calls `synthesize_final_report` (Task 24).
*   **Outputs:** `Dict`: The complete project output.
*   **Transformation:** Raw Data $\rightarrow$ Complete Research Output.
*   **Research Role:** Implements the **Full Project Lifecycle**. It ties together the estimation, robustness checking, validation, and reporting into a single executable workflow.

<br><br>

### **Usage Example: End-to-End Minsky Cycle Analysis**

This example uses synthentic data to demonstrate how to run the `execute_minsky_research_project` orchestrator (correctly) as follows:
1.  **Data Generation**: Synthesizing realistic macroeconomic time series for the six target countries.
2.  **Configuration Setup**: Reading a `config.yaml` file to simulate a production environment.
3.  **Execution**: Running the `execute_minsky_research_project` orchestrator.
4.  **Result Inspection**: Accessing the generated artifacts.

```python
import numpy as np
import pandas as pd
import yaml
import logging
import os

# Ensure logging is configured to show pipeline progress
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# ==============================================================================
# 1. Synthetic Data Generation
# ==============================================================================
# We generate synthetic data that mimics the statistical properties of the
# actual OECD/BIS datasets to ensure the pipeline runs successfully.

def generate_synthetic_country_data(seed: int) -> pd.DataFrame:
    """
    Generates a synthetic macroeconomic dataset for a single country.
    
    Properties:
    - Time: 1970-2020 (Annual)
    - GDP: Exponential trend + Cyclical component
    - Debt: Linear trend + Cycle
    - Rates: Mean reverting + Cycle
    """
    np.random.seed(seed)
    dates = pd.date_range(start='1970-01-01', end='2020-01-01', freq='AS')
    T = len(dates)
    t = np.arange(T)
    
    # 1. Real GDP (Billions)
    # Trend: Exponential growth (2% per year)
    # Cycle: Sine wave (business cycle) + Noise
    trend_gdp = 1000 * np.exp(0.02 * t)
    cycle_gdp = 50 * np.sin(2 * np.pi * t / 10) + np.random.normal(0, 20, T)
    real_gdp = trend_gdp + cycle_gdp
    # Ensure strictly positive
    real_gdp = np.maximum(real_gdp, 100.0)

    # 2. Non-Financial Corporate Debt (NFCD)
    # Modeled as % of GDP approx, then scaled
    # Trend: Rising leverage
    trend_nfcd = 500 + 10 * t
    cycle_nfcd = 30 * np.sin(2 * np.pi * t / 10 + 0.5) + np.random.normal(0, 10, T)
    nfcd = trend_nfcd + cycle_nfcd
    nfcd = np.maximum(nfcd, 10.0)

    # 3. Household Debt (HD)
    # Trend: Slower rising leverage
    trend_hd = 400 + 8 * t
    cycle_hd = 25 * np.sin(2 * np.pi * t / 12) + np.random.normal(0, 10, T)
    household_debt = trend_hd + cycle_hd
    household_debt = np.maximum(household_debt, 10.0)

    # 4. Short-Term Interest Rate (STIR)
    # Mean reverting around 5%
    stir = 5.0 + 2.0 * np.sin(2 * np.pi * t / 8) + np.random.normal(0, 1.0, T)
    # Bound within [-5, 40]
    stir = np.clip(stir, 0.0, 20.0)

    df = pd.DataFrame({
        'real_gdp': real_gdp,
        'nfcd': nfcd,
        'household_debt': household_debt,
        'stir': stir
    }, index=dates)
    
    return df

# Generate datasets for all 6 countries and store in dictionary
countries = ["USA", "UK", "France", "Germany", "Canada", "Australia"]
raw_datasets = {
    country: generate_synthetic_country_data(seed=i)
    for i, country in enumerate(countries)
}

print(f"Generated datasets for: {list(raw_datasets.keys())}")
print(f"USA Data Shape: {raw_datasets['USA'].shape}")

# ==============================================================================
# 2. Configuration Setup (Write & Read)
# ==============================================================================

# A. Read the config.yaml file
print("Reading 'config.yaml'...")
with open("config.yaml", "r") as f:
    study_config = yaml.safe_load(f)

# B. Post-processing: Convert lists to numpy arrays
# YAML loads nested lists; our pipeline expects numpy arrays for masks.
study_config["model_physics"]["regime_constraints"]["regime_1_mask"] = np.array(
    study_config["model_physics"]["regime_constraints"]["regime_1_mask"]
)
study_config["model_physics"]["regime_constraints"]["regime_2_mask"] = np.array(
    study_config["model_physics"]["regime_constraints"]["regime_2_mask"]
)

# ==============================================================================
# 3. Execution
# ==============================================================================

print("\n>>> Executing Minsky Research Project Pipeline...")
try:
    # Assumption: All modules and callables are accurately imported into the working environment
    # This single call runs Validation -> Data Engineering -> Estimation -> Diagnostics -> Analysis -> Reporting
    project_results = execute_minsky_research_project(study_config, raw_datasets)
    print(">>> Execution Successful.")
except Exception as e:
    print(f">>> Execution Failed: {e}")
    # In a real scenario, we would inspect the logs here
    raise e

# ==============================================================================
# 4. Result Inspection
# ==============================================================================

print("\n--- Pipeline Output Summary ---")
print(f"Keys available: {list(project_results.keys())}")

# A. Inspect Minsky Classification for USA GDP/NFCD
minsky_analysis = project_results["analysis"]["minsky_conditions"]
usa_nfcd_key = ("USA", "GDP_NFCD")

if usa_nfcd_key in minsky_analysis:
    res = minsky_analysis[usa_nfcd_key]
    print(f"\n--- Minsky Analysis: {usa_nfcd_key} ---")
    print(f"Is Minsky Regime? {res['is_minsky']}")
    print(f"Oscillatory Dynamics: {res['dynamics']['is_oscillatory']}")
    print(f"Sign Pattern (beta1>0, alpha2<0): {res['signs']['pattern_valid']}")
    print(f"Statistically Significant: {res['significance']['valid']}")

# B. Inspect Monte Carlo Robustness
mc_results = project_results["robustness"]["monte_carlo"]
if usa_nfcd_key in mc_results:
    mc_stats = mc_results[usa_nfcd_key]
    print(f"\n--- Monte Carlo Robustness: {usa_nfcd_key} ---")
    print(f"Fraction satisfying Sign Pattern: {mc_stats['robustness_metrics']['frac_sign']:.2f}")
    print(f"Fraction satisfying Oscillation: {mc_stats['robustness_metrics']['frac_osc']:.2f}")

# C. View Generated LaTeX Table (Snippet)
latex_tables = project_results["latex_tables"]
print("\n--- LaTeX Parameter Table (Snippet) ---")
print(latex_tables["parameter_estimates"][:500] + "...") # Print first 500 chars

# D. View Technical Appendix
print("\n--- Technical Appendix ---")
print(project_results["technical_appendix"])

# Cleanup
os.remove("config.yaml")
print("\nRemoved 'config.yaml'.")
```



In [None]:
# Task 1: Validate Study Configuration Object Structure

# ==============================================================================
# Task 1: Validate and parse the study configuration dictionary
# ==============================================================================

class ConfigurationError(Exception):
    """
    Custom exception for study configuration validation failures.

    Purpose:
        This class serves as a specific exception identifier for errors detected
        during the validation of the `study_config` dictionary. It flags structural
        inconsistencies, missing keys (e.g., 'max_lags', 'k_regimes'), or invalid
        value types (e.g., negative lag lengths) before the computationally
        expensive estimation pipeline begins.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific
        configuration violation.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This allows the initialization
        routines to halt immediately if the setup parameters are invalid,
        preventing runtime crashes deeper in the pipeline.

    Outputs:
        When raised, it propagates a `ConfigurationError` instance up the call
        stack, signaling that the model inputs are malformed.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 1:  Load the `study_parameters` dictionary and verify structural completeness.
# -------------------------------------------------------------------------------------------------------------------------------

def validate_top_level_architecture(study_config: Dict[str, Any]) -> None:
    """
    Validates the top-level structure of the study configuration dictionary.

    This function enforces that the input is a dictionary and contains exactly
    the two required top-level keys: 'input_data_architecture' and 'model_physics'.
    This ensures the configuration object adheres to the strict schema required
    for the MS-VAR pipeline.

    Parameters
    ----------
    study_config : Dict[str, Any]
        The master configuration dictionary containing data schemas and model parameters.

    Returns
    -------
    None
        The function returns nothing if validation passes.

    Raises
    ------
    ConfigurationError
        If `study_config` is not a dictionary.
        If the set of keys does not exactly match the required set.
    """
    # Validate input type
    # We ensure the config is a dictionary to allow key-based access.
    if not isinstance(study_config, dict):
        raise ConfigurationError(
            f"Input 'study_config' must be a dictionary, got {type(study_config)}."
        )

    # Define the strict set of required top-level keys
    required_keys: Set[str] = {"input_data_architecture", "model_physics"}

    # Extract actual keys from the input
    actual_keys: Set[str] = set(study_config.keys())

    # Check for exact set equality
    # This prevents missing sections or the injection of unknown parameters.
    if actual_keys != required_keys:
        missing = required_keys - actual_keys
        extra = actual_keys - required_keys
        error_msg = "Top-level configuration keys do not match schema."
        if missing:
            error_msg += f" Missing: {missing}."
        if extra:
            error_msg += f" Unexpected: {extra}."
        raise ConfigurationError(error_msg)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 2: Validate input data architecture schema completeness.
# -------------------------------------------------------------------------------------------------------------------------------

def validate_input_data_architecture(study_config: Dict[str, Any]) -> None:
    """
    Validates the 'input_data_architecture' section of the configuration.

    This function enforces constraints on the temporal domain (1970-2020, Annual)
    and the dataset schemas for the six required countries. It verifies that
    each country has the four required features (GDP, NFCD, HD, STIR) and that
    the log-transformation flags are set correctly according to the paper's specification.

    Parameters
    ----------
    study_config : Dict[str, Any]
        The master configuration dictionary.

    Returns
    -------
    None

    Raises
    ------
    ConfigurationError
        If temporal domain parameters are incorrect.
        If required countries are missing.
        If feature schemas are incomplete or incorrect.
        If log-transformation flags violate the specification.
    """
    # Access the relevant subsection
    data_arch = study_config.get("input_data_architecture")
    if not isinstance(data_arch, dict):
        raise ConfigurationError("'input_data_architecture' must be a dictionary.")

    # 1. Validate Temporal Domain
    temporal = data_arch.get("temporal_domain", {})
    # Start year must be 1970
    if temporal.get("start_year") != 1970:
        raise ConfigurationError(f"Temporal domain start_year must be 1970, got {temporal.get('start_year')}.")
    # End year must be 2020
    if temporal.get("end_year") != 2020:
        raise ConfigurationError(f"Temporal domain end_year must be 2020, got {temporal.get('end_year')}.")
    # Frequency must be Annual Start ('AS')
    if temporal.get("frequency") != "AS":
        raise ConfigurationError(f"Temporal domain frequency must be 'AS', got {temporal.get('frequency')}.")

    # 2. Validate Datasets Keys (Countries)
    datasets = data_arch.get("datasets", {})
    required_countries: Set[str] = {"USA", "UK", "France", "Germany", "Canada", "Australia"}
    actual_countries: Set[str] = set(datasets.keys())

    if actual_countries != required_countries:
        missing = required_countries - actual_countries
        extra = actual_countries - required_countries
        raise ConfigurationError(
            f"Dataset country keys mismatch. Missing: {missing}. Unexpected: {extra}."
        )

    # 3. Validate Feature Schemas per Country
    required_features: Set[str] = {"real_gdp", "nfcd", "household_debt", "stir"}
    required_fields: Set[str] = {"description", "unit", "source", "dtype", "transform_log"}

    for country, schema in datasets.items():
        features = schema.get("features", {})
        actual_features = set(features.keys())

        # Check feature presence
        if actual_features != required_features:
            raise ConfigurationError(
                f"Feature keys for {country} mismatch. "
                f"Missing: {required_features - actual_features}. "
                f"Unexpected: {actual_features - required_features}."
            )

        # Check fields within each feature
        for feature_name, feature_meta in features.items():
            actual_fields = set(feature_meta.keys())
            # We check if required fields are a subset of actual fields (allowing extras is okay, but missing is not)
            if not required_fields.issubset(actual_fields):
                raise ConfigurationError(
                    f"Missing metadata fields in {country}/{feature_name}. "
                    f"Missing: {required_fields - actual_fields}."
                )

            # Check dtype
            if feature_meta["dtype"] != "float64":
                raise ConfigurationError(
                    f"Feature {country}/{feature_name} must have dtype 'float64'."
                )

            # 4. Validate Log Transformation Flags (Crucial for Model Physics)
            # GDP must be logged
            if feature_name == "real_gdp":
                if feature_meta["transform_log"] is not True:
                    raise ConfigurationError(
                        f"{country}/real_gdp must have 'transform_log': True."
                    )
            # Others must NOT be logged
            else:
                if feature_meta["transform_log"] is not False:
                    raise ConfigurationError(
                        f"{country}/{feature_name} must have 'transform_log': False."
                    )

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 3: Validate model physics parameter domains and types.
# -------------------------------------------------------------------------------------------------------------------------------

def validate_model_physics(study_config: Dict[str, Any]) -> None:
    """
    Validates the 'model_physics' section of the configuration.

    This function enforces the mathematical and statistical parameters required
    for the replication. It checks the HP filter settings, the MS-VAR regime
    structure (masks), the EM algorithm settings, and the statistical thresholds
    for ADF and Ljung-Box tests.

    Parameters
    ----------
    study_config : Dict[str, Any]
        The master configuration dictionary.

    Returns
    -------
    None

    Raises
    ------
    ConfigurationError
        If HP filter lambda is not 100.0.
        If regime masks are incorrect shapes or values.
        If EM settings are incorrect.
        If statistical thresholds do not match the paper's appendices.
    """
    physics = study_config.get("model_physics")
    if not isinstance(physics, dict):
        raise ConfigurationError("'model_physics' must be a dictionary.")

    # 1. Validate Signal Extraction (HP Filter)
    sig_ext = physics.get("signal_extraction", {})
    if sig_ext.get("algorithm") != "Hodrick-Prescott Filter":
        raise ConfigurationError("Signal extraction algorithm must be 'Hodrick-Prescott Filter'.")
    # Lambda must be 100.0 for annual data
    if sig_ext.get("lambda_parameter") != 100.0:
        raise ConfigurationError(f"HP Filter lambda must be 100.0, got {sig_ext.get('lambda_parameter')}.")
    if sig_ext.get("target_component") != "cyclical":
        raise ConfigurationError("Target component must be 'cyclical'.")

    # 2. Validate Regime Constraints (MS-VAR Structure)
    constraints = physics.get("regime_constraints", {})
    if constraints.get("num_regimes") != 2:
        raise ConfigurationError("Number of regimes must be 2.")
    if constraints.get("lag_order") != 1:
        raise ConfigurationError("Lag order must be 1.")

    # Validate Regime 1 Mask (Full Interaction: 2x2 ones)
    mask1 = constraints.get("regime_1_mask")
    expected_mask1 = np.array([[1, 1], [1, 1]], dtype=int)
    if not isinstance(mask1, np.ndarray) or mask1.shape != (2, 2):
        raise ConfigurationError("regime_1_mask must be a 2x2 numpy array.")
    if not np.array_equal(mask1, expected_mask1):
        raise ConfigurationError("regime_1_mask must be all ones (full interaction).")

    # Validate Regime 2 Mask (Diagonal: 2x2 identity-like structure)
    mask2 = constraints.get("regime_2_mask")
    expected_mask2 = np.array([[1, 0], [0, 1]], dtype=int)
    if not isinstance(mask2, np.ndarray) or mask2.shape != (2, 2):
        raise ConfigurationError("regime_2_mask must be a 2x2 numpy array.")
    if not np.array_equal(mask2, expected_mask2):
        raise ConfigurationError("regime_2_mask must be diagonal (no interaction).")

    # 3. Validate Estimation Engine
    engine = physics.get("estimation_engine", {})
    if engine.get("algorithm") != "Expectation-Maximization (EM)":
        raise ConfigurationError("Estimation algorithm must be 'Expectation-Maximization (EM)'.")

    criteria = engine.get("convergence_criteria", {})
    if criteria.get("tolerance") != 1e-6:
        raise ConfigurationError("EM tolerance must be 1e-6.")
    if criteria.get("max_iterations") != 1000:
        raise ConfigurationError("EM max_iterations must be 1000.")

    # 4. Validate Statistical Thresholds
    thresholds = physics.get("statistical_thresholds", {})

    # ADF Test (Appendix A)
    adf = thresholds.get("adf_test", {})
    if adf.get("critical_value_5pct") != -1.94:
        raise ConfigurationError(f"ADF critical value must be -1.94, got {adf.get('critical_value_5pct')}.")

    # Ljung-Box Test (Appendix B)
    lb = thresholds.get("ljung_box_test", {})
    if lb.get("critical_value") != 6.63:
        raise ConfigurationError(f"Ljung-Box critical value must be 6.63, got {lb.get('critical_value')}.")
    if lb.get("lags") != 3:
        raise ConfigurationError(f"Ljung-Box lags must be 3, got {lb.get('lags')}.")

    # Cycle Conditions (Section 2.1)
    cycle = thresholds.get("cycle_conditions", {})
    if cycle.get("beta1_sign") != "positive":
        raise ConfigurationError("beta1_sign must be 'positive'.")
    if cycle.get("alpha2_sign") != "negative":
        raise ConfigurationError("alpha2_sign must be 'negative'.")
    if cycle.get("discriminant_limit") != 0.0:
        raise ConfigurationError("discriminant_limit must be 0.0.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_study_config(study_config: Dict[str, Any]) -> bool:
    """
    Orchestrates the complete validation of the study configuration object.

    This function executes the validation pipeline in a granular sequence:
    1. Top-level architecture check.
    2. Input data schema check.
    3. Model physics parameter check.

    If any step fails, a ConfigurationError is raised with a descriptive message.
    If all steps pass, the function returns True, signaling that the configuration
    is valid and safe for downstream processing.

    Parameters
    ----------
    study_config : Dict[str, Any]
        The master configuration dictionary to be validated.

    Returns
    -------
    bool
        True if the configuration is valid.

    Raises
    ------
    ConfigurationError
        If any validation step fails.
    """
    # Step 1: Top-level check
    validate_top_level_architecture(study_config)

    # Step 2: Data architecture check
    validate_input_data_architecture(study_config)

    # Step 3: Model physics check
    validate_model_physics(study_config)

    return True


In [None]:
# Task 2: Validate Raw Datasets Dictionary Structure

# ==============================================================================
# Task 2: Validate Raw Datasets Dictionary Structure
# ==============================================================================

class DataValidationError(Exception):
    """
    Custom exception for dataset validation failures.

    Purpose:
        This class serves as a specific exception identifier for errors detected
        during the validation of the `datasets` dictionary or its constituent
        DataFrames. It is raised when data fails to meet structural (e.g.,
        missing columns), temporal (e.g., gaps in index), or domain (e.g.,
        non-numeric values) constraints required for the MS-VAR pipeline,
        distinguishing data integrity errors from configuration or runtime errors.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific
        validation failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This allows the data loading
        routines to halt immediately if the input data is compromised.

    Outputs:
        When raised, it propagates a `DataValidationError` instance up the call
        stack, signaling that the input data structure is invalid.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 1: Validate datasets container and country keys.
# -------------------------------------------------------------------------------------------------------------------------------

def validate_datasets_container(datasets: Dict[str, pd.DataFrame]) -> None:
    """
    Validates the top-level structure of the datasets dictionary.

    This function enforces that the input `datasets` is a dictionary and that its
    keys exactly match the set of six required countries: USA, UK, France, Germany,
    Canada, and Australia. This ensures that the pipeline has the necessary
    geographical coverage as specified in the study configuration.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The dictionary containing raw country data.

    Returns
    -------
    None
        Returns nothing if validation passes.

    Raises
    ------
    DataValidationError
        If `datasets` is not a dictionary.
        If the set of keys does not exactly match the required countries.
        If any value in the dictionary is not a pandas DataFrame.
    """
    # Validate input type
    # We ensure the container is a dictionary to allow key-based access.
    if not isinstance(datasets, dict):
        raise DataValidationError(
            f"Input 'datasets' must be a dictionary, got {type(datasets)}."
        )

    # Define the strict set of required country keys
    required_countries: Set[str] = {"USA", "UK", "France", "Germany", "Canada", "Australia"}

    # Extract actual keys from the input
    actual_countries: Set[str] = set(datasets.keys())

    # Check for exact set equality
    # This prevents missing countries or the inclusion of unexpected ones.
    if actual_countries != required_countries:
        missing = required_countries - actual_countries
        extra = actual_countries - required_countries
        error_msg = "Dataset country keys do not match schema."
        if missing:
            error_msg += f" Missing: {missing}."
        if extra:
            error_msg += f" Unexpected: {extra}."
        raise DataValidationError(error_msg)

    # Validate value types
    # Each value must be a pandas DataFrame to support time-series operations.
    for country, df in datasets.items():
        if not isinstance(df, pd.DataFrame):
            raise DataValidationError(
                f"Data for {country} must be a pandas DataFrame, got {type(df)}."
            )

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 2: Validate DatetimeIndex properties for each country.
# -------------------------------------------------------------------------------------------------------------------------------

def validate_datetime_index(datasets: Dict[str, pd.DataFrame]) -> None:
    """
    Validates the DatetimeIndex properties for each country's DataFrame.

    This function enforces strict temporal structure requirements:
    1. The index must be a pandas DatetimeIndex.
    2. The index must be strictly monotonic increasing (ordered in time).
    3. The index must be unique (no duplicate timestamps).
    4. The observations must be strictly annual (equidistant intervals of 365 or 366 days).
    5. The temporal span must cover at least 30 observations to support MS-VAR estimation.
    6. The data must fall within the 1970-2020 domain (though truncation happens later,
       we check bounds here).

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The dictionary containing raw country data.

    Returns
    -------
    None
        Returns nothing if validation passes.

    Raises
    ------
    DataValidationError
        If any temporal constraint is violated for any country.
    """
    for country, df in datasets.items():
        # 1. Check Index Type
        if not isinstance(df.index, pd.DatetimeIndex):
            raise DataValidationError(
                f"Index for {country} must be a DatetimeIndex, got {type(df.index)}."
            )

        # 2. Check Monotonicity
        if not df.index.is_monotonic_increasing:
            raise DataValidationError(f"Index for {country} is not strictly increasing.")

        # 3. Check Uniqueness
        if not df.index.is_unique:
            raise DataValidationError(f"Index for {country} contains duplicate timestamps.")

        # 4. Check Equidistance (Annual Frequency)
        # We calculate the difference between consecutive dates.
        # Since we expect annual data, diffs should be 365 or 366 days.
        if len(df) > 1:
            diffs = df.index.to_series().diff().dropna()
            days = diffs.dt.days
            # Allow for leap years (366) and standard years (365)
            valid_intervals = days.isin([365, 366])
            if not valid_intervals.all():
                invalid_diffs = days[~valid_intervals]
                raise DataValidationError(
                    f"Index for {country} is not strictly annual. "
                    f"Found intervals of {invalid_diffs.unique()} days."
                )

        # 5. Check Sample Size
        # MS-VAR estimation requires sufficient data points.
        if len(df) < 30:
            raise DataValidationError(
                f"Insufficient data for {country}. Expected >= 30 observations, got {len(df)}."
            )

        # 6. Check Temporal Bounds
        # While cleansing handles truncation, we ensure the raw data isn't completely out of bounds.
        start_date = df.index.min()
        end_date = df.index.max()

        # Bounds from study config (hardcoded here as per task context, but conceptually linked)
        min_allowed = pd.Timestamp("1970-01-01")
        max_allowed = pd.Timestamp("2020-01-01")

        if start_date > max_allowed:
            raise DataValidationError(f"Data for {country} starts after 2020 ({start_date}).")
        if end_date < min_allowed:
            raise DataValidationError(f"Data for {country} ends before 1970 ({end_date}).")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 3: Validate column presence, types, and domain constraints.
# -------------------------------------------------------------------------------------------------------------------------------

def validate_columns_and_domains(datasets: Dict[str, pd.DataFrame]) -> None:
    """
    Validates column presence, data types, and domain constraints for each DataFrame.

    This function enforces that:
    1. Each DataFrame contains exactly the four required columns: 'real_gdp', 'nfcd',
       'household_debt', 'stir'.
    2. All columns are of float64 type (or convertible).
    3. Domain constraints are met:
       - real_gdp > 0 (strictly positive for log transformation).
       - nfcd >= 0 (non-negative debt).
       - household_debt >= 0 (non-negative debt).
       - stir is within a plausible range [-5, 40].

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The dictionary containing raw country data.

    Returns
    -------
    None
        Returns nothing if validation passes.

    Raises
    ------
    DataValidationError
        If columns are missing or extra.
        If data types are incorrect.
        If domain constraints are violated (though NaNs are allowed here, to be cleaned later).
    """
    required_columns: Set[str] = {"real_gdp", "nfcd", "household_debt", "stir"}

    for country, df in datasets.items():
        # 1. Validate Column Presence
        actual_columns = set(df.columns)
        if actual_columns != required_columns:
            missing = required_columns - actual_columns
            extra = actual_columns - required_columns
            raise DataValidationError(
                f"Columns for {country} mismatch. "
                f"Missing: {missing}. Unexpected: {extra}."
            )

        # 2. Validate Data Types
        # We check if columns are numeric. We don't strictly enforce float64 here if
        # they are int, but we ensure they are numbers.
        for col in required_columns:
            if not pd.api.types.is_numeric_dtype(df[col]):
                raise DataValidationError(
                    f"Column '{col}' for {country} must be numeric, got {df[col].dtype}."
                )

        # 3. Validate Domain Constraints
        # We use boolean indexing to find violations. We ignore NaNs here as they
        # will be handled in the cleansing step.

        # Real GDP > 0
        gdp_violations = df["real_gdp"] <= 0
        if gdp_violations.any():
            # We raise an error for non-positive GDP because log(<=0) is undefined.
            # While cleansing could fix this, raw data with <=0 GDP is likely erroneous.
            bad_dates = df.index[gdp_violations].strftime('%Y-%m-%d').tolist()
            raise DataValidationError(
                f"Found non-positive Real GDP for {country} at {bad_dates}. "
                "GDP must be strictly positive for log transformation."
            )

        # NFCD >= 0
        nfcd_violations = (df["nfcd"] < 0) & df["nfcd"].notna()
        if nfcd_violations.any():
            bad_dates = df.index[nfcd_violations].strftime('%Y-%m-%d').tolist()
            raise DataValidationError(
                f"Found negative NFCD for {country} at {bad_dates}."
            )

        # Household Debt >= 0
        hd_violations = (df["household_debt"] < 0) & df["household_debt"].notna()
        if hd_violations.any():
            bad_dates = df.index[hd_violations].strftime('%Y-%m-%d').tolist()
            raise DataValidationError(
                f"Found negative Household Debt for {country} at {bad_dates}."
            )

        # STIR within [-5, 40]
        # Interest rates can be negative, but usually not below -5% or above 40% in developed economies.
        stir_violations = ((df["stir"] < -5) | (df["stir"] > 40)) & df["stir"].notna()
        if stir_violations.any():
            bad_dates = df.index[stir_violations].strftime('%Y-%m-%d').tolist()
            # We log a warning or raise an error. Given strict fidelity, we raise.
            raise DataValidationError(
                f"Found implausible STIR for {country} at {bad_dates}. "
                "Expected range [-5, 40]."
            )

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_raw_datasets(datasets: Dict[str, pd.DataFrame]) -> bool:
    """
    Orchestrates the complete validation of the raw datasets.

    This function executes the validation pipeline for the input data:
    1. Validates the container structure and country keys.
    2. Validates the temporal structure (DatetimeIndex) of each DataFrame.
    3. Validates the column schema, data types, and value domains.

    If any step fails, a DataValidationError is raised. If all pass, returns True.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The dictionary containing raw country data.

    Returns
    -------
    bool
        True if the datasets are valid.

    Raises
    ------
    DataValidationError
        If any validation step fails.
    """
    # Step 1: Container and Keys Check
    validate_datasets_container(datasets)

    # Step 2: Temporal Structure Check
    validate_datetime_index(datasets)

    # Step 3: Column and Domain Check
    validate_columns_and_domains(datasets)

    return True


In [None]:
# Task 3: Data Cleansing and Temporal Alignment

# ==============================================================================
# Task 3: Data Cleansing and Temporal Alignment
# ==============================================================================

class DataCleansingError(Exception):
    """
    Custom exception for failures during data cleansing and alignment.

    Purpose:
        This class serves as a specialized exception identifier used during the
        data preprocessing stage. It distinguishes errors related to data integrity
        (e.g., null values, duplicate indices, mismatched timestamps) from
        generic runtime or configuration errors.

    Inputs:
        No specific input required beyond standard Exception arguments (typically
        an error message string).

    Processes:
        Inherits the standard behavior of the Python `Exception` class. It does
        not implement additional logic but provides a semantic distinction for
        error handling strategies (try/except blocks).

    Outputs:
        Raises a `DataCleansingError` instance, halting execution or triggering
        an exception handler.
    """
    # Execute the pass statement to maintain valid class syntax without adding methods or attributes.
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 1: Handle missing and invalid values.
# -------------------------------------------------------------------------------------------------------------------------------

def handle_missing_and_invalid_values(datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """
    Identifies invalid values, interpolates isolated single-year gaps, and prepares
    data for truncation.

    This function iterates through each country's dataset and performs the following:
    1. Identifies invalid entries: NaNs, Infs, and domain violations (e.g., negative debt).
    2. Masks these invalid entries as NaN.
    3. Detects gaps in the data.
    4. Applies linear interpolation ONLY for single-year gaps (isolated missing values).
       Equation: x_t = x_{t-1} + (x_{t+1} - x_{t-1}) / 2
    5. Leaves multi-year gaps as NaN, to be handled by truncation in the next step.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The raw validated datasets.

    Returns
    -------
    Dict[str, pd.DataFrame]
        A dictionary of partially cleansed DataFrames (interpolated but not yet truncated).

    Raises
    ------
    DataCleansingError
        If interpolation fails or data structure is corrupted.
    """
    cleansed_datasets = {}

    for country, df in datasets.items():
        # Create a copy to avoid mutating the original
        df_clean = df.copy()

        # 1. Mask Domain Violations as NaN
        # Real GDP <= 0
        df_clean.loc[df_clean["real_gdp"] <= 0, "real_gdp"] = np.nan
        # NFCD < 0
        df_clean.loc[df_clean["nfcd"] < 0, "nfcd"] = np.nan
        # Household Debt < 0
        df_clean.loc[df_clean["household_debt"] < 0, "household_debt"] = np.nan
        # STIR outside [-5, 40]
        df_clean.loc[(df_clean["stir"] < -5) | (df_clean["stir"] > 40), "stir"] = np.nan

        # Replace Inf with NaN
        df_clean.replace([np.inf, -np.inf], np.nan, inplace=True)

        # 2. Interpolate Single-Year Gaps
        # We iterate column by column to handle gaps specific to each variable
        for col in df_clean.columns:
            series = df_clean[col]

            # Find indices where data is missing
            missing_mask = series.isna()
            if not missing_mask.any():
                continue

            # Get integer locations of NaNs
            nan_locs = np.where(missing_mask)[0]

            # Identify isolated NaNs
            isolated_nans = []
            for loc in nan_locs:
                # Check bounds
                if loc == 0 or loc == len(series) - 1:
                    continue # Cannot interpolate boundaries

                # Check neighbors
                if not missing_mask.iloc[loc - 1] and not missing_mask.iloc[loc + 1]:
                    isolated_nans.append(series.index[loc])

            # Apply interpolation only to isolated NaNs
            if isolated_nans:
                for date_idx in isolated_nans:
                    # Get previous and next valid values
                    # We know they exist and are at loc-1 and loc+1 because of the check above
                    loc = df_clean.index.get_loc(date_idx)
                    prev_val = series.iloc[loc - 1]
                    next_val = series.iloc[loc + 1]

                    # Linear interpolation formula
                    interpolated_val = prev_val + (next_val - prev_val) / 2.0

                    # Update DataFrame
                    df_clean.at[date_idx, col] = interpolated_val

                    logger.info(f"Interpolated isolated missing value for {country} - {col} at {date_idx.date()}")

        cleansed_datasets[country] = df_clean

    return cleansed_datasets

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 2: Enforce temporal alignment within each country.
# -------------------------------------------------------------------------------------------------------------------------------

def enforce_temporal_alignment(datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """
    Truncates DataFrames to the maximal contiguous temporal intersection of all variables.

    This function ensures that for each country, all four variables (GDP, NFCD, HD, STIR)
    are present and valid for the exact same time range. It handles multi-year gaps
    by selecting the longest contiguous block of valid data.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The datasets with isolated gaps interpolated (from Step 1).

    Returns
    -------
    Dict[str, pd.DataFrame]
        The aligned and truncated DataFrames.

    Raises
    ------
    DataCleansingError
        If the resulting aligned dataset has fewer than 20 observations.
    """
    aligned_datasets = {}

    for country, df in datasets.items():
        # 1. Identify rows where ALL columns are valid (no NaNs)
        # After Step 1, any remaining NaNs are part of multi-year gaps or boundaries.
        valid_rows_mask = df.notna().all(axis=1)

        if not valid_rows_mask.any():
            logger.warning(f"No valid overlapping data found for {country}. Excluding from analysis.")
            continue

        # 2. Find contiguous blocks of valid data
        # We group by the difference between the index (integer location) and a counter.
        # Consecutive valid rows will have a constant difference.
        # Reset index to get integer index for calculation
        df_reset = df.reset_index()
        valid_indices = df_reset.index[valid_rows_mask.values]

        if len(valid_indices) == 0:
             logger.warning(f"No valid overlapping data found for {country}. Excluding from analysis.")
             continue

        # Group consecutive indices
        # Series(valid_indices) - Series(range(len)) is constant for contiguous blocks
        blocks = valid_indices.to_series().groupby(valid_indices - np.arange(len(valid_indices)))

        # 3. Select the longest block
        longest_block_indices = []
        max_len = 0

        for _, block in blocks:
            if len(block) > max_len:
                max_len = len(block)
                longest_block_indices = block.values
            # Tie-breaking: prefer earlier block? The prompt says "earliest chronologically"
            elif len(block) == max_len:
                # If current max_len is same, we keep the existing one
                pass

        # 4. Truncate DataFrame
        # Use the indices to slice the original DataFrame
        # We need to map integer indices back to the DatetimeIndex
        start_idx = longest_block_indices[0]
        end_idx = longest_block_indices[-1]

        # Slicing by integer position (iloc) is safer here
        df_aligned = df.iloc[start_idx : end_idx + 1].copy()

        # 5. Validate Sample Size
        if len(df_aligned) < 20:
            logger.warning(
                f"Aligned data for {country} has {len(df_aligned)} observations (minimum 20 required). "
                "Excluding from analysis."
            )
            continue

        # 6. Final Equidistance Check
        # Ensure no internal rows were dropped (which shouldn't happen with contiguous block logic)
        diffs = df_aligned.index.to_series().diff().dropna().dt.days
        if not diffs.isin([365, 366]).all():
             raise DataCleansingError(f"Temporal alignment failed for {country}: Resulting index is not equidistant.")

        aligned_datasets[country] = df_aligned

        logger.info(
            f"Aligned {country}: {df_aligned.index.min().date()} to {df_aligned.index.max().date()} "
            f"(T={len(df_aligned)})"
        )

    return aligned_datasets

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 3: Validate cleansed data and freeze canonical input.
# -------------------------------------------------------------------------------------------------------------------------------

def validate_and_freeze_data(datasets: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
    """
    Performs final validation on cleansed datasets, computes summary statistics,
    and prepares the canonical data structure.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The aligned and truncated datasets.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing:
        - 'cleansed_datasets': The final DataFrames.
        - 'summary_statistics': Nested dict of stats per country/variable.

    Raises
    ------
    DataCleansingError
        If any final validation check fails.
    """
    summary_stats = {}

    for country, df in datasets.items():
        # 1. Re-run Validations
        # Check for NaNs
        if df.isna().any().any():
            raise DataCleansingError(f"Final validation failed for {country}: NaNs present.")

        # Check for Infs
        if np.isinf(df.values).any():
             raise DataCleansingError(f"Final validation failed for {country}: Infs present.")

        # Check Domains
        if (df["real_gdp"] <= 0).any():
             raise DataCleansingError(f"Final validation failed for {country}: Non-positive GDP.")
        if (df["nfcd"] < 0).any():
             raise DataCleansingError(f"Final validation failed for {country}: Negative NFCD.")
        if (df["household_debt"] < 0).any():
             raise DataCleansingError(f"Final validation failed for {country}: Negative Household Debt.")

        # 2. Compute Summary Statistics
        stats = df.describe().T
        stats["skew"] = df.skew()
        stats["kurtosis"] = df.kurt()
        stats["T"] = len(df)

        summary_stats[country] = stats.to_dict(orient="index")

    # 3. Prepare Result Structure
    result = {
        "cleansed_datasets": datasets,
        "summary_statistics": summary_stats
    }

    return result

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def cleanse_and_align_datasets(datasets: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
    """
    Orchestrates the data cleansing and temporal alignment pipeline.

    Sequence:
    1. Handle missing/invalid values (Interpolation).
    2. Enforce temporal alignment (Truncation).
    3. Validate and freeze canonical input (Stats & Final Check).

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The raw validated datasets.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing the cleansed datasets and summary statistics.
    """
    logger.info("Starting data cleansing pipeline...")

    # Step 1: Interpolation
    interpolated_data = handle_missing_and_invalid_values(datasets)

    # Step 2: Alignment/Truncation
    aligned_data = enforce_temporal_alignment(interpolated_data)

    # Step 3: Final Validation & Stats
    final_output = validate_and_freeze_data(aligned_data)

    logger.info("Data cleansing pipeline completed successfully.")
    return final_output



In [None]:
# Task 4: Apply Logarithmic Transformation to Real GDP

# ==============================================================================
# Task 4: Apply Logarithmic Transformation to Real GDP
# ==============================================================================

class TransformationError(Exception):
    """
    Custom exception for failures during data transformation.

    Purpose:
        This class serves as a specialized exception identifier used during the
        feature engineering or mathematical transformation stage. It flags issues
        such as failures in log-return calculations, division by zero during
        standardization, or errors in stationarity conversion.

    Inputs:
        No specific input required beyond standard Exception arguments (typically
        an error message string).

    Processes:
        Inherits the standard behavior of the Python `Exception` class. It does
        not implement additional logic but provides a semantic distinction for
        upstream error handling.

    Outputs:
        Raises a `TransformationError` instance, halting execution or triggering
        an exception handler.
    """
    # Execute the pass statement to maintain valid class syntax without adding methods or attributes.
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 1: Define log transformation rule.
# -------------------------------------------------------------------------------------------------------------------------------

def define_log_transformation_rule(df: pd.DataFrame, country: str) -> pd.Series:
    """
    Defines and executes the logarithmic transformation rule for Real GDP.

    Equation:
        y_t^(log, c) = ln(gdp_t^c)

    This function takes a country's DataFrame, extracts the 'real_gdp' column,
    validates that all values are strictly positive, and applies the natural
    logarithm.

    Parameters
    ----------
    df : pd.DataFrame
        The cleansed DataFrame for a specific country.
    country : str
        The name of the country (for error messaging).

    Returns
    -------
    pd.Series
        The log-transformed Real GDP series.

    Raises
    ------
    TransformationError
        If 'real_gdp' contains non-positive values.
    """
    # Extract the series
    gdp_series = df["real_gdp"]

    # Validate strictly positive domain
    # Although Task 3 cleansed this, we verify defensively before the math operation.
    if (gdp_series <= 0).any():
        bad_indices = gdp_series[gdp_series <= 0].index.tolist()
        raise TransformationError(
            f"Cannot apply log transform for {country}: Found non-positive GDP at {bad_indices}."
        )

    # Apply natural logarithm
    # np.log is the natural logarithm (base e)
    log_gdp = np.log(gdp_series)

    return log_gdp

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 2: Execute log transformation and validation.
# -------------------------------------------------------------------------------------------------------------------------------

def execute_log_transformation(datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """
    Executes the log transformation for all countries and validates the results.

    This function iterates through the datasets, applies the log transform to
    'real_gdp', stores the result in a new column 'log_real_gdp', and verifies
    that the output contains finite, numeric values.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The dictionary of cleansed country DataFrames.

    Returns
    -------
    Dict[str, pd.DataFrame]
        The datasets with the added 'log_real_gdp' column.

    Raises
    ------
    TransformationError
        If the transformation results in NaNs or Infs.
    """
    transformed_datasets = {}

    for country, df in datasets.items():
        # Create a copy to avoid mutating the input in place immediately
        df_transformed = df.copy()

        # Apply the transformation rule
        try:
            log_gdp_series = define_log_transformation_rule(df_transformed, country)
        except TransformationError as e:
            raise e

        # Validate the result
        if not np.isfinite(log_gdp_series).all():
            raise TransformationError(
                f"Log transformation for {country} resulted in non-finite values (NaN or Inf)."
            )

        # Store the result
        # We use a specific column name to distinguish from the raw level
        df_transformed["log_real_gdp"] = log_gdp_series

        # Verify index alignment
        if not df_transformed.index.equals(log_gdp_series.index):
             raise TransformationError(f"Index misalignment after log transform for {country}.")

        transformed_datasets[country] = df_transformed

        logger.info(f"Applied log transform to Real GDP for {country}.")

    return transformed_datasets

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 3: Document transformation metadata.
# -------------------------------------------------------------------------------------------------------------------------------

def document_transformation_metadata(datasets: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
    """
    Captures and documents metadata regarding the log transformation.

    This function records the min/max statistics of the original and transformed
    GDP series, the timestamp of the operation, and confirms that other variables
    remain in their original levels.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The datasets containing both 'real_gdp' and 'log_real_gdp'.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing metadata for each country.
    """
    metadata = {}
    timestamp = datetime.now().isoformat()

    for country, df in datasets.items():
        # Calculate statistics
        orig_min = df["real_gdp"].min()
        orig_max = df["real_gdp"].max()
        log_min = df["log_real_gdp"].min()
        log_max = df["log_real_gdp"].max()

        # Verify other variables are untouched (sanity check)
        country_meta = {
            "transformation_date": timestamp,
            "variable_transformed": "real_gdp",
            "transformation_type": "natural_log",
            "original_range": {"min": float(orig_min), "max": float(orig_max)},
            "log_range": {"min": float(log_min), "max": float(log_max)},
            "other_variables_status": "levels (untouched)"
        }

        metadata[country] = country_meta

    return metadata

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def apply_log_transform(datasets: Dict[str, pd.DataFrame]) -> Tuple[Dict[str, pd.DataFrame], Dict[str, Any]]:
    """
    Orchestrates the application of the logarithmic transformation to Real GDP.

    Sequence:
    1. Define and execute the log transformation for all countries.
    2. Validate the results (finite, aligned).
    3. Document metadata for reproducibility.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The cleansed and aligned datasets.

    Returns
    -------
    Tuple[Dict[str, pd.DataFrame], Dict[str, Any]]
        - The datasets with 'log_real_gdp' added.
        - The transformation metadata.
    """
    logger.info("Starting log transformation pipeline...")

    # Steps 1 & 2: Execute and Validate
    transformed_data = execute_log_transformation(datasets)

    # Step 3: Document Metadata
    metadata = document_transformation_metadata(transformed_data)

    logger.info("Log transformation pipeline completed successfully.")
    return transformed_data, metadata


In [None]:
# Task 5: Implement Hodrick-Prescott Filter for Cycle Extraction

# ==============================================================================
# Task 5: Implement Hodrick-Prescott Filter for Cycle Extraction
# ==============================================================================

class HPFilterError(Exception):
    """
    Custom exception for failures during HP filtering.

    Purpose:
        This class acts as a distinct identifier for runtime errors encountered
        specifically while applying the Hodrick-Prescott filter. This allows
        calling functions to differentiate between general data processing
        errors and specific mathematical failures within the detrending
        algorithm (e.g., invalid lambda parameters or data dimensionality issues).

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts standard arguments passed to the base `Exception` class,
        typically an error message string describing the failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a specific type in the exception hierarchy for targeted `except` blocks.

    Outputs:
        When raised, it propagates an `HPFilterError` instance up the call
        stack, signaling a specific failure in the trend-cycle decomposition
        process.
    """
    # Execute the pass statement to ensure syntactical correctness while defining an empty class body
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 1 & 2: Define HP filter optimization problem and solve linear system.
# -------------------------------------------------------------------------------------------------------------------------------

def hp_filter_sparse(series: pd.Series, lamb: float = 100.0) -> Tuple[pd.Series, pd.Series]:
    """
    Implements the Hodrick-Prescott (HP) filter using sparse matrix algebra.

    The HP filter decomposes a time series x_t into a trend component tau_t
    and a cyclical component c_t by solving the minimization problem:

    min_tau sum((x_t - tau_t)^2) + lambda * sum(((tau_{t+1} - tau_t) - (tau_t - tau_{t-1}))^2)

    The first-order condition yields the linear system:
    (I + lambda * D^T D) * tau = x

    where D is the second-difference matrix.

    Parameters
    ----------
    series : pd.Series
        The input time series (e.g., log GDP).
    lamb : float, optional
        The smoothing parameter lambda. Default is 100.0 for annual data.

    Returns
    -------
    Tuple[pd.Series, pd.Series]
        - The trend component (tau).
        - The cyclical component (cycle = x - tau).

    Raises
    ------
    HPFilterError
        If the linear system cannot be solved.
    """
    # Validate input
    if series.empty:
        raise HPFilterError("Input series is empty.")

    # Convert to numpy array for calculation
    x = series.values
    n = len(x)

    if n < 3:
        raise HPFilterError(f"Series length {n} is too short for HP filter (min 3).")

    # Construct the second-difference matrix D
    data = np.array([np.ones(n-2), -2*np.ones(n-2), np.ones(n-2)])
    offsets = [0, 1, 2]
    # D shape is (n-2, n)
    D = sparse.diags(data, offsets, shape=(n-2, n))

    # Construct the system matrix A = I + lambda * D^T D
    # I is n x n identity
    I = sparse.eye(n)

    # D.T @ D results in a pentadiagonal matrix
    # A is symmetric positive definite
    A = I + lamb * (D.T @ D)

    # Solve the linear system A * tau = x
    try:
        # spsolve is efficient for sparse CSR/CSC matrices
        tau = spsolve(A.tocsc(), x)
    except Exception as e:
        raise HPFilterError(f"Failed to solve HP filter linear system: {e}")

    # Compute cycle
    cycle = x - tau

    # Convert back to pandas Series with original index
    trend_series = pd.Series(tau, index=series.index, name=f"{series.name}_trend")
    cycle_series = pd.Series(cycle, index=series.index, name=f"{series.name}_cycle")

    return trend_series, cycle_series

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 3: Apply HP filter to all series and countries.
# -------------------------------------------------------------------------------------------------------------------------------

def apply_hp_filter_to_datasets(datasets: Dict[str, pd.DataFrame], lamb: float = 100.0) -> Dict[str, pd.DataFrame]:
    """
    Applies the HP filter to all relevant variables for each country.

    Variables processed:
    1. 'log_real_gdp' -> 'gdp_cycle' (Real Variable y_t)
    2. 'nfcd' -> 'nfcd_cycle' (Financial Variable f_t)
    3. 'household_debt' -> 'hd_cycle' (Financial Variable f_t)
    4. 'stir' -> 'stir_cycle' (Financial Variable f_t)

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The datasets containing log-transformed GDP and raw financial variables.
    lamb : float
        The smoothing parameter (default 100.0).

    Returns
    -------
    Dict[str, pd.DataFrame]
        The datasets enriched with cyclical component columns.

    Raises
    ------
    HPFilterError
        If filtering fails for any series.
    """
    processed_datasets = {}

    # Define mapping from input column to output cycle name
    # Note: 'log_real_gdp' is the input for GDP cycle, not 'real_gdp'
    variable_map = {
        "log_real_gdp": "gdp_cycle",
        "nfcd": "nfcd_cycle",
        "household_debt": "hd_cycle",
        "stir": "stir_cycle"
    }

    for country, df in datasets.items():
        df_processed = df.copy()

        for input_col, output_col in variable_map.items():
            if input_col not in df_processed.columns:
                raise HPFilterError(f"Required column '{input_col}' missing for {country}.")

            series = df_processed[input_col]

            # Apply HP Filter
            try:
                _, cycle = hp_filter_sparse(series, lamb=lamb)
            except HPFilterError as e:
                raise HPFilterError(f"HP Filter failed for {country} - {input_col}: {e}")

            # Store cycle
            df_processed[output_col] = cycle

            # Validation: Check for NaNs in result (shouldn't happen if input was clean)
            if cycle.isna().any():
                 raise HPFilterError(f"HP Filter produced NaNs for {country} - {input_col}.")

        processed_datasets[country] = df_processed
        logger.info(f"Extracted cyclical components for {country} (lambda={lamb}).")

    return processed_datasets

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def extract_hp_cycles(datasets: Dict[str, pd.DataFrame], study_config: Dict[str, Any]) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the extraction of cyclical components using the HP filter.

    Sequence:
    1. Retrieve lambda parameter from configuration.
    2. Apply HP filter to all required series.
    3. Return enriched datasets.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The datasets with log-transformed GDP.
    study_config : Dict[str, Any]
        The study configuration containing model physics parameters.

    Returns
    -------
    Dict[str, pd.DataFrame]
        The datasets with added cyclical components.
    """
    logger.info("Starting HP filter cycle extraction...")

    # 1. Retrieve Lambda
    try:
        lamb = study_config["model_physics"]["signal_extraction"]["lambda_parameter"]
    except KeyError:
        raise HPFilterError("Configuration missing 'lambda_parameter' in 'model_physics/signal_extraction'.")

    # 2. Apply Filter
    datasets_with_cycles = apply_hp_filter_to_datasets(datasets, lamb=lamb)

    logger.info("Cycle extraction completed successfully.")
    return datasets_with_cycles


In [None]:
# Task 6: Construct Bivariate Cyclical State Vectors

# ==============================================================================
# Task 6: Construct Bivariate Cyclical State Vectors
# ==============================================================================

class StateVectorError(Exception):
    """
    Custom exception for failures during state vector construction.

    Purpose:
        This class acts as a specific error identifier used when aggregating
        individual time series (e.g., Carbon, Clean Energy, Geopolitical Risk)
        into the unified state vector $Y_t$ required for the MS-VAR model.
        It allows the system to segregate errors related to vector dimensionality
        and component alignment from general preprocessing exceptions.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically an error message string describing the specific construction failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy to facilitate targeted error
        trapping during the matrix formation steps.

    Outputs:
        When raised, it propagates a `StateVectorError` instance up the call
        stack, signaling that the formation of the endogenous variable vector
        has failed.
    """
    # Execute the pass statement to ensure valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 1 & 2: Define bivariate systems and organize into containers.
# -------------------------------------------------------------------------------------------------------------------------------

def build_bivariate_systems(datasets: Dict[str, pd.DataFrame]) -> Dict[str, Dict[str, Any]]:
    """
    Constructs the three bivariate cyclical state vectors for each country.

    For each country c, creates:
    1. System A (GDP/NFCD): y_t = [gdp_cycle, nfcd_cycle]'
    2. System B (GDP/HD):   y_t = [gdp_cycle, hd_cycle]'
    3. System C (GDP/STIR): y_t = [gdp_cycle, stir_cycle]'

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The datasets containing cyclical components (output of Task 5).

    Returns
    -------
    Dict[str, Dict[str, Any]]
        A nested dictionary structure:
        {
            country: {
                "GDP_NFCD": {
                    "data": np.ndarray (T x 2),
                    "index": pd.DatetimeIndex,
                    "T": int
                },
                "GDP_HD": { ... },
                "GDP_STIR": { ... }
            }
        }

    Raises
    ------
    StateVectorError
        If required cyclical columns are missing.
    """
    bivariate_systems = {}

    # Define the systems and their constituent columns
    # Order is CRITICAL: [Real Variable, Financial Variable]
    # This aligns with the MS-VAR coefficient matrix structure:
    # [ alpha1 alpha2 ]
    # [ beta1  beta2  ]
    systems_map = {
        "GDP_NFCD": ("gdp_cycle", "nfcd_cycle"),
        "GDP_HD":   ("gdp_cycle", "hd_cycle"),
        "GDP_STIR": ("gdp_cycle", "stir_cycle")
    }

    for country, df in datasets.items():
        country_systems = {}

        for system_name, (real_col, fin_col) in systems_map.items():
            # Validate column presence
            if real_col not in df.columns or fin_col not in df.columns:
                raise StateVectorError(
                    f"Missing columns for {country} system {system_name}. "
                    f"Required: {real_col}, {fin_col}."
                )

            # Extract series
            real_series = df[real_col].values
            fin_series = df[fin_col].values

            # Stack into (T, 2) array
            # column 0 = real, column 1 = financial
            data_matrix = np.column_stack((real_series, fin_series))

            # Store with metadata
            country_systems[system_name] = {
                "data": data_matrix,
                "index": df.index,
                "T": len(df),
                "columns": [real_col, fin_col] # Record for traceability
            }

        bivariate_systems[country] = country_systems
        logger.info(f"Constructed 3 bivariate systems for {country}.")

    return bivariate_systems

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 3: Validate cyclical component properties.
# -------------------------------------------------------------------------------------------------------------------------------

def validate_cyclical_properties(bivariate_systems: Dict[str, Dict[str, Any]]) -> None:
    """
    Validates the statistical properties of the constructed cyclical state vectors.

    Checks:
    1. Zero Mean: HP filtered cycles should have a mean close to zero.
    2. Finite Values: No NaNs or Infs.
    3. Non-trivial Variance: Standard deviation should be positive.

    Parameters
    ----------
    bivariate_systems : Dict[str, Dict[str, Any]]
        The constructed systems from Step 1.

    Returns
    -------
    None

    Raises
    ------
    StateVectorError
        If validation fails (e.g., NaNs present).
    """
    for country, systems in bivariate_systems.items():
        for system_name, sys_data in systems.items():
            data = sys_data["data"]

            # 1. Check for NaNs/Infs
            if not np.isfinite(data).all():
                raise StateVectorError(f"Non-finite values found in {country} - {system_name}.")

            # 2. Check Mean (should be approx 0)
            means = np.mean(data, axis=0)
            # We use a relatively loose tolerance because for short samples (T~50),
            # the HP filter mean isn't exactly zero, but should be small.
            if np.any(np.abs(means) > 1e-1):
                logger.warning(
                    f"Large mean detected for {country} - {system_name}: {means}. "
                    "Check HP filter parameters."
                )

            # 3. Check Variance (should be > 0)
            stds = np.std(data, axis=0)
            if np.any(stds == 0):
                raise StateVectorError(f"Zero variance detected for {country} - {system_name}.")

            # Log stats for audit
            logger.debug(f"{country} {system_name} stats: Mean={means}, Std={stds}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def construct_bivariate_systems(datasets: Dict[str, pd.DataFrame]) -> Dict[str, Dict[str, Any]]:
    """
    Orchestrates the construction and validation of bivariate cyclical state vectors.

    Sequence:
    1. Build systems (stacking arrays).
    2. Validate statistical properties.
    3. Return structured dictionary.

    Parameters
    ----------
    datasets : Dict[str, pd.DataFrame]
        The datasets containing cyclical components.

    Returns
    -------
    Dict[str, Dict[str, Any]]
        The validated bivariate systems ready for MS-VAR estimation.
    """
    logger.info("Starting bivariate system construction...")

    # Step 1 & 2: Build
    systems = build_bivariate_systems(datasets)

    # Step 3: Validate
    validate_cyclical_properties(systems)

    logger.info("Bivariate system construction completed successfully.")
    return systems


In [None]:
# Task 7: Augmented Dickey–Fuller (ADF) Test Specification

# ==============================================================================
# Task 7: Augmented Dickey-Fuller (ADF) Test Specification
# ==============================================================================

class ADFTestError(Exception):
    """
    Custom exception for failures during ADF testing.

    Purpose:
        This class serves as a specialized exception identifier for errors
        encountered during the assessment of time series stationarity using the
        Augmented Dickey-Fuller (ADF) test. It distinguishes statistical or
        computational failures within the stationarity testing module (e.g.,
        convergence issues, insufficient observations for lag selection) from
        general runtime errors.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically an error message string detailing the cause of the ADF failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy to allow for granular error
        handling strategies during the pre-estimation phase.

    Outputs:
        When raised, it propagates an `ADFTestError` instance up the call
        stack, signaling that the stationarity verification for a specific
        time series has failed.
    """
    # Execute the pass statement to ensure valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 1 & 2: Define ADF regression, lag selection, and estimation.
# -------------------------------------------------------------------------------------------------------------------------------

def execute_adf_test(series: pd.Series, max_lag: Optional[int] = None, autolag: str = 'AIC') -> Dict[str, Any]:
    """
    Executes the Augmented Dickey-Fuller (ADF) test on a single time series.

    Regression Specification:
        Delta z_t = mu + gamma * z_{t-1} + sum_{j=1}^p phi_j * Delta z_{t-j} + u_t

    This specification includes a constant (intercept) but no deterministic trend,
    appropriate for cyclical components centered around zero.

    Lag Selection:
        If max_lag is None, it is calculated as: p_max = floor(12 * (T / 100)^0.25).
        The optimal lag p is selected by minimizing the specified information criterion (AIC or BIC).

    Parameters
    ----------
    series : pd.Series
        The time series to test (e.g., a cyclical component).
    max_lag : int, optional
        Maximum lag order. If None, calculated based on sample size.
    autolag : str
        Method to select lag length {'AIC', 'BIC', 't-stat', None}. Default 'AIC'.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing:
        - 'adf_stat': The test statistic (t-stat of gamma).
        - 'p_value': MacKinnon's approximate p-value.
        - 'used_lag': The lag order p selected.
        - 'n_obs': Number of observations used in regression.
        - 'critical_values': Dictionary of critical values from standard tables.
        - 'icbest': The value of the information criterion at the optimal lag.

    Raises
    ------
    ADFTestError
        If the test fails (e.g., input too short, constant series).
    """
    # Validate input
    if series.empty:
        raise ADFTestError("Input series is empty.")

    # Drop NaNs (ADF cannot handle missing values)
    clean_series = series.dropna()
    n = len(clean_series)

    if n < 5: # Minimal length for lag=1 + regression
        raise ADFTestError(f"Series length {n} is too short for ADF test.")

    # Calculate max_lag if not provided
    if max_lag is None:
        # Schwert's rule of thumb
        max_lag = int(12 * (n / 100)**0.25)

    # Ensure max_lag is valid
    max_lag = min(max_lag, n - 2)
    if max_lag < 0:
        max_lag = 0

    try:
        # Execute ADF test using statsmodels
        # regression='c' implies constant only (intercept)
        result = adfuller(clean_series, maxlag=max_lag, regression='c', autolag=autolag)

        adf_stat, p_value, used_lag, n_obs, critical_values, icbest = result

        return {
            "adf_stat": float(adf_stat),
            "p_value": float(p_value),
            "used_lag": int(used_lag),
            "n_obs": int(n_obs),
            "critical_values": critical_values,
            "icbest": float(icbest) if icbest is not None else None
        }

    except Exception as e:
        raise ADFTestError(f"ADF test failed: {e}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 3: Define decision rule and critical values.
# -------------------------------------------------------------------------------------------------------------------------------

def evaluate_stationarity(adf_result: Dict[str, Any], critical_value: float = -1.94) -> Dict[str, Any]:
    """
    Evaluates the stationarity hypothesis based on the ADF statistic and a custom critical value.

    Null Hypothesis (H0): The series has a unit root (gamma = 0).
    Alternative Hypothesis (H1): The series is stationary (gamma < 0).

    Decision Rule:
        If adf_stat < critical_value, Reject H0 (Stationary).
        Else, Fail to Reject H0 (Non-Stationary).

    Parameters
    ----------
    adf_result : Dict[str, Any]
        The output from `execute_adf_test`.
    critical_value : float
        The critical value for the test (default -1.94 from paper).

    Returns
    -------
    Dict[str, Any]
        The input dictionary enriched with:
        - 'custom_critical_value': The threshold used.
        - 'is_stationary': Boolean indicating if H0 was rejected.
        - 'decision': String description ("Stationary" or "Unit Root").
    """
    stat = adf_result["adf_stat"]

    # Decision logic
    is_stationary = stat < critical_value

    result = adf_result.copy()
    result["custom_critical_value"] = critical_value
    result["is_stationary"] = is_stationary
    result["decision"] = "Stationary" if is_stationary else "Unit Root"

    return result

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_adf_test(series: pd.Series, study_config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Orchestrates the ADF test for a single series using study configuration parameters.

    Sequence:
    1. Retrieve critical value from config.
    2. Execute ADF test.
    3. Evaluate stationarity against the specific critical value.

    Parameters
    ----------
    series : pd.Series
        The time series to test.
    study_config : Dict[str, Any]
        The study configuration containing statistical thresholds.

    Returns
    -------
    Dict[str, Any]
        The complete ADF test results and decision.
    """
    # 1. Retrieve Parameters
    try:
        crit_val = study_config["model_physics"]["statistical_thresholds"]["adf_test"]["critical_value_5pct"]
    except KeyError:
        # Fallback or raise error? Raising error ensures fidelity.
        raise ADFTestError("Configuration missing 'critical_value_5pct' in 'statistical_thresholds/adf_test'.")

    # 2. Execute Test
    # We use AIC for lag selection as it's a common standard if not specified otherwise
    raw_result = execute_adf_test(series, autolag='AIC')

    # 3. Evaluate
    final_result = evaluate_stationarity(raw_result, critical_value=crit_val)

    return final_result


In [None]:
# Task 8: Execute ADF Tests for All Cyclical Series

# ==============================================================================
# Task 8: Execute ADF Tests for All Cyclical Series
# ==============================================================================

class StationarityWarning(Warning):
    """
    Warning for series that fail the stationarity test.

    Purpose:
        This class serves as a specific non-fatal alert used during the data
        preprocessing phase. It is issued when a time series fails to reject the
        null hypothesis of a unit root (e.g., via the ADF test) after applied
        transformations (log-returns, differencing). It flags potential
        violations of the asymptotic assumptions required for stable VAR parameter
        estimation without halting the entire pipeline.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts standard arguments passed to the base `Warning` class,
        typically a message string indicating which series failed and its test
        statistics.

    Processes:
        The class inherits the standard behavior of the Python `Warning` class.
        It allows for the implementation of distinct filtering policies (e.g.,
        `warnings.simplefilter`) to treat stationarity issues separately from
        deprecation or runtime warnings.

    Outputs:
        When issued, it prints a warning message to the standard error stream
        or logging system, alerting the analyst to potential model instability
        due to non-stationary inputs.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 1 & 2: Apply ADF test to real and financial cycles.
# -------------------------------------------------------------------------------------------------------------------------------

def test_all_cycles(bivariate_systems: Dict[str, Dict[str, Any]], study_config: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Executes the ADF test for every unique cyclical series in the bivariate systems.

    Iterates through each country and system:
    1. Tests the GDP cycle (Real Variable) once per country.
    2. Tests the Financial Variable cycle for each system.

    Parameters
    ----------
    bivariate_systems : Dict[str, Dict[str, Any]]
        The constructed bivariate systems containing data arrays.
    study_config : Dict[str, Any]
        The study configuration containing statistical thresholds.

    Returns
    -------
    List[Dict[str, Any]]
        A list of result dictionaries, each containing:
        - 'country': Country name.
        - 'variable': Variable name (e.g., 'GDP_cycle', 'NFCD_cycle').
        - 'adf_stat': Test statistic.
        - 'p_value': Approximate p-value.
        - 'is_stationary': Boolean decision.
        - 'decision': String description.
        - 'used_lag': Lag order used.
    """
    all_results = []

    for country, systems in bivariate_systems.items():
        # Track if GDP has been tested for this country to avoid redundancy
        gdp_tested = False

        for system_name, sys_data in systems.items():
            # Extract data and metadata
            data = sys_data["data"]
            index = sys_data["index"]
            columns = sys_data["columns"] # e.g., ['gdp_cycle', 'nfcd_cycle']

            # 1. Test GDP Cycle (Column 0)
            if not gdp_tested:
                gdp_series = pd.Series(data[:, 0], index=index, name=columns[0])
                try:
                    # run_adf_test is defined in Task 7 context (assumed available)
                    # We call the orchestrator from Task 7
                    gdp_result = run_adf_test(gdp_series, study_config)

                    # Enrich result with metadata
                    gdp_result.update({
                        "country": country,
                        "variable": columns[0], # 'gdp_cycle'
                        "system_context": "All"
                    })
                    all_results.append(gdp_result)
                    gdp_tested = True

                except Exception as e:
                    logger.error(f"ADF test failed for {country} GDP: {e}")

            # 2. Test Financial Cycle (Column 1)
            fin_series = pd.Series(data[:, 1], index=index, name=columns[1])
            try:
                fin_result = run_adf_test(fin_series, study_config)

                fin_result.update({
                    "country": country,
                    "variable": columns[1], # e.g., 'nfcd_cycle'
                    "system_context": system_name
                })
                all_results.append(fin_result)

            except Exception as e:
                logger.error(f"ADF test failed for {country} {columns[1]}: {e}")

    return all_results

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 3: Validate stationarity assumption and document exceptions.
# -------------------------------------------------------------------------------------------------------------------------------

def validate_stationarity_results(adf_results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Validates that all tested series are stationary and documents any exceptions.

    This function iterates through the results of the Augmented Dickey-Fuller (ADF)
    tests performed on the cyclical components. It checks the 'is_stationary' flag
    for each series. If any series fails to reject the null hypothesis of a unit root
    (i.e., is non-stationary), it is flagged and logged as a warning.

    The function produces a summary report indicating whether the stationarity
    assumption holds globally across the dataset, which is a prerequisite for
    stable MS-VAR estimation.

    Parameters
    ----------
    adf_results : List[Dict[str, Any]]
        The list of ADF test results, where each dictionary contains the test
        statistics, critical values, and decision for a specific country-variable pair.

    Returns
    -------
    Dict[str, Any]
        A summary dictionary containing:
        - 'all_stationary': Boolean indicating if all series passed the stationarity test.
        - 'failed_series': A list of dictionaries detailing the series that failed the test.
        - 'full_report': The complete input list of results for archival purposes.
    """
    failed_series = []

    # Iterate through each result to check the stationarity decision
    for res in adf_results:
        if not res["is_stationary"]:
            # Record details of the failure for the summary report
            failed_series.append({
                "country": res["country"],
                "variable": res["variable"],
                "adf_stat": res["adf_stat"],
                "critical_value": res["custom_critical_value"]
            })

            # Log a warning with specific statistical details
            logger.warning(
                f"Unit root not rejected for {res['country']} - {res['variable']} "
                f"(Stat: {res['adf_stat']:.3f} > Crit: {res['custom_critical_value']})"
            )

    # Determine global success
    all_passed = len(failed_series) == 0

    if all_passed:
        logger.info("All cyclical series confirmed stationary (I(0)).")
    else:
        logger.warning(f"{len(failed_series)} series failed stationarity test.")

    return {
        "all_stationary": all_passed,
        "failed_series": failed_series,
        "full_report": adf_results
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def execute_all_adf_tests(bivariate_systems: Dict[str, Dict[str, Any]], study_config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Orchestrates the execution and validation of ADF tests for all cyclical series.

    Sequence:
    1. Run ADF tests on all unique series.
    2. Validate results against the critical value.
    3. Return comprehensive report.

    Parameters
    ----------
    bivariate_systems : Dict[str, Dict[str, Any]]
        The constructed bivariate systems.
    study_config : Dict[str, Any]
        The study configuration.

    Returns
    -------
    Dict[str, Any]
        The validation summary and full results.
    """
    logger.info("Starting ADF stationarity testing...")

    # Step 1 & 2: Execute Tests
    raw_results = test_all_cycles(bivariate_systems, study_config)

    # Step 3: Validate
    validation_summary = validate_stationarity_results(raw_results)

    logger.info("ADF testing completed.")
    return validation_summary


In [None]:
# Task 9: Define the Two-Regime MS-VAR(1) Model

# ==============================================================================
# Task 9: Define the Two-Regime MS-VAR(1) Model
# ==============================================================================

class MSVARModel:
    """
    Represents a Two-Regime Markov-Switching Vector Autoregression (MS-VAR) model of order 1.

    This class encapsulates the data, parameters, and structural constraints for a
    bivariate MS-VAR(1) model. It supports regime-dependent coefficient matrices
    and covariance matrices, governed by a first-order Markov chain.

    Model Specification:
        y_t = A(s_t) * y_{t-1} + epsilon_t
        epsilon_t ~ N(0, Sigma(s_t))
        s_t follows a first-order Markov chain with transition matrix P.

    Regimes:
        Regime 1 (Index 0): Interaction Regime (Minsky). Full A matrix allowing cross-variable feedback.
        Regime 2 (Index 1): Independence Regime (No Minsky). Diagonal A matrix enforcing decoupling.

    Attributes:
        country (str): The country name associated with the data.
        system_name (str): The system identifier (e.g., 'GDP_NFCD').
        data (np.ndarray): The bivariate time series data of shape (T, 2).
        T (int): Number of observations.
        params (Dict[str, Any]): Dictionary holding current parameter estimates:
            - 'A': (2, 2, 2) array, A[s, :, :] representing coefficient matrices.
            - 'Sigma': (2, 2, 2) array, Sigma[s, :, :] representing covariance matrices.
            - 'P': (2, 2) array, transition matrix P[i, j] = Prob(s_t=j | s_{t-1}=i).
        masks (Dict[int, np.ndarray]): Structural masks for A matrices to enforce regime constraints.
    """

    def __init__(self, country: str, system_name: str, data: np.ndarray, study_config: Dict[str, Any]):
        """
        Initializes the MS-VAR model structure with data and configuration constraints.

        Parameters
        ----------
        country : str
            Name of the country.
        system_name : str
            Name of the bivariate system (e.g., 'GDP_NFCD').
        data : np.ndarray
            The bivariate cyclical data array of shape (T, 2).
        study_config : Dict[str, Any]
            The study configuration dictionary containing regime constraints and masks.
        """
        # Initialize key variables
        self.country = country
        self.system_name = system_name
        self.data = data
        self.T = data.shape[0]

        # Initialize parameter container with zeros
        # Shapes correspond to 2 regimes, 2 variables
        self.params = {
            "A": np.zeros((2, 2, 2)),      # Stacked A matrices: [regime, row, col]
            "Sigma": np.zeros((2, 2, 2)),  # Stacked Sigma matrices: [regime, row, col]
            "P": np.zeros((2, 2))          # Transition matrix: [from_state, to_state]
        }

        # Load structural masks from config to enforce regime-specific dynamics
        # Note: Config uses 1-based indexing in names, we map to 0-based indices internally
        # Regime 1 (Index 0) -> Full Interaction (Mask of ones)
        # Regime 2 (Index 1) -> Diagonal (Identity-like mask)
        try:
            mask1 = study_config["model_physics"]["regime_constraints"]["regime_1_mask"]
            mask2 = study_config["model_physics"]["regime_constraints"]["regime_2_mask"]

            self.masks = {
                0: np.array(mask1, dtype=int),
                1: np.array(mask2, dtype=int)
            }
        except KeyError:
            # Fallback to default if config is malformed (though validation should prevent this)
            # This ensures the class remains robust even if config is partial during testing
            logging.warning("Regime masks not found in config. Using defaults.")
            self.masks = {
                0: np.ones((2, 2), dtype=int),
                1: np.eye(2, dtype=int)
            }

    def set_parameters(self, A: np.ndarray, Sigma: np.ndarray, P: np.ndarray) -> None:
        """
        Sets the model parameters, strictly enforcing structural constraints on A.

        This method updates the internal parameter state. It applies the regime-specific
        masks to the coefficient matrices A to ensure that Regime 2 remains diagonal
        (decoupled) regardless of the input values.

        Parameters
        ----------
        A : np.ndarray
            Coefficient matrices of shape (2, 2, 2).
        Sigma : np.ndarray
            Covariance matrices of shape (2, 2, 2).
        P : np.ndarray
            Transition matrix of shape (2, 2).
        """
        # Enforce masks on A by element-wise multiplication
        # This zeroes out off-diagonal elements in Regime 2 (Index 1)
        A_constrained = A.copy()
        A_constrained[0] = A[0] * self.masks[0]
        A_constrained[1] = A[1] * self.masks[1]

        self.params["A"] = A_constrained
        self.params["Sigma"] = Sigma
        self.params["P"] = P

    def validate_parameters(self) -> bool:
        """
        Validates the current parameters for mathematical and structural consistency.

        Checks performed:
        1. Transition matrix P: Row sums must equal 1 (stochastic matrix).
        2. Covariance matrices Sigma: Must be positive definite for both regimes.
        3. Coefficient matrices A: Must strictly obey the structural masks (zeros where required).

        Returns
        -------
        bool
            True if all parameters are valid.

        Raises
        ------
        ValueError
            If any parameter constraint is violated.
        """
        P = self.params["P"]
        # Check row sums of transition matrix
        if not np.allclose(P.sum(axis=1), 1.0):
            raise ValueError(f"Transition matrix rows do not sum to 1: {P.sum(axis=1)}")

        for s in range(2):
            # Check Sigma positive definiteness via eigenvalues
            eigvals = np.linalg.eigvalsh(self.params["Sigma"][s])
            if np.any(eigvals <= 0):
                raise ValueError(f"Regime {s} covariance matrix is not positive definite.")

            # Check A mask compliance
            # Elements that should be zero (mask=0) must be zero
            masked_elements = self.params["A"][s] * (1 - self.masks[s])
            if not np.allclose(masked_elements, 0.0):
                raise ValueError(f"Regime {s} coefficient matrix violates structural mask.")

        return True

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def instantiate_msvar_model(country: str, system_name: str, data: np.ndarray, study_config: Dict[str, Any]) -> MSVARModel:
    """
    Factory function to create and initialize an MSVARModel instance.

    This orchestrator handles the instantiation of the MSVARModel class, ensuring
    that the correct data and configuration are passed. It serves as the entry point
    for creating model objects in the estimation pipeline.

    Parameters
    ----------
    country : str
        Name of the country.
    system_name : str
        Name of the bivariate system (e.g., 'GDP_NFCD').
    data : np.ndarray
        The bivariate cyclical data array of shape (T, 2).
    study_config : Dict[str, Any]
        The study configuration dictionary.

    Returns
    -------
    MSVARModel
        The initialized and configured MS-VAR model object.
    """
    # Fit model
    model = MSVARModel(country, system_name, data, study_config)

    logging.debug(f"Instantiated MS-VAR model for {country} - {system_name}.")

    return model



In [None]:
# Task 10: Initialize EM Algorithm Parameters

# ==============================================================================
# Task 10: Initialize EM Algorithm Parameters
# ==============================================================================

class InitializationError(Exception):
    """
    Custom exception for failures during EM initialization.

    Purpose:
        This class serves as a specific exception identifier for runtime errors
        encountered during the initialization of the Markov-Switching VAR model parameters.
        It flags critical failures in the startup phase of the Expectation-Maximization (EM)
        algorithm—such as convergence failures in K-Means clustering used for regime
        guessing or singularity issues when estimating initial covariance matrices—before
        the iterative estimation begins.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the initialization failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy to facilitate targeted error
        handling during the model instantiation and parameter seeding steps.

    Outputs:
        When raised, it propagates an `InitializationError` instance up the call
        stack, signaling that the iterative estimation process cannot commence
        due to invalid starting conditions.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 1: Estimate baseline single-regime VAR(1).
# -------------------------------------------------------------------------------------------------------------------------------

def estimate_baseline_var(data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Estimates a zero-intercept VAR(1) model using OLS.

    Model: y_t = A * y_{t-1} + u_t

    Parameters
    ----------
    data : np.ndarray
        The bivariate time series data of shape (T, 2).

    Returns
    -------
    Tuple[np.ndarray, np.ndarray]
        - A_hat: Estimated coefficient matrix (2, 2).
        - Sigma_hat: Estimated residual covariance matrix (2, 2).

    Raises
    ------
    InitializationError
        If OLS estimation fails (e.g., singular matrix).
    """
    T, K = data.shape

    # Prepare regressors (Y_{t-1}) and targets (Y_t)
    # X = Y_{t-1} (rows 0 to T-2)
    # Y = Y_t     (rows 1 to T-1)
    X = data[:-1]
    Y = data[1:]

    # OLS Estimation: Y = X * A' + U
    # We want A such that y_t = A y_{t-1}.
    # In matrix form Y ~ X A.T
    # Solution: A.T = (X'X)^(-1) X'Y  => A = Y'X (X'X)^(-1)
    try:
        # Use lstsq for numerical stability
        # lstsq solves X * B = Y -> B = A.T
        A_transpose, residuals, rank, s = np.linalg.lstsq(X, Y, rcond=None)
        A_hat = A_transpose.T

    except np.linalg.LinAlgError as e:
        raise InitializationError(f"OLS estimation failed: {e}")

    # Compute Residuals
    # U = Y - X * A'
    U_hat = Y - X @ A_hat.T

    # Compute Covariance
    # Sigma = (U'U) / (T_eff - K*p)
    # Here p=1 (lag), K=2 (vars). T_eff = T-1.
    # Degrees of freedom correction: T-1 - 2*1?
    # Standard VAR estimators often use T-1-Kp or just T.
    # We use T-1-2 for unbiasedness in small samples.
    df = (T - 1) - 2
    if df <= 0:
        raise InitializationError("Insufficient degrees of freedom for covariance estimation.")

    Sigma_hat = (U_hat.T @ U_hat) / df

    return A_hat, Sigma_hat

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 2 & 3: Map to regime-specific values and initialize transition matrix.
# -------------------------------------------------------------------------------------------------------------------------------

def construct_initial_parameters(A_hat: np.ndarray, Sigma_hat: np.ndarray) -> Dict[str, np.ndarray]:
    """
    Constructs the initial parameter set for the MS-VAR EM algorithm.

    Mappings:
    - Regime 1 (Minsky): A1 = A_hat (Full interaction)
    - Regime 2 (Independence): A2 = diag(A_hat) (No interaction)
    - Covariances: Sigma1 = Sigma_hat, Sigma2 = Sigma_hat + perturbation
    - Transition Matrix: P = [[0.8, 0.2], [0.2, 0.8]]

    Parameters
    ----------
    A_hat : np.ndarray
        Baseline VAR coefficient matrix.
    Sigma_hat : np.ndarray
        Baseline VAR covariance matrix.

    Returns
    -------
    Dict[str, np.ndarray]
        Dictionary containing 'A', 'Sigma', 'P' with correct shapes for MS-VAR.
    """
    # 1. Coefficient Matrices
    # Shape (2, 2, 2) -> [regime, row, col]
    A_init = np.zeros((2, 2, 2))

    # Regime 1: Full A
    A_init[0] = A_hat

    # Regime 2: Diagonal A
    A_init[1] = np.diag(np.diag(A_hat))

    # 2. Covariance Matrices
    # Shape (2, 2, 2)
    Sigma_init = np.zeros((2, 2, 2))

    # Regime 1
    Sigma_init[0] = Sigma_hat

    # Regime 2: Perturb slightly to break symmetry and aid identification
    # We add a small scalar to the diagonal
    perturbation = 1e-4 * np.eye(2)
    Sigma_init[1] = Sigma_hat + perturbation

    # Verify positive definiteness
    for s in range(2):
        eigvals = np.linalg.eigvalsh(Sigma_init[s])
        if np.any(eigvals <= 0):
            # Regularize if not PD
            min_eig = np.min(eigvals)
            fix = (-min_eig + 1e-6) * np.eye(2)
            Sigma_init[s] += fix
            logger.warning(f"Regularized initial Sigma for regime {s}.")

    # 3. Transition Matrix
    # Symmetric persistence
    P_init = np.array([
        [0.8, 0.2],
        [0.2, 0.8]
    ])

    return {
        "A": A_init,
        "Sigma": Sigma_init,
        "P": P_init
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def initialize_em_parameters(bivariate_systems: Dict[str, Dict[str, Any]]) -> Dict[Tuple[str, str], Dict[str, np.ndarray]]:
    """
    Orchestrates the initialization of EM parameters for all models.

    Sequence:
    1. Estimate baseline VAR for each system.
    2. Construct initial MS-VAR parameters.
    3. Store in a dictionary keyed by (country, system).

    Parameters
    ----------
    bivariate_systems : Dict[str, Dict[str, Any]]
        The constructed bivariate systems.

    Returns
    -------
    Dict[Tuple[str, str], Dict[str, np.ndarray]]
        A dictionary mapping (country, system_name) to the initial parameter dict.
    """
    logger.info("Starting EM parameter initialization...")

    initial_params = {}

    # Iterate through countries and systems
    for country, systems in bivariate_systems.items():
        for system_name, sys_data in systems.items():
            data = sys_data["data"]

            try:
                # Step 1: Baseline VAR
                A_hat, Sigma_hat = estimate_baseline_var(data)

                # Step 2 & 3: Construct MS-VAR Init
                params = construct_initial_parameters(A_hat, Sigma_hat)

                initial_params[(country, system_name)] = params

            except InitializationError as e:
                logger.error(f"Initialization failed for {country} - {system_name}: {e}")
                # We might choose to skip this model or halt.
                # For now, we log and skip, but the main loop should handle missing keys.

    logger.info(f"Initialized parameters for {len(initial_params)} models.")

    return initial_params


In [None]:
# Task 11: Implement Hamilton Forward Filter

# ==============================================================================
# Task 11: Implement Hamilton Forward Filter
# ==============================================================================
.
class FilterError(Exception):
    """
    Custom exception for failures during Hamilton filtering.

    Purpose:
        This class serves as a specific exception identifier for numerical or logical
        errors encountered during the forward filtering recursion (Hamilton Filter).
        It distinguishes critical failures in calculating filtered regime probabilities
        ($\\xi_{t|t}$)—such as likelihood underflow, non-positive definite covariance
        matrices during density evaluation, or invalid transition probabilities—from
        general runtime errors.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific filtering
        failure (e.g., "Likelihood collapse at t=50").

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This allows the calling estimation
        loop to catch filtering errors specifically, potentially triggering
        optimization restarts or alternative numerical strategies.

    Outputs:
        When raised, it propagates a `FilterError` instance up the call stack,
        signaling that the recursive inference of the latent state vector has
        failed.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 2: Compute regime-conditional densities (Helper Function).
# -------------------------------------------------------------------------------------------------------------------------------

def compute_conditional_log_densities(data: np.ndarray, params: Dict[str, np.ndarray]) -> np.ndarray:
    """
    Computes the log-density of the observations conditional on each regime.

    For each time t and regime s, computes:
        log f(y_t | s_t=s, y_{t-1}) = log N(y_t; A_s y_{t-1}, Sigma_s)

    Parameters
    ----------
    data : np.ndarray
        The bivariate data array of shape (T, 2).
    params : Dict[str, np.ndarray]
        Dictionary containing 'A' (2, 2, 2) and 'Sigma' (2, 2, 2).

    Returns
    -------
    np.ndarray
        Array of log-densities with shape (T, 2).
        Entry [t, s] is the log-likelihood of observation t given regime s.

    Raises
    ------
    FilterError
        If covariance matrices are singular.
    """
    # Initialize variables
    T, K = data.shape
    A = params["A"]         # Shape (2, 2, 2) -> [regime, row, col]
    Sigma = params["Sigma"] # Shape (2, 2, 2)

    log_densities = np.zeros((T, 2))

    # Precompute inverse covariances and log determinants
    inv_Sigma = np.zeros_like(Sigma)
    log_det_Sigma = np.zeros(2)
    const_term = -0.5 * K * np.log(2 * np.pi)

    for s in range(2):
        try:
            # Cholesky decomposition for stability: Sigma = L L^T
            L = np.linalg.cholesky(Sigma[s])
            inv_Sigma[s] = np.linalg.inv(Sigma[s])
            log_det_Sigma[s] = 2 * np.sum(np.log(np.diag(L)))
        except np.linalg.LinAlgError:
            raise FilterError(f"Singular covariance matrix in regime {s}.")

    # Loop over time (vectorized over regimes would be possible but loop is clear)
    # We start from t=1 because we need y_{t-1}.
    # For t=0, we cannot compute density conditional on y_{-1} unless we assume it.
    # Standard practice: Conditional likelihood starts at t=1.
    # We fill t=0 with zeros or handle it in the filter loop.
    for t in range(1, T):
        y_t = data[t]
        y_tm1 = data[t-1]

        for s in range(2):
            # Mean: mu_t = A_s * y_{t-1}
            mu = A[s] @ y_tm1

            # Residual: e_t = y_t - mu_t
            resid = y_t - mu

            # Quadratic form: e_t' * inv_Sigma * e_t
            quad_form = resid @ inv_Sigma[s] @ resid

            # Log density
            log_densities[t, s] = const_term - 0.5 * log_det_Sigma[s] - 0.5 * quad_form

    return log_densities

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 1 & 3: Implement Forward Filter Recursion.
# -------------------------------------------------------------------------------------------------------------------------------

def run_hamilton_filter(data: np.ndarray, params: Dict[str, np.ndarray]) -> Dict[str, Any]:
    """
    Executes the Hamilton forward filter to infer regime probabilities.

    Recursion:
    1. Prediction: P(s_t=j | Y_{t-1}) = sum_i p_ij * P(s_{t-1}=i | Y_{t-1})
    2. Update: P(s_t=j | Y_t) propto f(y_t | s_t=j) * P(s_t=j | Y_{t-1})

    Parameters
    ----------
    data : np.ndarray
        The bivariate data array (T, 2).
    params : Dict[str, np.ndarray]
        Dictionary containing 'A', 'Sigma', 'P'.

    Returns
    -------
    Dict[str, Any]
        - 'filtered_probs': (T, 2) array of P(s_t | Y_t).
        - 'predicted_probs': (T, 2) array of P(s_t | Y_{t-1}).
        - 'log_likelihood': Scalar total log-likelihood.
    """
    T = data.shape[0]
    P_trans = params["P"] # P[i, j] = Prob(i -> j)

    # 1. Compute Conditional Log-Densities
    log_eta = compute_conditional_log_densities(data, params)

    # Initialize containers
    filtered_probs = np.zeros((T, 2))
    predicted_probs = np.zeros((T, 2))
    log_likelihoods = np.zeros(T)

    # Initialization at t=0
    # We use the stationary distribution of P as the initial prior
    # Solve pi = pi * P => (I - P^T) * pi = 0, sum(pi) = 1
    eigvals, eigvecs = np.linalg.eig(P_trans.T)
    # Find eigenvector for eigenvalue 1
    idx = np.argmin(np.abs(eigvals - 1.0))
    pi_stat = np.real(eigvecs[:, idx])
    pi_stat /= np.sum(pi_stat)

    # For t=0, we don't have y_{-1}, so we can't compute a likelihood update based on dynamics.
    # We set filtered_probs[0] = stationary distribution (prior).
    # Or we could treat y_0 as fixed.
    # Standard approach: Start recursion at t=1.
    filtered_probs[0] = pi_stat
    predicted_probs[0] = pi_stat # Not strictly used but good for completeness

    total_log_lik = 0.0

    # Forward Loop
    for t in range(1, T):
        # 1. Prediction Step: xi_{t|t-1} = P^T * xi_{t-1|t-1}
        # P[i, j] is prob i -> j.
        # Prob(s_t=j) = sum_i Prob(s_{t-1}=i) * P[i, j]
        # Vectorized: pred = filtered[t-1] @ P
        pred = filtered_probs[t-1] @ P_trans
        predicted_probs[t] = pred

        # 2. Update Step
        # We work with log-densities to avoid underflow
        # log_unnorm_prob = log(pred) + log_eta[t]
        # We use the log-sum-exp trick to compute the normalization constant (likelihood contribution)

        log_pred = np.log(pred + 1e-20) # Safety floor
        log_joint = log_pred + log_eta[t]

        # Compute max for stability
        max_log = np.max(log_joint)

        # Likelihood contribution f(y_t | Y_{t-1}) = sum_j exp(log_joint_j)
        # log_lik_t = max_log + log(sum(exp(log_joint - max_log)))
        term = np.sum(np.exp(log_joint - max_log))
        log_lik_t = max_log + np.log(term)

        total_log_lik += log_lik_t
        log_likelihoods[t] = log_lik_t

        # Filtered probabilities: exp(log_joint - log_lik_t)
        # This is equivalent to exp(log_joint) / sum(exp(log_joint))
        filt = np.exp(log_joint - log_lik_t)

        # Ensure normalization (handle numerical noise)
        filt /= np.sum(filt)
        filtered_probs[t] = filt

    return {
        "filtered_probs": filtered_probs,
        "predicted_probs": predicted_probs,
        "log_likelihood": total_log_lik,
        "log_densities": log_eta # Needed for smoother
    }


In [None]:
# Task 12: Implement Kim Backward Smoother

# ==============================================================================
# Task 12: Implement Kim Backward Smoother
# ==============================================================================

class SmootherError(Exception):
    """
    Custom exception for failures during Kim smoothing.

    Purpose:
        This class serves as a specific exception identifier for numerical or logical
        errors encountered during the backward smoothing recursion (Kim's Smoother).
        It distinguishes critical failures in calculating smoothed regime probabilities
        ($\\xi_{t|T}$)—such as division by zero when the predicted probability
        $\\xi_{t+1|t}$ is close to zero, or numerical instability in the recursive
        update equation—from general runtime or forward filtering errors.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific smoothing
        failure (e.g., "Smoothing recursion failed at t=100 due to singularity").

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This allows the calling estimation
        routine to differentiate between filtering failures (forward pass) and
        smoothing failures (backward pass).

    Outputs:
        When raised, it propagates a `SmootherError` instance up the call stack,
        signaling that the retrospective inference of the latent state vector has
        failed.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 1 & 2: Backward recursion for smoothed single-state probabilities.
# -------------------------------------------------------------------------------------------------------------------------------

def compute_smoothed_probabilities(
    filtered_probs: np.ndarray,
    predicted_probs: np.ndarray,
    params: Dict[str, np.ndarray]
) -> np.ndarray:
    """
    Computes smoothed probabilities P(s_t | Y_T) using the Kim backward recursion.

    Recursion:
        xi_{t|T} = xi_{t|t} * [ (P * (xi_{t+1|T} / xi_{t+1|t})) ]
        where division and multiplication are element-wise with broadcasting.

    Parameters
    ----------
    filtered_probs : np.ndarray
        (T, 2) array of filtered probabilities P(s_t | Y_t).
    predicted_probs : np.ndarray
        (T, 2) array of predicted probabilities P(s_t | Y_{t-1}).
    params : Dict[str, np.ndarray]
        Dictionary containing 'P' (transition matrix).

    Returns
    -------
    np.ndarray
        (T, 2) array of smoothed probabilities.

    Raises
    ------
    SmootherError
        If numerical instability occurs (e.g., division by zero).
    """
    # Initialize key variables
    T, K = filtered_probs.shape
    P_trans = params["P"] # P[i, j] = Prob(i -> j)

    smoothed_probs = np.zeros((T, K))

    # Step 1: Initialization at t=T
    smoothed_probs[T-1] = filtered_probs[T-1]

    # Step 2: Backward Recursion
    for t in range(T-2, -1, -1):
        # Ratio: xi_{t+1|T}(j) / xi_{t+1|t}(j)
        # Add epsilon to denominator to prevent division by zero
        ratio = smoothed_probs[t+1] / (predicted_probs[t+1] + 1e-20)

        # Weight: sum_j p_{ij} * ratio_j
        # P is (2, 2), ratio is (2,)
        # We want for each i: sum_j P[i, j] * ratio[j]
        # This is matrix-vector product: P @ ratio
        weight = P_trans @ ratio

        # Update: xi_{t|T}(i) = xi_{t|t}(i) * weight_i
        smoothed_probs[t] = filtered_probs[t] * weight

        # Normalize to ensure sum to 1 (corrects minor numerical drift)
        smoothed_probs[t] /= np.sum(smoothed_probs[t])

    return smoothed_probs

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 3: Compute joint smoothed probabilities for transitions.
# -------------------------------------------------------------------------------------------------------------------------------

def compute_joint_smoothed_probabilities(
    smoothed_probs: np.ndarray,
    filtered_probs: np.ndarray,
    predicted_probs: np.ndarray,
    params: Dict[str, np.ndarray]
) -> np.ndarray:
    """
    Computes joint smoothed probabilities P(s_t=i, s_{t+1}=j | Y_T).

    Formula:
        P(s_t=i, s_{t+1}=j | Y_T) = P(s_{t+1}=j | Y_T) * [ (P(s_t=i | Y_t) * p_{ij}) / P(s_{t+1}=j | Y_t) ]

    Parameters
    ----------
    smoothed_probs : np.ndarray
        (T, 2) array of smoothed probabilities.
    filtered_probs : np.ndarray
        (T, 2) array of filtered probabilities.
    predicted_probs : np.ndarray
        (T, 2) array of predicted probabilities.
    params : Dict[str, np.ndarray]
        Dictionary containing 'P'.

    Returns
    -------
    np.ndarray
        (T-1, 2, 2) array of joint probabilities.
        Index [t, i, j] corresponds to transition from t to t+1.
    """
    # Initialize key variables
    T, K = smoothed_probs.shape
    P_trans = params["P"]

    joint_probs = np.zeros((T-1, K, K))

    for t in range(T-1):
        # Denominator: P(s_{t+1}=j | Y_t)
        denom = predicted_probs[t+1] + 1e-20

        # Term 1: P(s_{t+1}=j | Y_T) / P(s_{t+1}=j | Y_t)
        term1 = smoothed_probs[t+1] / denom

        # Term 2: P(s_t=i | Y_t) * p_{ij}
        # We want a (2, 2) matrix where element (i, j) is filtered[t, i] * P[i, j]
        # filtered[t] is (2,), P is (2, 2)
        # Use broadcasting: (2, 1) * (2, 2)
        term2 = filtered_probs[t][:, np.newaxis] * P_trans

        # Joint: term2 * term1 (broadcast over i)
        # term1 is (2,) corresponding to j
        # term2 is (i, j)
        # Result is (i, j)
        joint_t = term2 * term1[np.newaxis, :]

        # Normalize so sum over i,j is 1
        joint_t /= np.sum(joint_t)

        joint_probs[t] = joint_t

    return joint_probs

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_kim_smoother(
    filter_results: Dict[str, Any],
    params: Dict[str, np.ndarray]
) -> Dict[str, np.ndarray]:
    """
    Orchestrates the Kim backward smoothing algorithm.

    Sequence:
    1. Compute smoothed single-state probabilities.
    2. Compute joint smoothed probabilities.

    Parameters
    ----------
    filter_results : Dict[str, Any]
        Output from `run_hamilton_filter`.
    params : Dict[str, np.ndarray]
        Model parameters.

    Returns
    -------
    Dict[str, np.ndarray]
        - 'smoothed_probs': (T, 2) array.
        - 'joint_smoothed_probs': (T-1, 2, 2) array.
    """
    # Extract filtered and predicted results
    filtered = filter_results["filtered_probs"]
    predicted = filter_results["predicted_probs"]

    # Step 1: Single-state smoothing
    smoothed = compute_smoothed_probabilities(filtered, predicted, params)

    # Step 2: Joint smoothing
    joint = compute_joint_smoothed_probabilities(smoothed, filtered, predicted, params)

    return {
        "smoothed_probs": smoothed,
        "joint_smoothed_probs": joint
    }


In [None]:
# Task 13: Implement EM M-Step Parameter Updates

# ==============================================================================
# Task 13: Implement EM M-Step Parameter Updates
# ==============================================================================

class MStepError(Exception):
    """
    Custom exception for failures during M-step updates.

    Purpose:
        This class serves as a specific exception identifier for numerical or logical
        errors encountered during the parameter update phase (M-step) of the
        Expectation-Maximization algorithm. It distinguishes failures in calculating
        the Maximum Likelihood Estimates (MLE) for the model parameters—such as
        singular matrices during VAR coefficient regression,
        non-positive definite covariance updates, or invalid transition probability
        normalizations—from filtering or smoothing errors.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific
        optimization failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This allows the estimation
        algorithm to abort the current iteration or attempt numerical regularization
        strategies specifically when parameter updates fail.

    Outputs:
        When raised, it propagates an `MStepError` instance up the call stack,
        signaling that the optimization of the objective function with respect
        to the model parameters has failed.
    """
    # Execute the pass statement to maintain valid class syntax

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 1: Update transition probabilities.
# -------------------------------------------------------------------------------------------------------------------------------

def update_transition_matrix(
    joint_smoothed_probs: np.ndarray,
    smoothed_probs: np.ndarray
) -> np.ndarray:
    """
    Updates the Markov transition matrix P.

    Formula:
        p_{ij} = sum_t P(s_t=i, s_{t+1}=j | Y_T) / sum_t P(s_t=i | Y_T)

    Parameters
    ----------
    joint_smoothed_probs : np.ndarray
        (T-1, 2, 2) array of joint probabilities.
    smoothed_probs : np.ndarray
        (T, 2) array of smoothed probabilities.

    Returns
    -------
    np.ndarray
        (2, 2) updated transition matrix.
    """
    # Numerator: Sum over time t=1 to T-1
    # joint_smoothed_probs has shape (T-1, 2, 2)
    num = np.sum(joint_smoothed_probs, axis=0)

    # Denominator: Sum over time t=1 to T-1 (exclude T)
    # smoothed_probs has shape (T, 2)
    # We need sum_{t=1}^{T-1} P(s_t=i)
    # Note: smoothed_probs indices are 0 to T-1. We sum 0 to T-2.
    denom = np.sum(smoothed_probs[:-1], axis=0)

    # Broadcasting division: (2, 2) / (2,) -> divide each row i by denom[i]
    P_new = num / (denom[:, np.newaxis] + 1e-20)

    # Normalize rows to ensure sum to 1
    P_new /= np.sum(P_new, axis=1, keepdims=True)

    return P_new

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 2: Update VAR coefficient matrices for both regimes.
# -------------------------------------------------------------------------------------------------------------------------------

def update_coefficients(
    data: np.ndarray,
    smoothed_probs: np.ndarray,
    masks: Dict[int, np.ndarray]
) -> np.ndarray:
    """
    Updates the VAR coefficient matrices A_s for each regime.

    Formula (Weighted OLS):
        A_s = (sum_t xi_t(s) y_t y_{t-1}') * (sum_t xi_t(s) y_{t-1} y_{t-1}')^-1

    Constraints:
        Applies structural masks to zero out restricted elements (e.g., off-diagonals in Regime 2).

    Parameters
    ----------
    data : np.ndarray
        (T, 2) data array.
    smoothed_probs : np.ndarray
        (T, 2) smoothed probabilities.
    masks : Dict[int, np.ndarray]
        Structural masks for each regime.

    Returns
    -------
    np.ndarray
        (2, 2, 2) updated coefficient matrices.
    """
    T, K = data.shape
    A_new = np.zeros((2, K, K))

    # Prepare data matrices
    # Y = y_t (t=2..T) -> indices 1..T-1
    # X = y_{t-1} (t=2..T) -> indices 0..T-2
    Y = data[1:]
    X = data[:-1]

    # Smoothed probs aligned with t=2..T (indices 1..T-1)
    weights = smoothed_probs[1:]

    for s in range(2):
        # Weighted Moment Matrices
        # S_xx = sum_t w_t(s) x_t x_t'
        # S_yx = sum_t w_t(s) y_t x_t'

        # Weight vector for regime s
        w = weights[:, s]

        # Efficient weighted sum using broadcasting
        # X.T @ (w[:, None] * X)
        S_xx = X.T @ (w[:, np.newaxis] * X)
        S_yx = Y.T @ (w[:, np.newaxis] * X)

        # Solve S_xx * A_s^T = S_yx^T  => A_s * S_xx = S_yx
        # A_s = S_yx @ inv(S_xx)

        try:
            # Add ridge for stability
            S_xx_reg = S_xx + 1e-6 * np.eye(K)
            A_s = S_yx @ np.linalg.inv(S_xx_reg)
        except np.linalg.LinAlgError:
            raise MStepError(f"Singular matrix in coefficient update for regime {s}.")

        # Apply Mask
        # This is a projection step. For strict MLE with constraints, one should solve
        # the constrained GLS problem. However, zeroing out elements is the standard
        # EM approximation when regressors are identical (VAR).
        # For Regime 2 (Diagonal), this is equivalent to equation-by-equation OLS.
        A_s_constrained = A_s * masks[s]

        A_new[s] = A_s_constrained

    return A_new

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 3: Update covariance matrices for both regimes.
# -------------------------------------------------------------------------------------------------------------------------------

def update_covariances(
    data: np.ndarray,
    smoothed_probs: np.ndarray,
    A_new: np.ndarray
) -> np.ndarray:
    """
    Updates the covariance matrices Sigma_s.

    Formula:
        Sigma_s = (sum_t xi_t(s) u_t u_t') / (sum_t xi_t(s))
        where u_t = y_t - A_s y_{t-1}

    Parameters
    ----------
    data : np.ndarray
        (T, 2) data array.
    smoothed_probs : np.ndarray
        (T, 2) smoothed probabilities.
    A_new : np.ndarray
        (2, 2, 2) updated coefficient matrices.

    Returns
    -------
    np.ndarray
        (2, 2, 2) updated covariance matrices.
    """
    # Initialize key variables
    T, K = data.shape
    Sigma_new = np.zeros((2, K, K))

    Y = data[1:]
    X = data[:-1]
    weights = smoothed_probs[1:]

    for s in range(2):
        # Compute residuals
        # U = Y - X @ A_s.T
        U = Y - X @ A_new[s].T

        # Weighted covariance
        w = weights[:, s]
        sum_w = np.sum(w) + 1e-20

        # Sigma = (U.T @ diag(w) @ U) / sum_w
        Sigma_s = (U.T @ (w[:, np.newaxis] * U)) / sum_w

        # Regularize
        eigvals = np.linalg.eigvalsh(Sigma_s)
        if np.min(eigvals) < 1e-6:
            Sigma_s += 1e-6 * np.eye(K)

        Sigma_new[s] = Sigma_s

    return Sigma_new

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def perform_m_step(
    data: np.ndarray,
    smoother_results: Dict[str, np.ndarray],
    masks: Dict[int, np.ndarray]
) -> Dict[str, np.ndarray]:
    """
    Orchestrates the M-step parameter updates.

    Sequence:
    1. Update P.
    2. Update A (with masks).
    3. Update Sigma.

    Parameters
    ----------
    data : np.ndarray
        Bivariate data.
    smoother_results : Dict[str, np.ndarray]
        Output from Kim smoother.
    masks : Dict[int, np.ndarray]
        Structural masks.

    Returns
    -------
    Dict[str, np.ndarray]
        Updated parameters 'A', 'Sigma', 'P'.
    """
    smoothed = smoother_results["smoothed_probs"]
    joint = smoother_results["joint_smoothed_probs"]

    # 1. Update P
    P_new = update_transition_matrix(joint, smoothed)

    # 2. Update A
    A_new = update_coefficients(data, smoothed, masks)

    # 3. Update Sigma
    Sigma_new = update_covariances(data, smoothed, A_new)

    return {
        "A": A_new,
        "Sigma": Sigma_new,
        "P": P_new
    }


In [None]:
# Task 14: Implement EM Convergence Logic

# ==============================================================================
# Task 14: Implement EM Convergence Logic
# ==============================================================================

class ConvergenceError(Exception):
    """
    Custom exception for EM convergence failures.

    Purpose:
        This class serves as a specific exception identifier for situations where the
        Expectation-Maximization (EM) algorithm fails to satisfy its convergence criteria
        (e.g., log-likelihood improvement drops below a tolerance threshold, or the
        maximum number of iterations is reached without stability). It allows the
        estimation pipeline to distinguish between mathematical errors (like singularities)
        and algorithmic non-convergence.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the convergence statistics
        (e.g., "EM did not converge after 1000 iterations").

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This enables the calling code
        to implement fallback strategies, such as restarting with different initial
        parameters or relaxing convergence tolerances.

    Outputs:
        When raised, it propagates a `ConvergenceError` instance up the call stack,
        halting the estimation process or triggering a specific retry mechanism.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 1 & 2: Check convergence criteria.
# -------------------------------------------------------------------------------------------------------------------------------

def check_convergence(
    current_loglik: float,
    previous_loglik: float,
    iteration: int,
    config: Dict[str, Any]
) -> Tuple[bool, str]:
    """
    Evaluates EM convergence criteria based on log-likelihood changes.

    Criteria:
    1. Absolute change < tolerance
    2. Relative change < tolerance
    3. Max iterations reached

    Parameters
    ----------
    current_loglik : float
        Log-likelihood at current iteration k.
    previous_loglik : float
        Log-likelihood at previous iteration k-1.
    iteration : int
        Current iteration number.
    config : Dict[str, Any]
        Configuration dictionary containing 'convergence_criteria'.

    Returns
    -------
    Tuple[bool, str]
        - converged: True if converged, False otherwise.
        - reason: Description of the convergence status.
    """
    criteria = config["model_physics"]["estimation_engine"]["convergence_criteria"]
    tol = criteria["tolerance"]
    max_iters = criteria["max_iterations"]

    # Check monotonicity (EM property)
    # Allow small numerical noise, but warn on large drops
    diff = current_loglik - previous_loglik
    if diff < -1e-4:
        logger.warning(f"Log-likelihood decreased at iteration {iteration}: {diff:.6f}")

    abs_diff = abs(diff)
    rel_diff = abs_diff / (abs(previous_loglik) + 1e-10)

    if abs_diff < tol:
        return True, "Absolute tolerance reached"

    if rel_diff < tol:
        return True, "Relative tolerance reached"

    if iteration >= max_iters:
        return True, "Maximum iterations reached"

    return False, "Continuing"

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 3: Compute Standard Errors (Numerical Hessian).
# -------------------------------------------------------------------------------------------------------------------------------

def flatten_parameters(params: Dict[str, np.ndarray]) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Flattens the structured parameter dictionary into a single vector.

    This function converts the dictionary of model parameters (A, Sigma, P) into
    a 1D numpy array, which is required for numerical optimization and Hessian
    computation. It also returns metadata to reconstruct the original structure.

    Parameters
    ----------
    params : Dict[str, np.ndarray]
        Dictionary containing 'A', 'Sigma', 'P'.

    Returns
    -------
    Tuple[np.ndarray, Dict[str, Any]]
        - theta: Flattened parameter vector.
        - shapes: Metadata dictionary containing shapes and lengths to reconstruct the dictionary.
    """
    # Extract and flatten objects
    A_flat = params["A"].flatten()
    Sigma_flat = params["Sigma"].flatten()
    P_flat = params["P"].flatten()

    # Compute theta
    theta = np.concatenate([A_flat, Sigma_flat, P_flat])

    shapes = {
        "A_shape": params["A"].shape,
        "Sigma_shape": params["Sigma"].shape,
        "P_shape": params["P"].shape,
        "A_len": len(A_flat),
        "Sigma_len": len(Sigma_flat),
        "P_len": len(P_flat)
    }
    return theta, shapes

def unflatten_parameters(theta: np.ndarray, shapes: Dict[str, Any]) -> Dict[str, np.ndarray]:
    """
    Reconstructs the parameter dictionary from a flattened vector.

    This function reverses the flattening process, using the metadata provided by
    `flatten_parameters` to reshape the 1D vector back into the structured
    dictionary of model parameters.

    Parameters
    ----------
    theta : np.ndarray
        Flattened parameter vector.
    shapes : Dict[str, Any]
        Metadata dictionary from `flatten_parameters`.

    Returns
    -------
    Dict[str, np.ndarray]
        Reconstructed parameter dictionary containing 'A', 'Sigma', 'P'.
    """
    # Extract objects
    A_end = shapes["A_len"]
    Sigma_end = A_end + shapes["Sigma_len"]

    A_flat = theta[:A_end]
    Sigma_flat = theta[A_end:Sigma_end]
    P_flat = theta[Sigma_end:]

    return {
        "A": A_flat.reshape(shapes["A_shape"]),
        "Sigma": Sigma_flat.reshape(shapes["Sigma_shape"]),
        "P": P_flat.reshape(shapes["P_shape"])
    }

def compute_numerical_hessian(
    theta: np.ndarray,
    objective_func: Callable[[np.ndarray], float],
    epsilon: float = 1e-5
) -> np.ndarray:
    """
    Computes the Hessian matrix of a scalar function using central finite differences.

    The Hessian matrix H contains the second-order partial derivatives of the function:
    H_ij = d^2 f / (dx_i dx_j). This implementation uses a central difference
    approximation for both diagonal and off-diagonal elements to ensure accuracy.

    Parameters
    ----------
    theta : np.ndarray
        Parameter vector at the optimum.
    objective_func : Callable
        Function mapping theta -> log-likelihood.
    epsilon : float
        Step size for finite differences.

    Returns
    -------
    np.ndarray
        Hessian matrix of shape (N, N).
    """
    n = len(theta)
    hessian = np.zeros((n, n))

    # Evaluate function at optimum
    f_0 = objective_func(theta)

    for i in range(n):
        for j in range(i, n):
            if i == j:
                # Diagonal element: d^2 f / dx_i^2
                theta_plus = theta.copy()
                theta_plus[i] += epsilon
                f_plus = objective_func(theta_plus)

                theta_minus = theta.copy()
                theta_minus[i] -= epsilon
                f_minus = objective_func(theta_minus)

                hessian[i, i] = (f_plus - 2*f_0 + f_minus) / (epsilon**2)
            else:
                # Off-diagonal element: d^2 f / (dx_i dx_j)
                # f(x + ei + ej)
                theta_pp = theta.copy()
                theta_pp[i] += epsilon
                theta_pp[j] += epsilon
                f_pp = objective_func(theta_pp)

                # f(x + ei - ej)
                theta_pm = theta.copy()
                theta_pm[i] += epsilon
                theta_pm[j] -= epsilon
                f_pm = objective_func(theta_pm)

                # f(x - ei + ej)
                theta_mp = theta.copy()
                theta_mp[i] -= epsilon
                theta_mp[j] += epsilon
                f_mp = objective_func(theta_mp)

                # f(x - ei - ej)
                theta_mm = theta.copy()
                theta_mm[i] -= epsilon
                theta_mm[j] -= epsilon
                f_mm = objective_func(theta_mm)

                val = (f_pp - f_pm - f_mp + f_mm) / (4 * epsilon**2)
                hessian[i, j] = val
                hessian[j, i] = val

    return hessian

def compute_standard_errors(
    model_data: np.ndarray,
    final_params: Dict[str, np.ndarray],
    likelihood_evaluator: Callable[[np.ndarray, Dict[str, np.ndarray]], float]
) -> Dict[str, np.ndarray]:
    """
    Computes standard errors for MS-VAR parameters using numerical Hessian approximation.

    This function calculates the asymptotic standard errors of the maximum likelihood
    estimates. It approximates the Observed Information Matrix by computing the
    negative Hessian of the log-likelihood function at the optimum. The standard
    errors are the square roots of the diagonal elements of the inverse information matrix.

    Parameters
    ----------
    model_data : np.ndarray
        The bivariate data array of shape (T, 2).
    final_params : Dict[str, np.ndarray]
        The optimized parameter dictionary.
    likelihood_evaluator : Callable
        A function that takes (data, params) and returns a dictionary containing
        the 'log_likelihood'. This is typically the `run_hamilton_filter` function.

    Returns
    -------
    Dict[str, np.ndarray]
        Dictionary of standard errors with the same structure as `final_params`.
        Contains NaNs if the Hessian inversion fails.
    """
    # 1. Flatten parameters
    theta_opt, shapes = flatten_parameters(final_params)

    # 2. Define objective wrapper
    def objective(theta: np.ndarray) -> float:
        try:
            # Reconstruct params
            params = unflatten_parameters(theta, shapes)

            # Run filter to get log-likelihood
            result = likelihood_evaluator(model_data, params)
            return result["log_likelihood"]
        except Exception:
            # Return a large negative value if parameters are invalid (e.g., non-PD Sigma)
            return -1e20

    # 3. Compute Hessian of the log-likelihood
    H = compute_numerical_hessian(theta_opt, objective)

    # 4. Invert Negative Hessian to get Covariance Matrix
    try:
        # Observed Information Matrix J = -H
        # Covariance = J^-1 = (-H)^-1
        # Add small ridge for numerical stability if H is near-singular
        H_reg = H - 1e-6 * np.eye(len(theta_opt))
        cov_matrix = np.linalg.inv(-H_reg)

        # 5. Extract SEs (square root of diagonal elements)
        variances = np.diag(cov_matrix)
        # Handle potential negative variances due to numerical noise
        variances = np.maximum(variances, 0)
        std_errors_flat = np.sqrt(variances)

    except np.linalg.LinAlgError:
        logging.warning("Hessian inversion failed. Returning NaN standard errors.")
        std_errors_flat = np.full_like(theta_opt, np.nan)

    # 6. Unflatten standard errors back to dictionary structure
    return unflatten_parameters(std_errors_flat, shapes)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def finalize_estimation(
    current_params: Dict[str, np.ndarray],
    current_loglik: float,
    previous_loglik: float,
    iteration: int,
    config: Dict[str, Any],
    data: np.ndarray,
    likelihood_evaluator: Callable
) -> Tuple[bool, Dict[str, Any]]:
    """
    Orchestrates convergence checking and finalization.

    Parameters
    ----------
    current_params : Dict[str, np.ndarray]
        Parameters at iteration k.
    current_loglik : float
        Log-likelihood at iteration k.
    previous_loglik : float
        Log-likelihood at iteration k-1.
    iteration : int
        Current iteration.
    config : Dict[str, Any]
        Configuration.
    data : np.ndarray
        Data for SE computation.
    likelihood_evaluator : Callable
        Function to compute log-likelihood.

    Returns
    -------
    Tuple[bool, Dict[str, Any]]
        - converged: Boolean.
        - results: Dictionary containing final params, SEs, loglik, etc. (if converged).
    """
    converged, reason = check_convergence(current_loglik, previous_loglik, iteration, config)

    results = {}

    if converged:
        logger.info(f"EM converged at iteration {iteration}: {reason}")

        # Compute SEs using the rigorous numerical Hessian approach
        std_errors = compute_standard_errors(data, current_params, likelihood_evaluator)

        results = {
            "params": current_params,
            "std_errors": std_errors,
            "final_loglik": current_loglik,
            "iterations": iteration,
            "convergence_reason": reason
        }

    return converged, results


In [None]:
# Task 15: Estimate MS-VAR for All Country-System Combinations

# ==============================================================================
# Task 15: Estimate MS-VAR for All Country-System Combinations
# ==============================================================================

class EstimationError(Exception):
    """
    Custom exception for failures during MS-VAR estimation.

    Purpose:
        This class serves as a high-level exception identifier for errors encountered
        during the execution of the `fit` method or the broader estimation pipeline
        of the MS-VAR model. It acts as a wrapper or a distinct category for
        failures that prevent the model from producing valid parameter estimates,
        often aggregating more specific underlying errors (like InitializationError
        or ConvergenceError) or handling structural data mismatches during estimation.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific
        cause of the estimation failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This allows users of the
        library to catch all model estimation failures with a single `except`
        block without catching unrelated system errors.

    Outputs:
        When raised, it propagates an `EstimationError` instance up the call
        stack, signaling that the model estimation has failed and no valid
        model attributes have been set.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 2: Execute EM estimation for a single model.
# -------------------------------------------------------------------------------------------------------------------------------

def estimate_single_model(
    data: np.ndarray,
    initial_params: Dict[str, np.ndarray],
    masks: Dict[int, np.ndarray],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Executes the EM algorithm for a single MS-VAR model.

    Parameters
    ----------
    data : np.ndarray
        (T, 2) data array.
    initial_params : Dict[str, np.ndarray]
        Initial parameters 'A', 'Sigma', 'P'.
    masks : Dict[int, np.ndarray]
        Structural masks for A.
    config : Dict[str, Any]
        Configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        Estimation results including final parameters, log-likelihood, and regime probabilities.

    Raises
    ------
    EstimationError
        If estimation fails (divergence, numerical error).
    """
    current_params = initial_params
    current_loglik = -np.inf

    # Retrieve max iterations
    max_iters = config["model_physics"]["estimation_engine"]["convergence_criteria"]["max_iterations"]

    # Storage for history
    loglik_history = []

    for k in range(max_iters):
        try:
            # 1. E-Step: Hamilton Filter
            # run_hamilton_filter is defined in Task 11 context
            filter_res = run_hamilton_filter(data, current_params)
            new_loglik = filter_res["log_likelihood"]
            loglik_history.append(new_loglik)

            # 2. Check Convergence
            # finalize_estimation is defined in Task 14 context
            # We pass run_hamilton_filter as the likelihood evaluator for SE computation
            converged, final_res = finalize_estimation(
                current_params, new_loglik, current_loglik, k, config, data, run_hamilton_filter
            )

            if converged:
                # Compute smoothed probabilities for final output
                # run_kim_smoother is defined in Task 12 context
                smoother_res = run_kim_smoother(filter_res, current_params)

                final_res["smoothed_probs"] = smoother_res["smoothed_probs"]
                final_res["filtered_probs"] = filter_res["filtered_probs"]
                final_res["loglik_history"] = loglik_history
                return final_res

            # 3. E-Step Continued: Kim Smoother
            smoother_res = run_kim_smoother(filter_res, current_params)

            # 4. M-Step: Parameter Updates
            # perform_m_step is defined in Task 13 context
            current_params = perform_m_step(data, smoother_res, masks)

            # Update loglik for next iteration check
            current_loglik = new_loglik

        except Exception as e:
            raise EstimationError(f"EM failed at iteration {k}: {e}")

    # If loop finishes without convergence
    raise EstimationError(f"EM did not converge within {max_iters} iterations.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 1 & 3: Construct grid, execute estimation, and organize results.
# -------------------------------------------------------------------------------------------------------------------------------

def estimate_all_msvar_models(
    bivariate_systems: Dict[str, Dict[str, Any]],
    initial_params_dict: Dict[Tuple[str, str], Dict[str, np.ndarray]],
    study_config: Dict[str, Any]
) -> Dict[Tuple[str, str], Dict[str, Any]]:
    """
    Orchestrates the estimation of all MS-VAR models defined in the study.

    Iterates through every country and system, retrieves initial parameters,
    and runs the EM algorithm. Collects and validates results.

    Parameters
    ----------
    bivariate_systems : Dict[str, Dict[str, Any]]
        The constructed bivariate systems.
    initial_params_dict : Dict[Tuple[str, str], Dict[str, np.ndarray]]
        Dictionary of initial parameters keyed by (country, system).
    study_config : Dict[str, Any]
        The study configuration.

    Returns
    -------
    Dict[Tuple[str, str], Dict[str, Any]]
        A dictionary mapping (country, system) to estimation results.
    """
    logger.info("Starting batch MS-VAR estimation...")

    msvar_estimates = {}

    # Load masks once
    try:
        mask1 = np.array(study_config["model_physics"]["regime_constraints"]["regime_1_mask"], dtype=int)
        mask2 = np.array(study_config["model_physics"]["regime_constraints"]["regime_2_mask"], dtype=int)
        masks = {0: mask1, 1: mask2}
    except KeyError:
        logger.error("Regime masks missing in config. Aborting estimation.")
        return {}

    # Iterate over the grid
    success_count = 0
    total_count = 0

    for country, systems in bivariate_systems.items():
        for system_name, sys_data in systems.items():
            total_count += 1
            model_key = (country, system_name)

            # Check if initialization exists
            if model_key not in initial_params_dict:
                logger.warning(f"Skipping {model_key}: No initial parameters found.")
                continue

            data = sys_data["data"]
            init_params = initial_params_dict[model_key]

            logger.info(f"Estimating model: {country} - {system_name} (T={len(data)})")

            try:
                # Execute Estimation
                result = estimate_single_model(data, init_params, masks, study_config)

                # Validate Result Structure
                if "params" not in result or "smoothed_probs" not in result:
                    raise EstimationError("Estimation returned incomplete results.")

                # Store Result
                msvar_estimates[model_key] = result
                success_count += 1
                logger.info(f"Successfully estimated {country} - {system_name}. LogLik: {result['final_loglik']:.2f}")

            except EstimationError as e:
                logger.error(f"Estimation failed for {country} - {system_name}: {e}")
            except Exception as e:
                logger.error(f"Unexpected error for {country} - {system_name}: {e}")

    logger.info(f"Batch estimation completed. Success: {success_count}/{total_count}.")
    return msvar_estimates


In [None]:
# Task 16: Compute Ljung–Box Residual Diagnostics

# ==============================================================================
# Task 16: Compute Ljung–Box Residual Diagnostics
# ==============================================================================

class DiagnosticError(Exception):
    """
    Custom exception for failures during residual diagnostics.

    Purpose:
        This class serves as a specific exception identifier for runtime errors
        encountered during the validation of model adequacy. It flags failures
        in calculating diagnostic statistics—such as the Ljung-Box test for
        autocorrelation, the Jarque-Bera test for normality, or ARCH-LM tests
        for conditional heteroskedasticity—applied to the estimated residuals
        ($\\hat{u}_t$). This distinguishes computational failures in the diagnostic
        module from core estimation errors.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific
        test failure (e.g., "NaN values encountered in residuals during Ljung-Box test").

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This allows the pipeline to
        isolate issues related to model validation steps, ensuring that a fitted
        model is not discarded solely due to a secondary diagnostic calculation error.

    Outputs:
        When raised, it propagates a `DiagnosticError` instance up the call stack,
        signaling that the quality control checks on the model residuals could
        not be completed.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 1: Extract regime-weighted residuals.
# -------------------------------------------------------------------------------------------------------------------------------

def extract_model_residuals(
    data: np.ndarray,
    params: Dict[str, np.ndarray],
    smoothed_probs: np.ndarray
) -> np.ndarray:
    """
    Computes the residuals of the MS-VAR model based on the most probable regime.

    For each time t, the residual is defined as:
        u_t = y_t - A_{s_t} * y_{t-1}
    where s_t = argmax_i P(s_t=i | Y_T).

    Parameters
    ----------
    data : np.ndarray
        (T, 2) data array.
    params : Dict[str, np.ndarray]
        Estimated parameters 'A'.
    smoothed_probs : np.ndarray
        (T, 2) smoothed probabilities.

    Returns
    -------
    np.ndarray
        (T-1, 2) array of residuals (starting from t=1).
    """
    T, K = data.shape
    A = params["A"]

    # Determine most probable regime for each time t
    # shape (T,)
    most_probable_regime = np.argmax(smoothed_probs, axis=1)

    # Prepare data
    # Y = y_t (t=1..T-1)
    # X = y_{t-1} (t=0..T-2)
    Y = data[1:]
    X = data[:-1]

    # Regimes corresponding to Y (t=1..T-1)
    regimes = most_probable_regime[1:]

    residuals = np.zeros_like(Y)

    # Compute residuals
    # We can vectorize this by using fancy indexing if A was (T, K, K)
    # Construct A_t sequence
    A_sequence = A[regimes] # shape (T-1, K, K)

    # Compute A_t @ x_t
    # (T-1, K, K) @ (T-1, K, 1) -> (T-1, K, 1)
    # We need to reshape X for matmul
    X_reshaped = X[:, :, np.newaxis]
    fitted = A_sequence @ X_reshaped
    fitted = fitted.squeeze(-1)

    residuals = Y - fitted

    return residuals

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 2 & 3: Compute Ljung–Box Q statistic and evaluate.
# -------------------------------------------------------------------------------------------------------------------------------

def compute_ljung_box_stats(
    residuals: np.ndarray,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Computes Ljung-Box Q-statistics for the residuals of both equations.

    Parameters
    ----------
    residuals : np.ndarray
        (T_res, 2) array of residuals.
    config : Dict[str, Any]
        Configuration containing 'ljung_box_test' thresholds.

    Returns
    -------
    Dict[str, Any]
        Dictionary containing Q-stats, p-values, and decisions for 'GDP' and 'Financial'.
    """
    # Retrieve settings
    lb_config = config["model_physics"]["statistical_thresholds"]["ljung_box_test"]
    lags = lb_config["lags"]
    crit_val = lb_config["critical_value"]

    results = {}
    equation_names = ["GDP", "Financial"]

    for i, name in enumerate(equation_names):
        series = residuals[:, i]

        # Use statsmodels for robust calculation
        # We want the stat at exactly 'lags'
        try:
            lb_df = acorr_ljungbox(series, lags=[lags], return_df=True)
            q_stat = lb_df.iloc[0]["lb_stat"]
            p_val = lb_df.iloc[0]["lb_pvalue"]

            # Compute autocorrelations manually for reporting
            # acf(k) = cov(x_t, x_{t-k}) / var(x_t)
            mean = np.mean(series)
            var = np.var(series)
            n = len(series)
            autocorr = []
            for k in range(1, lags + 1):
                cov = np.sum((series[k:] - mean) * (series[:-k] - mean)) / n
                autocorr.append(cov / var)

            decision = "No Autocorrelation" if q_stat < crit_val else "Autocorrelation Detected"

            results[name] = {
                "q_stat": float(q_stat),
                "p_value": float(p_val),
                "critical_value": crit_val,
                "decision": decision,
                "autocorrelations": autocorr,
                "is_clean": bool(q_stat < crit_val)
            }

        except Exception as e:
            raise DiagnosticError(f"Ljung-Box test failed for {name} equation: {e}")

    return results

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_ljung_box_diagnostics(
    msvar_estimates: Dict[Tuple[str, str], Dict[str, Any]],
    bivariate_systems: Dict[str, Dict[str, Any]],
    study_config: Dict[str, Any]
) -> Dict[Tuple[str, str], Dict[str, Any]]:
    """
    Orchestrates residual diagnostics for all estimated models.

    Sequence:
    1. Extract residuals based on most probable regime.
    2. Compute Ljung-Box statistics.
    3. Evaluate against critical values.

    Parameters
    ----------
    msvar_estimates : Dict
        Output from Task 15.
    bivariate_systems : Dict
        Data dictionary.
    study_config : Dict
        Configuration.

    Returns
    -------
    Dict
        Diagnostic results keyed by (country, system).
    """
    logger.info("Starting Ljung-Box residual diagnostics...")

    diagnostic_results = {}

    for key, estimate in msvar_estimates.items():
        country, system_name = key

        # Retrieve data
        # We need the original data to compute residuals
        # msvar_estimates doesn't store data, so we look it up
        try:
            data = bivariate_systems[country][system_name]["data"]
        except KeyError:
            logger.error(f"Data not found for {key} during diagnostics.")
            continue

        params = estimate["params"]
        smoothed_probs = estimate["smoothed_probs"]

        try:
            # Step 1: Extract Residuals
            residuals = extract_model_residuals(data, params, smoothed_probs)

            # Step 2 & 3: Compute Stats
            lb_stats = compute_ljung_box_stats(residuals, study_config)

            diagnostic_results[key] = lb_stats

            # Log summary
            gdp_clean = lb_stats["GDP"]["is_clean"]
            fin_clean = lb_stats["Financial"]["is_clean"]
            status = "Pass" if (gdp_clean and fin_clean) else "Fail"
            logger.info(f"Diagnostics for {country} - {system_name}: {status}")

        except DiagnosticError as e:
            logger.error(f"Diagnostics failed for {key}: {e}")

    logger.info("Residual diagnostics completed.")
    return diagnostic_results


In [None]:
# Task 17: Compute Eigenvalues and Minsky Oscillation Conditions

# ==============================================================================
# Task 17: Compute Eigenvalues and Minsky Oscillation Conditions
# ==============================================================================

class MinskyAnalysisError(Exception):
    """
    Custom exception for failures during Minsky condition analysis.

    Purpose:
        This class serves as a specific exception identifier for logical or numerical
        failures encountered when testing the Minsky instability hypothesis.
        It flags issues such as the inability to calculate correlation regimes
        between returns and volatility, or insufficient data points within a
        specific regime to validate the transition from hedge/speculative finance
        to Ponzi finance strategies (as defined in the context of the MS-VAR
        analysis of financial stability).

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific
        analysis failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This allows the analytical
        modules to distinguish between core estimation errors and failures in
        post-estimation economic interpretation steps.

    Outputs:
        When raised, it propagates a `MinskyAnalysisError` instance up the call
        stack, signaling that the specific economic hypothesis testing could
        not be completed.
    """
    # Execute the pass statement to maintain valid class syntax.
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 1 & 2: Compute eigenvalues and evaluate oscillation conditions.
# -------------------------------------------------------------------------------------------------------------------------------

def evaluate_eigenvalues_and_discriminant(A1: np.ndarray) -> Dict[str, Any]:
    """
    Computes eigenvalues and evaluates the oscillation discriminant for Regime 1.

    Formulas:
        Tr(A1) = alpha1 + beta2
        Det(A1) = alpha1*beta2 - alpha2*beta1
        Discriminant Delta' = (alpha1 - beta2)^2 + 4*alpha2*beta1
        Eigenvalues = (Tr +/- sqrt(Tr^2 - 4*Det)) / 2

    Parameters
    ----------
    A1 : np.ndarray
        (2, 2) coefficient matrix for Regime 1.

    Returns
    -------
    Dict[str, Any]
        Dictionary containing eigenvalues, discriminant, and boolean flags.
    """
    alpha1 = A1[0, 0]
    alpha2 = A1[0, 1]
    beta1 = A1[1, 0]
    beta2 = A1[1, 1]

    # 1. Eigenvalues
    # Use numpy for robust calculation handling complex numbers
    eigenvalues = np.linalg.eigvals(A1)

    # 2. Discriminant Delta'
    # Condition for complex eigenvalues in the specific 2x2 form
    # Note: The characteristic equation discriminant is Tr^2 - 4Det.
    # The paper's Delta' = (alpha1 - beta2)^2 + 4*alpha2*beta1 is algebraically equivalent.
    delta_prime = (alpha1 - beta2)**2 + 4 * alpha2 * beta1

    # 3. Necessary Sign Condition
    # alpha2 * beta1 < 0
    cross_product = alpha2 * beta1

    return {
        "eigenvalues": eigenvalues,
        "delta_prime": delta_prime,
        "cross_product": cross_product,
        "is_oscillatory": delta_prime < 0,
        "has_necessary_sign": cross_product < 0
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 3: Classify Minsky regime status.
# -------------------------------------------------------------------------------------------------------------------------------

def classify_minsky_regime(
    params: Dict[str, np.ndarray],
    std_errors: Dict[str, np.ndarray]
) -> Dict[str, Any]:
    """
    Classifies whether Regime 1 qualifies as a 'Minsky Regime'.

    Criteria:
    1. Oscillatory dynamics (Delta' < 0).
    2. Minsky sign pattern: beta1 > 0 (financial pro-cyclical), alpha2 < 0 (financial drag).
    3. Statistical significance of beta1 and alpha2 (one-sided t-test, 5% level).

    Parameters
    ----------
    params : Dict[str, np.ndarray]
        Estimated parameters.
    std_errors : Dict[str, np.ndarray]
        Standard errors.

    Returns
    -------
    Dict[str, Any]
        Classification results and details.
    """
    A1 = params["A"][0] # Regime 1
    SE_A1 = std_errors["A"][0]

    # 1. Dynamics
    dynamics = evaluate_eigenvalues_and_discriminant(A1)

    # 2. Sign Pattern
    beta1 = A1[1, 0]
    alpha2 = A1[0, 1]

    sign_pattern_valid = (beta1 > 0) and (alpha2 < 0)

    # 3. Significance
    # Critical value for one-sided 5% test (normal approx)
    t_crit = 1.645

    se_beta1 = SE_A1[1, 0]
    se_alpha2 = SE_A1[0, 1]

    # Handle missing SEs (NaN)
    if np.isnan(se_beta1) or np.isnan(se_alpha2):
        beta1_sig = False
        alpha2_sig = False
        significance_valid = False
        logger.warning("Standard errors missing; assuming coefficients not significant.")
    else:
        t_beta1 = beta1 / se_beta1
        t_alpha2 = alpha2 / se_alpha2

        # H1: beta1 > 0 => t > 1.645
        beta1_sig = t_beta1 > t_crit
        # H1: alpha2 < 0 => t < -1.645
        alpha2_sig = t_alpha2 < -t_crit

        significance_valid = beta1_sig and alpha2_sig

    # Final Classification
    is_minsky = (
        dynamics["is_oscillatory"] and
        sign_pattern_valid and
        significance_valid
    )

    return {
        "is_minsky": is_minsky,
        "dynamics": dynamics,
        "signs": {
            "beta1": beta1,
            "alpha2": alpha2,
            "pattern_valid": sign_pattern_valid
        },
        "significance": {
            "beta1_sig": beta1_sig,
            "alpha2_sig": alpha2_sig,
            "valid": significance_valid
        }
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def analyze_minsky_conditions(
    msvar_estimates: Dict[Tuple[str, str], Dict[str, Any]]
) -> Dict[Tuple[str, str], Dict[str, Any]]:
    """
    Orchestrates the Minsky condition analysis for all estimated models.

    Parameters
    ----------
    msvar_estimates : Dict
        Output from Task 15.

    Returns
    -------
    Dict
        Analysis results keyed by (country, system).
    """
    logger.info("Starting Minsky condition analysis...")

    analysis_results = {}

    for key, estimate in msvar_estimates.items():
        country, system_name = key

        try:
            params = estimate["params"]
            std_errors = estimate["std_errors"]

            # Execute Classification
            result = classify_minsky_regime(params, std_errors)

            analysis_results[key] = result

            # Log outcome
            status = "YES" if result["is_minsky"] else "NO"
            logger.info(f"Minsky Regime identified for {country} - {system_name}: {status}")

        except Exception as e:
            logger.error(f"Analysis failed for {key}: {e}")

    logger.info("Minsky analysis completed.")
    return analysis_results


In [None]:
# Task 18: Extract and Visualize Regime Probabilities

# ==============================================================================
# Task 18: Extract and Visualize Regime Probabilities
# ==============================================================================

class VisualizationError(Exception):
    """
    Custom exception for failures during visualization.

    Purpose:
        This class serves as a specific exception identifier for runtime errors
        encountered during the generation of graphical outputs (e.g., impulse
        response functions, regime probability plots, or dynamic spillover networks).
        It distinguishes failures in the presentation layer—such as dimensionality
        mismatches in `matplotlib` calls, invalid file paths for saving figures,
        or data formatting issues specific to plotting libraries—from core
        analytical or estimation errors.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific
        plotting failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This allows the execution
        pipeline to potentially skip a failed plot and continue with subsequent
        visualizations or reporting, rather than crashing the entire analysis
        due to a display-related issue.

    Outputs:
        When raised, it propagates a `VisualizationError` instance up the call
        stack, signaling that a specific visual artifact could not be rendered
        or saved.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 1 & 2: Retrieve probabilities and compute metrics.
# -------------------------------------------------------------------------------------------------------------------------------

def compute_regime_metrics(
    smoothed_probs: np.ndarray,
    transition_matrix: np.ndarray,
    index: pd.DatetimeIndex
) -> Tuple[pd.DataFrame, Dict[str, float]]:
    """
    Constructs a time series of regime probabilities and computes dominance metrics.

    Metrics:
    - Fraction of time in Regime 1 (Minsky).
    - Expected duration of each regime: D_i = 1 / (1 - p_ii).

    Parameters
    ----------
    smoothed_probs : np.ndarray
        (T, 2) array of smoothed probabilities.
    transition_matrix : np.ndarray
        (2, 2) transition matrix P.
    index : pd.DatetimeIndex
        Time index for the data.

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, float]]
        - DataFrame with columns 'prob_regime1', 'prob_regime2', 'dominant_regime'.
        - Dictionary of summary metrics.
    """
    # 1. Construct DataFrame
    df_probs = pd.DataFrame(
        smoothed_probs,
        index=index,
        columns=["prob_regime1", "prob_regime2"]
    )

    # Determine dominant regime (1-based index for reporting)
    # argmax returns 0 or 1; we add 1 to get Regime 1 or 2
    df_probs["dominant_regime"] = np.argmax(smoothed_probs, axis=1) + 1

    # 2. Compute Metrics
    T = len(df_probs)
    fraction_regime1 = (df_probs["dominant_regime"] == 1).sum() / T

    # Expected durations
    p11 = transition_matrix[0, 0]
    p22 = transition_matrix[1, 1]

    # Handle absorbing states (p_ii = 1) to avoid division by zero
    dur1 = 1.0 / (1.0 - p11) if p11 < 1.0 else np.inf
    dur2 = 1.0 / (1.0 - p22) if p22 < 1.0 else np.inf

    metrics = {
        "fraction_regime1": fraction_regime1,
        "expected_duration_regime1": dur1,
        "expected_duration_regime2": dur2,
        "persistence_p11": p11,
        "persistence_p22": p22
    }

    return df_probs, metrics

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 3: Define visualization specifications.
# -------------------------------------------------------------------------------------------------------------------------------

def plot_regime_probabilities(
    df_probs: pd.DataFrame,
    country: str,
    system_name: str,
    save_path: str = None
) -> None:
    """
    Generates and saves a plot of regime probabilities.

    Visuals:
    - Solid line: Regime 1 (Minsky).
    - Dashed line: Regime 2 (No Minsky).
    - Vertical lines: 1973 (Oil), 2001 (Dot-com), 2008 (GFC).

    Parameters
    ----------
    df_probs : pd.DataFrame
        DataFrame containing probabilities.
    country : str
        Country name.
    system_name : str
        System identifier.
    save_path : str, optional
        Path to save the figure. If None, plot is not saved.
    """
    try:
        fig, ax = plt.subplots(figsize=(12, 6))

        # Plot probabilities
        ax.plot(df_probs.index, df_probs["prob_regime1"], label="Minsky Regime (Regime 1)", color="black", linestyle="-")
        ax.plot(df_probs.index, df_probs["prob_regime2"], label="No Minsky Regime (Regime 2)", color="gray", linestyle="--")

        # Add crisis markers
        crisis_years = [1973, 2001, 2008]
        for year in crisis_years:
            # Convert year to timestamp approx
            date = pd.Timestamp(f"{year}-01-01")
            if date >= df_probs.index.min() and date <= df_probs.index.max():
                ax.axvline(x=date, color="red", linestyle=":", alpha=0.6, label=f"Crisis {year}" if year == 1973 else "")

        # Formatting
        ax.set_title(f"Regime Probabilities: {country} - {system_name}")
        ax.set_ylabel("Probability")
        ax.set_xlabel("Year")
        ax.set_ylim(-0.05, 1.05)
        ax.legend(loc="best")
        ax.grid(True, alpha=0.3)

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches="tight")
            logger.info(f"Saved plot to {save_path}")

        plt.close(fig)

    except Exception as e:
        raise VisualizationError(f"Plotting failed for {country} - {system_name}: {e}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def extract_regime_probabilities(
    msvar_estimates: Dict[Tuple[str, str], Dict[str, Any]],
    bivariate_systems: Dict[str, Dict[str, Any]]
) -> Dict[Tuple[str, str], Dict[str, Any]]:
    """
    Orchestrates the extraction, analysis, and visualization of regime probabilities.

    Parameters
    ----------
    msvar_estimates : Dict
        Output from Task 15.
    bivariate_systems : Dict
        Data dictionary (for indices).

    Returns
    -------
    Dict
        Results keyed by (country, system) containing the probability DataFrame and metrics.
    """
    logger.info("Starting regime probability analysis...")

    regime_results = {}

    for key, estimate in msvar_estimates.items():
        country, system_name = key

        try:
            # Retrieve data index
            index = bivariate_systems[country][system_name]["index"]

            # Retrieve estimates
            smoothed_probs = estimate["smoothed_probs"]
            P = estimate["params"]["P"]

            # Step 1 & 2: Compute Metrics
            df_probs, metrics = compute_regime_metrics(smoothed_probs, P, index)

            # Step 3: Visualize (Optional: set save_path to actually save)
            # We define the function but don't call it to avoid filesystem I/O in this environment
            # plot_regime_probabilities(df_probs, country, system_name)

            regime_results[key] = {
                "probabilities": df_probs,
                "metrics": metrics
            }

            logger.info(f"Analyzed regimes for {country} - {system_name}. Regime 1 Fraction: {metrics['fraction_regime1']:.2f}")

        except Exception as e:
            logger.error(f"Regime analysis failed for {key}: {e}")

    logger.info("Regime analysis completed.")
    return regime_results


In [None]:
# Task 19: Design End-to-End Orchestrator Function

# ==============================================================================
# Task 19: Design End-to-End Orchestrator Function
# ==============================================================================

class PipelineError(Exception):
    """
    Custom exception for critical pipeline failures.

    Purpose:
        This class serves as the terminal exception identifier for the MS-VAR
        analytical pipeline. It is raised when an unrecoverable error occurs that
        cannot be isolated to a specific module (like filtering or optimization)
        but rather indicates a structural breakdown of the workflow (e.g.,
        dependency failures, input/output corruption, or critical configuration
        inconsistencies).

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the catastrophic
        failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy to signal the need for an
        immediate, hard stop of the execution script.

    Outputs:
        When raised, it propagates a `PipelineError` instance up the call
        stack, triggering the final error logging and cleanup routines.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 1, 2 & 3: Define orchestrator interface, sequence, and controls.
# -------------------------------------------------------------------------------------------------------------------------------

def run_minsky_msvar_pipeline(
    study_config: Dict[str, Any],
    raw_datasets: Dict[str, pd.DataFrame]
) -> Dict[str, Any]:
    """
    Orchestrates the complete MS-VAR estimation and analysis pipeline.

    This function executes the research workflow from raw data validation to
    final regime analysis, ensuring strict adherence to the defined methodology.

    Sequence:
    1.  Validation (Config & Data)
    2.  Data Engineering (Cleansing, Log Transform, HP Filter)
    3.  Econometric Setup (Bivariate Systems, ADF Tests)
    4.  Estimation (Initialization, EM Algorithm)
    5.  Diagnostics (Residuals, Minsky Conditions)
    6.  Analysis (Regime Probabilities)

    Parameters
    ----------
    study_config : Dict[str, Any]
        The master configuration dictionary.
    raw_datasets : Dict[str, pd.DataFrame]
        The raw country datasets.

    Returns
    -------
    Dict[str, Any]
        A comprehensive results dictionary containing all intermediate and final outputs.

    Raises
    ------
    PipelineError
        If a critical stage of the pipeline fails.
    """
    start_time = datetime.now()
    logger.info("Starting Minsky MS-VAR Research Pipeline...")

    # Reproducibility
    np.random.seed(42) # Fixed seed for any stochastic elements (e.g. initialization perturbation)

    results = {
        "config": study_config,
        "timestamp": start_time.isoformat()
    }

    try:
        # --- Phase 1: Validation ---
        logger.info("Phase 1: Validation")
        # Task 1: Validate Config
        validate_study_config(study_config)

        # Task 2: Validate Raw Data
        validate_raw_datasets(raw_datasets)

        # --- Phase 2: Data Engineering ---
        logger.info("Phase 2: Data Engineering")
        # Task 3: Cleansing
        cleansing_output = cleanse_and_align_datasets(raw_datasets)
        cleansed_data = cleansing_output["cleansed_datasets"]
        results["data_summary"] = cleansing_output["summary_statistics"]

        # Task 4: Transformation
        logged_data, transform_meta = apply_log_transform(cleansed_data)
        results["transformation_metadata"] = transform_meta

        # Task 5: Signal Extraction
        cycles_data = extract_hp_cycles(logged_data, study_config)

        # --- Phase 3: Econometric Setup ---
        logger.info("Phase 3: Econometric Setup")
        # Task 6: Bivariate Systems
        bivariate_systems = construct_bivariate_systems(cycles_data)

        # Task 7 & 8: Stationarity Tests
        adf_summary = execute_all_adf_tests(bivariate_systems, study_config)
        results["adf_tests"] = adf_summary

        # Check stationarity critical failure
        # We proceed even if some fail, but log it heavily (as per Task 8 logic)
        if not adf_summary["all_stationary"]:
            logger.warning("Some series failed stationarity tests. Proceeding with caution.")

        # --- Phase 4: Estimation ---
        logger.info("Phase 4: Estimation")
        # Task 10: Initialization
        initial_params = initialize_em_parameters(bivariate_systems)

        # Task 15: Batch Estimation (includes Tasks 11-14 internally)
        msvar_estimates = estimate_all_msvar_models(bivariate_systems, initial_params, study_config)
        results["msvar_estimates"] = msvar_estimates

        if not msvar_estimates:
            raise PipelineError("No models were successfully estimated.")

        # --- Phase 5: Diagnostics & Analysis ---
        logger.info("Phase 5: Diagnostics & Analysis")
        # Task 16: Residual Diagnostics
        lb_diagnostics = compute_ljung_box_diagnostics(msvar_estimates, bivariate_systems, study_config)
        results["ljung_box_tests"] = lb_diagnostics

        # Task 17: Minsky Conditions
        minsky_analysis = analyze_minsky_conditions(msvar_estimates)
        results["minsky_analysis"] = minsky_analysis

        # Task 18: Regime Probabilities
        regime_probs = extract_regime_probabilities(msvar_estimates, bivariate_systems)
        results["regime_probabilities"] = regime_probs

        end_time = datetime.now()
        duration = end_time - start_time
        logger.info(f"Pipeline completed successfully in {duration}.")

        return results

    except Exception as e:
        logger.critical(f"Pipeline failed: {e}")
        raise PipelineError(f"Critical failure in pipeline execution: {e}")


In [None]:
# Task 20: Design Monte Carlo Robustness Orchestrator

# ==============================================================================
# Task 20: Design Monte Carlo Robustness Orchestrator
# ==============================================================================

class MonteCarloError(Exception):
    """
    Custom exception for failures during Monte Carlo simulation.

    Purpose:
        This class serves as a specific exception identifier for runtime errors
        encountered during stochastic simulation processes, such as the generation
        of Generalized Impulse Response Functions (GIRFs) or the bootstrapping of
        standard errors. It isolates failures in the Data Generating Process (DGP)
        reconstruction or random number generation (e.g., Cholesky decomposition
        failures on simulated covariance matrices) from deterministic estimation errors.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific
        simulation failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy to facilitate targeted error
        handling during computationally intensive resampling steps.

    Outputs:
        When raised, it propagates a `MonteCarloError` instance up the call
        stack, signaling that the stochastic component of the analysis has failed
        and associated confidence intervals or impulse responses are invalid.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 1, 2 & 3: Orchestrate MC simulations and aggregation.
# -------------------------------------------------------------------------------------------------------------------------------

def run_monte_carlo_robustness(
    msvar_estimates: Dict[Tuple[str, str], Dict[str, Any]],
    study_config: Dict[str, Any]
) -> Dict[Tuple[str, str], Dict[str, Any]]:
    """
    Executes parametric bootstrap Monte Carlo simulations to assess parameter stability.

    For each estimated model:
    1. Simulates N synthetic datasets using the estimated parameters.
    2. Re-estimates the MS-VAR model on each synthetic dataset.
    3. Aggregates parameter estimates to compute means, standard deviations, and confidence intervals.
    4. Assesses the robustness of the Minsky classification (sign patterns and oscillation).

    Parameters
    ----------
    msvar_estimates : Dict
        The dictionary of estimated models from Task 15.
    study_config : Dict
        The study configuration containing MC settings.

    Returns
    -------
    Dict
        A dictionary keyed by (country, system) containing MC statistics.
    """
    # Check if enabled
    mc_config = study_config["model_physics"]["monte_carlo_simulation"]
    if not mc_config["enabled"]:
        logger.info("Monte Carlo simulation disabled in configuration.")
        return {}

    iterations = mc_config["iterations"]
    logger.info(f"Starting Monte Carlo robustness analysis ({iterations} iterations per model)...")

    mc_results = {}

    # Load masks once
    try:
        mask1 = np.array(study_config["model_physics"]["regime_constraints"]["regime_1_mask"], dtype=int)
        mask2 = np.array(study_config["model_physics"]["regime_constraints"]["regime_2_mask"], dtype=int)
        masks = {0: mask1, 1: mask2}
    except KeyError:
        raise MonteCarloError("Regime masks missing in config.")

    for key, estimate in msvar_estimates.items():
        country, system_name = key
        logger.info(f"Running MC for {country} - {system_name}")

        # Retrieve estimated parameters (True parameters for DGP)
        true_params = estimate["params"]
        T = len(estimate["smoothed_probs"]) # Sample size

        # Storage for replications
        replications = []

        success_count = 0

        for r in range(iterations):
            try:
                # 1. Simulate Data (Task 21)
                # simulate_msvar_path is defined in Task 21 context
                sim_data, _ = simulate_msvar_path(T, true_params)

                # 2. Re-estimate (Task 15 logic)
                # We use the true parameters as initialization to speed up convergence
                # and avoid label switching issues common in mixture models.
                # estimate_single_model is defined in Task 15 context
                sim_result = estimate_single_model(sim_data, true_params, masks, study_config)

                replications.append(sim_result["params"])
                success_count += 1

            except Exception as e:
                # Log failure but continue
                # logger.debug(f"MC replication {r} failed: {e}")
                pass

        if success_count < iterations * 0.5:
            logger.warning(f"High failure rate in MC for {key}: {success_count}/{iterations} successful.")

        # 3. Aggregate Results (Task 22)
        # aggregate_mc_results is defined in Task 22 context
        if success_count > 0:
            stats = aggregate_mc_results(replications)
            mc_results[key] = stats
        else:
            logger.error(f"All MC replications failed for {key}.")

    logger.info("Monte Carlo analysis completed.")
    return mc_results


In [None]:
# Task 21: Implement Monte Carlo Data Generation

# ==============================================================================
# Task 21: Implement Monte Carlo Data Generation
# ==============================================================================

class SimulationError(Exception):
    """
    Custom exception for failures during data simulation.

    Purpose:
        This class serves as a specific exception identifier for runtime errors
        encountered when generating synthetic data from the estimated MS-VAR
        model parameters (e.g., for validating model recovery or stress testing).
        It distinguishes failures in the Data Generating Process (DGP) logic—such
        as unstable regime transitions or explosive autoregressive processes
        in the simulated paths—from standard estimation errors.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific
        simulation constraint violation.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy to facilitate targeted error
        handling during model validation and counterfactual analysis steps.

    Outputs:
        When raised, it propagates a `SimulationError` instance up the call
        stack, signaling that the generation of the synthetic trajectory has
        failed.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 1: Define regime simulation from estimated transition matrix.
# -------------------------------------------------------------------------------------------------------------------------------

def simulate_regime_path(T: int, P: np.ndarray) -> np.ndarray:
    """
    Simulates a sequence of regimes based on the Markov transition matrix.

    Parameters
    ----------
    T : int
        Length of the sequence.
    P : np.ndarray
        (2, 2) transition matrix.

    Returns
    -------
    np.ndarray
        (T,) array of integer regimes (0 or 1).
    """
    # 1. Compute Stationary Distribution
    # Solve pi = pi * P => (I - P.T) * pi = 0
    eigvals, eigvecs = np.linalg.eig(P.T)
    idx = np.argmin(np.abs(eigvals - 1.0))
    pi = np.real(eigvecs[:, idx])
    pi /= np.sum(pi)

    # 2. Simulate Path
    regimes = np.zeros(T, dtype=int)

    # Initial state
    regimes[0] = np.random.choice([0, 1], p=pi)

    # Subsequent states
    for t in range(1, T):
        prev = regimes[t-1]
        probs = P[prev]
        # Ensure probs sum to 1 (handle float errors)
        probs = probs / np.sum(probs)
        regimes[t] = np.random.choice([0, 1], p=probs)

    return regimes

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 2: Simulate observables from regime-dependent VAR.
# -------------------------------------------------------------------------------------------------------------------------------

def generate_var_observables(
    regimes: np.ndarray,
    params: Dict[str, np.ndarray]
) -> np.ndarray:
    """
    Simulates bivariate time series data given a regime sequence and VAR parameters.

    Model: y_t = A_{s_t} * y_{t-1} + epsilon_t
           epsilon_t ~ N(0, Sigma_{s_t})

    Parameters
    ----------
    regimes : np.ndarray
        (T,) array of regimes.
    params : Dict[str, np.ndarray]
        Dictionary containing 'A' and 'Sigma'.

    Returns
    -------
    np.ndarray
        (T, 2) array of simulated data.
    """
    T = len(regimes)
    K = 2
    A = params["A"]
    Sigma = params["Sigma"]

    data = np.zeros((T, K))

    # Initialize y_0 at the unconditional mean (0 for cycles)
    # Or draw from stationary distribution of first regime
    # For simplicity and robustness, we start at 0.
    data[0] = np.zeros(K)

    for t in range(1, T):
        s_t = regimes[t]

        # Draw innovation
        # Check PD of Sigma before drawing? Assumed valid from estimation.
        try:
            epsilon = np.random.multivariate_normal(np.zeros(K), Sigma[s_t])
        except np.linalg.LinAlgError:
            # Fallback for near-singular matrices in simulation
            # Add small ridge
            Sigma_reg = Sigma[s_t] + 1e-6 * np.eye(K)
            epsilon = np.random.multivariate_normal(np.zeros(K), Sigma_reg)

        # Autoregressive update
        # y_t = A[s_t] @ y_{t-1} + epsilon
        data[t] = A[s_t] @ data[t-1] + epsilon

    return data

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 3: Verify simulated data properties.
# -------------------------------------------------------------------------------------------------------------------------------

def verify_simulation(data: np.ndarray) -> bool:
    """
    Checks simulated data for validity (no explosion, no NaNs).

    Parameters
    ----------
    data : np.ndarray
        Simulated data.

    Returns
    -------
    bool
        True if valid.
    """
    if not np.isfinite(data).all():
        return False

    # Check for explosion
    # If values exceed a reasonable threshold (e.g., 100 standard deviations of typical cycles)
    # Typical cycle std is ~0.02 (log GDP). 100 * 0.02 = 2.0.
    # Let's set a loose bound of 10.0 for log-levels/ratios to catch true divergence.
    if np.max(np.abs(data)) > 10.0:
        return False

    return True

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def simulate_msvar_path(
    T: int,
    params: Dict[str, np.ndarray],
    max_retries: int = 5
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Orchestrates the simulation of a synthetic MS-VAR path.

    Parameters
    ----------
    T : int
        Length of simulation.
    params : Dict[str, np.ndarray]
        Model parameters.
    max_retries : int
        Number of attempts to generate a stable path.

    Returns
    -------
    Tuple[np.ndarray, np.ndarray]
        - Simulated data (T, 2).
        - Regime sequence (T,).

    Raises
    ------
    SimulationError
        If valid data cannot be generated after max_retries.
    """
    for attempt in range(max_retries):
        try:
            # 1. Regimes
            regimes = simulate_regime_path(T, params["P"])

            # 2. Observables
            data = generate_var_observables(regimes, params)

            # 3. Verify
            if verify_simulation(data):
                return data, regimes

        except Exception:
            continue

    raise SimulationError(f"Failed to generate stable simulation after {max_retries} attempts.")


In [None]:
# Task 22: Aggregate Monte Carlo Results

# ==============================================================================
# Task 22: Aggregate Monte Carlo Results
# ==============================================================================

class AggregationError(Exception):
    """
    Custom exception for failures during MC aggregation.

    Purpose:
        This class serves as a specific exception identifier for runtime errors
        encountered when compiling, summarizing, or reducing the results of
        numerous Monte Carlo iterations (e.g., calculating Generalized Impulse
        Response Functions or confidence intervals). It flags issues such as
        dimensionality mismatches when stacking result arrays, memory exhaustion
        during large-scale tensor operations, or failures in quantile calculations
        due to insufficient valid simulation draws.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific
        aggregation failure (e.g., "Shape mismatch during GIRF stacking").

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This allows the simulation
        controller to distinguish between errors in individual simulation runs
        and critical errors in the final synthesis of the distribution.

    Outputs:
        When raised, it propagates an `AggregationError` instance up the call
        stack, signaling that the statistical consolidation of the simulation
        outputs has failed.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 1: Compute parameter summary statistics.
# -------------------------------------------------------------------------------------------------------------------------------

def compute_parameter_stats(replications: List[Dict[str, np.ndarray]]) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Computes mean, standard deviation, and confidence intervals for all parameters.

    Parameters
    ----------
    replications : List[Dict[str, np.ndarray]]
        List of parameter dictionaries from MC simulations.

    Returns
    -------
    Dict[str, Dict[str, np.ndarray]]
        Nested dictionary: param_name -> {'mean': ..., 'std': ..., 'ci_lower': ..., 'ci_upper': ...}
    """
    if not replications:
        raise AggregationError("No replications to aggregate.")

    # Stack parameters
    # keys: 'A', 'Sigma', 'P'
    keys = replications[0].keys()
    stacked = {k: [] for k in keys}

    for rep in replications:
        for k in keys:
            stacked[k].append(rep[k])

    stats = {}
    for k in keys:
        # Shape: (N_reps, ...)
        arr = np.array(stacked[k])

        stats[k] = {
            "mean": np.mean(arr, axis=0),
            "std": np.std(arr, axis=0, ddof=1),
            "ci_lower": np.percentile(arr, 2.5, axis=0),
            "ci_upper": np.percentile(arr, 97.5, axis=0)
        }

    return stats

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 2: Assess Minsky condition robustness.
# -------------------------------------------------------------------------------------------------------------------------------

def assess_minsky_robustness(replications: List[Dict[str, np.ndarray]]) -> Dict[str, float]:
    """
    Computes the fraction of MC replications that satisfy Minsky conditions.

    Conditions:
    1. Sign Pattern: beta1 > 0 AND alpha2 < 0
    2. Oscillation: Delta' < 0

    Parameters
    ----------
    replications : List[Dict[str, np.ndarray]]
        List of parameter dictionaries.

    Returns
    -------
    Dict[str, float]
        - 'frac_sign': Fraction satisfying sign pattern.
        - 'frac_osc': Fraction satisfying oscillation condition.
        - 'frac_robust': Fraction satisfying both.
    """
    count_sign = 0
    count_osc = 0
    count_both = 0
    n = len(replications)

    for rep in replications:
        # Extract Regime 1 parameters
        A1 = rep["A"][0]
        alpha1 = A1[0, 0]
        alpha2 = A1[0, 1]
        beta1 = A1[1, 0]
        beta2 = A1[1, 1]

        # Check signs
        sign_valid = (beta1 > 0) and (alpha2 < 0)

        # Check oscillation
        delta_prime = (alpha1 - beta2)**2 + 4 * alpha2 * beta1
        osc_valid = delta_prime < 0

        if sign_valid:
            count_sign += 1
        if osc_valid:
            count_osc += 1
        if sign_valid and osc_valid:
            count_both += 1

    return {
        "frac_sign": count_sign / n,
        "frac_osc": count_osc / n,
        "frac_robust": count_both / n
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 3: Organize results in replication tables.
# -------------------------------------------------------------------------------------------------------------------------------

def create_summary_table(stats: Dict[str, Dict[str, np.ndarray]]) -> pd.DataFrame:
    """
    Flattens parameter statistics into a readable DataFrame.

    Parameters
    ----------
    stats : Dict
        Output from compute_parameter_stats.

    Returns
    -------
    pd.DataFrame
        Table with columns: Parameter, Mean, Std, 95% CI Lower, 95% CI Upper.
    """
    rows = []

    # Helper to add rows
    def add_rows(param_name, stat_dict):
        mean = stat_dict["mean"]
        std = stat_dict["std"]
        lower = stat_dict["ci_lower"]
        upper = stat_dict["ci_upper"]

        it = np.nditer(mean, flags=['multi_index'])
        for _ in it:
            idx = it.multi_index
            rows.append({
                "Parameter": f"{param_name}{idx}",
                "Mean": mean[idx],
                "Std": std[idx],
                "CI_Lower": lower[idx],
                "CI_Upper": upper[idx]
            })

    for k, v in stats.items():
        add_rows(k, v)

    return pd.DataFrame(rows)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def aggregate_mc_results(replications: List[Dict[str, np.ndarray]]) -> Dict[str, Any]:
    """
    Orchestrates the aggregation of Monte Carlo simulation results.

    Parameters
    ----------
    replications : List[Dict[str, np.ndarray]]
        List of parameter dictionaries from MC runs.

    Returns
    -------
    Dict[str, Any]
        - 'parameter_stats': Nested dict of stats.
        - 'robustness_metrics': Dict of Minsky robustness fractions.
        - 'summary_table': DataFrame of parameter stats.
    """
    if not replications:
        return {}

    # Step 1: Parameter Stats
    param_stats = compute_parameter_stats(replications)

    # Step 2: Robustness
    robustness = assess_minsky_robustness(replications)

    # Step 3: Table
    table = create_summary_table(param_stats)

    return {
        "parameter_stats": param_stats,
        "robustness_metrics": robustness,
        "summary_table": table
    }


In [None]:
# Task 23: Cross-Validate Results Against Paper Tables

# ==============================================================================
# Task 23: Cross-Validate Results Against Paper Tables
# ==============================================================================

class ValidationReportError(Exception):
    """
    Custom exception for failures during cross-validation.

    Purpose:
        This class serves as a specific exception identifier for runtime errors
        encountered during the model validation stage. It flags failures in
        generating performance metrics (e.g., RMSE, MAE) across cross-validation
        folds, or errors in data splitting logic (e.g., leakage between training
        and testing sets) that compromise the integrity of the out-of-sample
        evaluation.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific
        reporting failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This allows the validation
        framework to abort defective evaluation runs without crashing the
        underlying estimation engine, ensuring that invalid performance reports
        are not generated.

    Outputs:
        When raised, it propagates a `ValidationReportError` instance up the call
        stack, signaling that the cross-validation or reporting routine has
        failed.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

import numpy as np
import pandas as pd
from typing import Dict, Any, Tuple, List
import logging

# ==============================================================================
# Task 23: Cross-Validate Results Against Paper Tables
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 1: Extract paper benchmark values.
# -------------------------------------------------------------------------------------------------------------------------------

def load_paper_benchmarks() -> Dict[Tuple[str, str], Dict[str, float]]:
    """
    Loads benchmark parameter estimates from the original paper's tables.

    This function serves as the ground truth repository for cross-validation.
    It contains hardcoded values extracted directly from the published results
    (e.g., Table 1 for GDP/NFCD) to allow for automated comparison with the
    replication's estimates.

    Note: In a production environment, this data would typically be loaded from
    an external CSV or JSON file to separate data from code.

    Returns
    -------
    Dict[Tuple[str, str], Dict[str, float]]
        A dictionary mapping a (country, system) tuple to a dictionary of
        parameter values (e.g., 'alpha1_r1', 'beta1_r1', 'p11').
    """
    # Values from Table 1: Estimation Results for GDP/NFCD
    benchmarks = {
        ("USA", "GDP_NFCD"): {
            "alpha1_r1": 0.8215, "alpha2_r1": -0.1066,
            "beta1_r1": 1.6348,  "beta2_r1": 0.4027,
            "psi1_r2": 0.5389,   "omega2_r2": 0.9884,
            "p11": 0.837,        "p22": 0.868
        },
        ("UK", "GDP_NFCD"): {
            "alpha1_r1": 0.8541, "alpha2_r1": -0.0924,
            "beta1_r1": 1.4605,  "beta2_r1": 0.7026,
            "psi1_r2": 0.04429,  "omega2_r2": 0.5760,
            "p11": 0.900,        "p22": 0.690
        },
        ("Germany", "GDP_NFCD"): {
            "alpha1_r1": 0.5682, "alpha2_r1": -0.1550,
            "beta1_r1": 0.59835, "beta2_r1": 0.8093,
            "psi1_r2": 0.6848,   "omega2_r2": 0.5400,
            "p11": 0.945,        "p22": 0.850
        }
        # Add other countries/systems as needed from paper tables
    }
    return benchmarks

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 2: Compute discrepancies and classify alignment.
# -------------------------------------------------------------------------------------------------------------------------------

def compute_relative_errors(
    estimates: Dict[str, Any],
    benchmarks: Dict[str, float]
) -> Dict[str, Any]:
    """
    Computes relative errors between estimated parameters and paper benchmarks.

    This function calculates the relative error (RE) for each parameter to quantify
    the deviation of the replication results from the published values. It also
    classifies the alignment quality based on predefined thresholds.

    Formula:
        RE = |estimate - benchmark| / (|benchmark| + epsilon)
        where epsilon = 1e-10 to prevent division by zero.

    Parameters
    ----------
    estimates : Dict[str, Any]
        The dictionary of estimated model results, containing 'params' which holds
        the 'A' and 'P' matrices.
    benchmarks : Dict[str, float]
        The dictionary of benchmark values for the specific model.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing:
        - 'relative_errors': Dict mapping parameter names to their RE values.
        - 'classifications': Dict mapping parameter names to alignment categories
          (Exact, Close, Moderate, Large).
        - 'estimated_values': Dict of the extracted estimate values used for comparison.
        - 'benchmark_values': The input benchmark values.
    """
    params = estimates["params"]
    A = params["A"]
    P = params["P"]

    # Map internal array structure to benchmark keys
    # Regime 1 (Index 0)
    est_map = {
        "alpha1_r1": A[0, 0, 0], "alpha2_r1": A[0, 0, 1],
        "beta1_r1": A[0, 1, 0],  "beta2_r1": A[0, 1, 1],

        # Regime 2 (Index 1) - Diagonal
        "psi1_r2": A[1, 0, 0],   "omega2_r2": A[1, 1, 1],

        # Transition Matrix
        "p11": P[0, 0],          "p22": P[1, 1]
    }

    errors = {}
    classifications = {}

    # Extract benchmarks
    for key, paper_val in benchmarks.items():
        if key in est_map:
            est_val = est_map[key]
            re = abs(est_val - paper_val) / (abs(paper_val) + 1e-10)
            errors[key] = re

            if re < 0.01:
                classifications[key] = "Exact Match"
            elif re < 0.05:
                classifications[key] = "Close Match"
            elif re < 0.10:
                classifications[key] = "Moderate Discrepancy"
            else:
                classifications[key] = "Large Discrepancy"

    return {
        "relative_errors": errors,
        "classifications": classifications,
        "estimated_values": est_map,
        "benchmark_values": benchmarks
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 3: Document and diagnose discrepancies.
# -------------------------------------------------------------------------------------------------------------------------------

def generate_validation_report(
    comparison_results: Dict[str, Any]
) -> str:
    """
    Generates a text report summarizing the cross-validation results.

    This function creates a human-readable summary of the alignment between the
    replication and the benchmark. It aggregates the counts of alignment classifications
    and explicitly lists any parameters with large discrepancies (>10% RE) to facilitate
    diagnosis.

    Parameters
    ----------
    comparison_results : Dict[str, Any]
        The output dictionary from `compute_relative_errors`.

    Returns
    -------
    str
        A formatted string containing the validation summary and details of discrepancies.
    """
    report = []
    report.append("--- Cross-Validation Report ---")

    classifications = comparison_results["classifications"]
    errors = comparison_results["relative_errors"]

    # Count discrepancies
    counts = {"Exact Match": 0, "Close Match": 0, "Moderate Discrepancy": 0, "Large Discrepancy": 0}
    for c in classifications.values():
        counts[c] += 1

    report.append(f"Summary: {counts}")

    # List large discrepancies
    large_discrepancies = {k: v for k, v in errors.items() if v >= 0.10}
    if large_discrepancies:
        report.append("\nLarge Discrepancies (>10% RE):")
        for k, re in large_discrepancies.items():
            est = comparison_results["estimated_values"][k]
            paper = comparison_results["benchmark_values"][k]
            report.append(f"  {k}: Est={est:.4f}, Paper={paper:.4f}, RE={re:.4f}")

    return "\n".join(report)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def cross_validate_results(
    msvar_estimates: Dict[Tuple[str, str], Dict[str, Any]]
) -> Dict[Tuple[str, str], Dict[str, Any]]:
    """
    Orchestrates the cross-validation of results against paper benchmarks.

    This function manages the validation workflow: loading benchmarks, computing
    errors for each estimated model, generating reports, and logging the outcomes.
    It ensures that the replication results are systematically compared against
    the published findings.

    Parameters
    ----------
    msvar_estimates : Dict[Tuple[str, str], Dict[str, Any]]
        The dictionary of estimated MS-VAR models from Task 15.

    Returns
    -------
    Dict[Tuple[str, str], Dict[str, Any]]
        A dictionary of validation results keyed by (country, system), containing
        metrics and the text report.
    """
    logging.info("Starting cross-validation against paper benchmarks...")

    benchmarks = load_paper_benchmarks()
    validation_results = {}

    # Extract benchmarks
    for key, benchmark_vals in benchmarks.items():
        if key in msvar_estimates:
            estimate = msvar_estimates[key]

            # Compute Errors
            comp_res = compute_relative_errors(estimate, benchmark_vals)

            # Generate Report
            report_text = generate_validation_report(comp_res)

            validation_results[key] = {
                "metrics": comp_res,
                "report": report_text
            }

            logging.info(f"Validated {key}. Report:\n{report_text}")
        else:
            logging.warning(f"Benchmark available for {key} but no estimate found.")

    logging.info("Cross-validation completed.")
    return validation_results


In [None]:
# Task 24: Final Synthesis and Documentation

# ==============================================================================
# Task 24: Final Synthesis and Documentation
# ==============================================================================

class ReportingError(Exception):
    """
    Custom exception for failures during report generation.

    Purpose:
        This class serves as a specific exception identifier for runtime errors
        encountered when compiling the final analytical outputs. It distinguishes
        failures in the presentation or I/O layer—such as permission errors when
        saving LaTeX tables, formatting errors in CSV exports, or missing
        dependencies for PDF generation—from the core mathematical estimation
        or simulation errors.

    Inputs:
        No specific input parameters are enforced by this class definition; it
        accepts the standard arguments passed to the base `Exception` class,
        typically a descriptive error message string detailing the specific
        reporting failure.

    Processes:
        The class inherits the standard behavior and attributes of the Python
        `Exception` class. It does not implement custom logic but establishes
        a unique type in the exception hierarchy. This allows the main execution
        script to gracefully handle output failures (e.g., by logging the error
        without losing the calculated results in memory).

    Outputs:
        When raised, it propagates a `ReportingError` instance up the call
        stack, signaling that the creation of the final project artifacts has
        failed.
    """
    # Execute the pass statement to maintain valid class syntax
    pass

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 1: Compile comprehensive results structure.
# -------------------------------------------------------------------------------------------------------------------------------

def compile_master_results(
    config: Dict[str, Any],
    data_artifacts: Dict[str, Any],
    estimates: Dict[str, Any],
    diagnostics: Dict[str, Any],
    analysis: Dict[str, Any],
    mc_results: Dict[str, Any],
    validation: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Aggregates all pipeline outputs into a single master results dictionary.

    Parameters
    ----------
    config : Dict
        Study configuration.
    data_artifacts : Dict
        Cleansed data, cycles, systems.
    estimates : Dict
        MS-VAR parameter estimates.
    diagnostics : Dict
        Residual diagnostics.
    analysis : Dict
        Minsky condition analysis and regime probabilities.
    mc_results : Dict
        Monte Carlo robustness stats.
    validation : Dict
        Cross-validation results.

    Returns
    -------
    Dict
        The master results object.
    """
    # Convert tuple keys to strings for JSON serialization compatibility if needed later
    # (Tuple keys are valid in Python dicts but not JSON)
    # We keep tuple keys for internal consistency, but a serializer would need to handle them.

    master = {
        "metadata": {
            "timestamp": datetime.now().isoformat(),
            "description": "Final results for Minsky MS-VAR Replication"
        },
        "configuration": config,
        "data": data_artifacts,
        "estimation": {
            "msvar_models": estimates
        },
        "diagnostics": {
            "ljung_box": diagnostics
        },
        "analysis": {
            "minsky_conditions": analysis["minsky_conditions"],
            "regime_probabilities": analysis["regime_probabilities"]
        },
        "robustness": {
            "monte_carlo": mc_results
        },
        "validation": {
            "paper_comparison": validation
        }
    }
    return master

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 2: Generate publication-ready tables.
# -------------------------------------------------------------------------------------------------------------------------------

def generate_latex_tables(master_results: Dict[str, Any]) -> Dict[str, str]:
    """
    Generates LaTeX-formatted tables for parameters and diagnostics.

    Parameters
    ----------
    master_results : Dict
        The master results object.

    Returns
    -------
    Dict[str, str]
        Dictionary mapping table names to LaTeX source strings.
    """
    tables = {}

    # 1. Parameter Estimates Table (Simplified view)
    # We iterate over models and extract key parameters
    rows = []
    estimates = master_results["estimation"]["msvar_models"]

    for key, res in estimates.items():
        country, system = key
        params = res["params"]
        A = params["A"]
        P = params["P"]

        # Extract specific coefficients for reporting (Regime 1)
        alpha1 = A[0, 0, 0]
        alpha2 = A[0, 0, 1]
        beta1 = A[0, 1, 0]
        beta2 = A[0, 1, 1]
        p11 = P[0, 0]
        p22 = P[1, 1]

        rows.append({
            "Country": country,
            "System": system,
            "$\\alpha_1$": f"{alpha1:.3f}",
            "$\\alpha_2$": f"{alpha2:.3f}",
            "$\\beta_1$": f"{beta1:.3f}",
            "$\\beta_2$": f"{beta2:.3f}",
            "$p_{11}$": f"{p11:.3f}",
            "$p_{22}$": f"{p22:.3f}"
        })

    df_params = pd.DataFrame(rows)
    tables["parameter_estimates"] = df_params.to_latex(index=False, escape=False)

    # 2. Minsky Classification Table
    minsky_res = master_results["analysis"]["minsky_conditions"]
    rows_minsky = []
    for key, res in minsky_res.items():
        country, system = key
        rows_minsky.append({
            "Country": country,
            "System": system,
            "Oscillatory": "Yes" if res["dynamics"]["is_oscillatory"] else "No",
            "Sign Pattern": "Yes" if res["signs"]["pattern_valid"] else "No",
            "Significant": "Yes" if res["significance"]["valid"] else "No",
            "Is Minsky": "YES" if res["is_minsky"] else "NO"
        })

    df_minsky = pd.DataFrame(rows_minsky)
    tables["minsky_classification"] = df_minsky.to_latex(index=False)

    return tables

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 3: Document implementation choices.
# -------------------------------------------------------------------------------------------------------------------------------

def generate_technical_appendix(config: Dict[str, Any]) -> str:
    """
    Generates a text-based technical appendix documenting implementation details.

    Parameters
    ----------
    config : Dict
        Study configuration.

    Returns
    -------
    str
        The technical appendix text.
    """
    lines = []
    lines.append("# Technical Appendix: Minsky MS-VAR Replication")
    lines.append(f"Generated: {datetime.now().isoformat()}")
    lines.append("\n## 1. Data Processing")
    lines.append("- **Log Transformation**: Applied to Real GDP only.")
    lines.append("- **HP Filter**: Lambda = 100.0 (Annual data).")
    lines.append("- **Cleansing**: Linear interpolation for single-year gaps; truncation for multi-year gaps.")

    lines.append("\n## 2. Model Specification")
    lines.append("- **MS-VAR(1)**: Zero intercept.")
    lines.append("- **Regime 1**: Unrestricted interaction.")
    lines.append("- **Regime 2**: Diagonal coefficient matrix (No interaction).")
    lines.append("- **Estimation**: EM Algorithm with Hamilton Filter and Kim Smoother.")

    lines.append("\n## 3. Statistical Tests")
    lines.append("- **ADF Test**: Constant only (no trend). Critical value: -1.94.")
    lines.append("- **Ljung-Box**: Lags = 3. Critical value: 6.63.")

    lines.append("\n## 4. Robustness")
    mc_conf = config["model_physics"]["monte_carlo_simulation"]
    lines.append(f"- **Monte Carlo**: {mc_conf['iterations']} iterations, Parametric Bootstrap.")

    return "\n".join(lines)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def synthesize_final_report(
    config: Dict[str, Any],
    data_artifacts: Dict[str, Any],
    estimates: Dict[str, Any],
    diagnostics: Dict[str, Any],
    analysis: Dict[str, Any],
    mc_results: Dict[str, Any],
    validation: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the final synthesis of the project.

    Sequence:
    1. Compile master results dictionary.
    2. Generate LaTeX tables.
    3. Generate technical documentation.

    Parameters
    ----------
    [All pipeline outputs]

    Returns
    -------
    Dict
        Contains 'master_results', 'latex_tables', 'technical_appendix'.
    """
    logger.info("Synthesizing final report...")

    # 1. Master Results
    master = compile_master_results(
        config, data_artifacts, estimates, diagnostics, analysis, mc_results, validation
    )

    # 2. Tables
    latex_tables = generate_latex_tables(master)

    # 3. Documentation
    appendix = generate_technical_appendix(config)

    logger.info("Synthesis complete.")

    return {
        "master_results": master,
        "latex_tables": latex_tables,
        "technical_appendix": appendix
    }



In [None]:
# Top-Level Orchestrator

# ==============================================================================
# Top-Level Project Orchestration
# ==============================================================================

def execute_minsky_research_project(
    study_config: Dict[str, Any],
    raw_datasets: Dict[str, pd.DataFrame]
) -> Dict[str, Any]:
    """
    Executes the complete end-to-end research project for "Regime Changes and Real-Financial Cycles".

    This top-level orchestrator manages the execution of the four major research phases:
    1.  **Main Estimation Pipeline (Task 19)**: Validation, Data Engineering, Estimation, Diagnostics, Analysis.
    2.  **Robustness Analysis (Task 20)**: Monte Carlo simulations (invoking Tasks 21 & 22 internally).
    3.  **Cross-Validation (Task 23)**: Comparison against published benchmarks.
    4.  **Synthesis (Task 24)**: Generation of final reports, tables, and documentation.

    Parameters
    ----------
    study_config : Dict[str, Any]
        The master configuration dictionary defining model physics and data schemas.
    raw_datasets : Dict[str, pd.DataFrame]
        The raw input data for all countries.

    Returns
    -------
    Dict[str, Any]
        A comprehensive dictionary containing the outputs of every stage of the project.
    """
    project_start_time = datetime.now()
    logger.info("INITIATING MINSKY RESEARCH PROJECT EXECUTION")

    try:
        # ----------------------------------------------------------------------
        # Phase 1: Main Estimation Pipeline (Task 19)
        # ----------------------------------------------------------------------
        logger.info(">>> Executing Phase 1: Main Estimation Pipeline (Task 19)")
        # run_minsky_msvar_pipeline is the orchestrator for Tasks 1-18
        pipeline_results = run_minsky_msvar_pipeline(study_config, raw_datasets)

        # Extract key artifacts needed for subsequent phases
        msvar_estimates = pipeline_results["msvar_estimates"]
        bivariate_systems = pipeline_results["data"]["bivariate_systems"] # Implicitly available in data artifacts

        # ----------------------------------------------------------------------
        # Phase 2: Robustness Analysis (Task 20)
        # ----------------------------------------------------------------------
        logger.info(">>> Executing Phase 2: Monte Carlo Robustness (Task 20)")
        # run_monte_carlo_robustness orchestrates Tasks 21 (Simulation) and 22 (Aggregation)
        mc_results = run_monte_carlo_robustness(msvar_estimates, study_config)

        # ----------------------------------------------------------------------
        # Phase 3: Cross-Validation (Task 23)
        # ----------------------------------------------------------------------
        logger.info(">>> Executing Phase 3: Cross-Validation (Task 23)")
        validation_results = cross_validate_results(msvar_estimates)

        # ----------------------------------------------------------------------
        # Phase 4: Final Synthesis (Task 24)
        # ----------------------------------------------------------------------
        logger.info(">>> Executing Phase 4: Final Synthesis (Task 24)")

        # Extract components for synthesis
        data_artifacts = pipeline_results.get("data", {}) # Cleansed data, cycles
        diagnostics = pipeline_results.get("ljung_box_tests", {})
        analysis = {
            "minsky_conditions": pipeline_results.get("minsky_analysis", {}),
            "regime_probabilities": pipeline_results.get("regime_probabilities", {})
        }

        final_report = synthesize_final_report(
            config=study_config,
            data_artifacts=data_artifacts,
            estimates=msvar_estimates,
            diagnostics=diagnostics,
            analysis=analysis,
            mc_results=mc_results,
            validation=validation_results
        )

        project_end_time = datetime.now()
        duration = project_end_time - project_start_time

        logger.info(f"PROJECT EXECUTION COMPLETE. Total Duration: {duration}")

        return final_report

    except Exception as e:
        logger.critical(f"Project execution failed: {e}")
        raise e


#

#