# `README.md`

# A High-Resolution Framework for Analyzing Transatlantic Monetary Spillovers

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/downloads/)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Imports: isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336)](https://pycqa.github.io/isort/)
[![Type Checking: mypy](https://img.shields.io/badge/type_checking-mypy-blue)](http://mypy-lang.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2509.13578-b31b1b.svg)](https://arxiv.org/abs/2509.13578)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/between_transatlantic_monetary_disturbances)
[![Discipline](https://img.shields.io/badge/Discipline-International%20Macroeconomics-blue)](https://github.com/chirindaopensource/between_transatlantic_monetary_disturbances)
[![Methodology](https://img.shields.io/badge/Methodology-BVAR%20%7C%20Local%20Projection%20%7C%20HF%20Identification-orange)](https://github.com/chirindaopensource/between_transatlantic_monetary_disturbances)
[![Data Source](https://img.shields.io/badge/Data-High--Frequency%20Futures%20%26%20Macro-lightgrey)](https://github.com/chirindaopensource/between_transatlantic_monetary_disturbances)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%23025596?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![Statsmodels](https://img.shields.io/badge/statsmodels-1A568C.svg?style=flat)](https://www.statsmodels.org/stable/index.html)
[![PyYAML](https://img.shields.io/badge/PyYAML-4B5F6E.svg?style=flat)](https://pyyaml.org/)
[![Joblib](https://img.shields.io/badge/joblib-2F72A4.svg?style=flat)](https://joblib.readthedocs.io/en/latest/)
[![TQDM](https://img.shields.io/badge/tqdm-FFC107.svg?style=flat)](https://tqdm.github.io/)
[![Analysis](https://img.shields.io/badge/Analysis-Monetary%20Policy-brightgreen)](https://github.com/chirindaopensource/between_transatlantic_monetary_disturbances)
[![Framework](https://img.shields.io/badge/Framework-Bayesian-blueviolet)](https://github.com/chirindaopensource/between_transatlantic_monetary_disturbances)
[![Identification](https://img.shields.io/badge/Identification-Sign%20Restrictions-red)](https://github.com/chirindaopensource/between_transatlantic_monetary_disturbances)
[![Validation](https://img.shields.io/badge/Validation-Out--of--Sample-yellow)](https://github.com/chirindaopensource/between_transatlantic_monetary_disturbances)
[![Robustness](https://img.shields.io/badge/Robustness-Sensitivity%20Analysis-lightgrey)](https://github.com/chirindaopensource/between_transatlantic_monetary_disturbances)
[![License](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT)

--

**Repository:** `https://github.com/chirindaopensource/between_transatlantic_monetary_disturbances`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"In-between Transatlantic (Monetary) Disturbances"** by:

*   Santiago Camara
*   Jeanne Aublin

The project provides a complete, end-to-end computational framework for identifying source-dependent monetary policy shocks and analyzing their international spillovers. It delivers a modular, auditable, and extensible pipeline that replicates the paper's entire workflow: from rigorous high-frequency data processing and validation, through the sophisticated rotational decomposition for shock identification, to the estimation of BVAR and Local Projection models and a comprehensive suite of robustness checks.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: execute_full_study_pipeline](#key-callable-execute_full_study_pipeline)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the methodologies presented in the 2025 paper "In-between Transatlantic (Monetary) Disturbances." The core of this repository is the iPython Notebook `between_transatlantic_monetary_disturbances_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings, from initial data validation to the final generation and analysis of impulse response functions and robustness tests.

The paper addresses a key question in international macroeconomics: How do monetary policy shocks from major economic blocs (the U.S. and the Euro Area) propagate to a smaller, open economy (Canada), and do the transmission channels differ? This codebase operationalizes the paper's advanced approach, allowing users to:
-   Rigorously validate and cleanse high-frequency financial data and low-frequency macroeconomic data.
-   Identify "pure" monetary policy shocks, purged of central bank information effects, using a high-frequency identification strategy with sign restrictions.
-   Estimate the dynamic effects of these shocks using both Bayesian Vector Autoregressions (BVAR) and Local Projections (LP).
-   Conduct a full suite of robustness checks to validate the stability of the findings across different identification schemes, sample periods, and model specifications.
-   Systematically investigate specific transmission channels (e.g., trade, financial) by running augmented models.

## Theoretical Background

The implemented methods are grounded in modern time-series econometrics and international finance.

**1. High-Frequency Identification with Sign Restrictions:**
Standard event studies can be confounded by the "information effect," where a central bank's policy action reveals private information about the economic outlook. To solve this, the paper uses the methodology of Jarociński & Karadi (2020). Raw surprises in interest rates ($s^{rate}$) and equity prices ($s^{equity}$) are assumed to be linear combinations of two structural shocks: a pure monetary policy shock ($\varepsilon^{MP}$) and an information shock ($\varepsilon^{INFO}$).
$$ \begin{bmatrix} s^{rate}_t \\ s^{equity}_t \end{bmatrix} = A \begin{bmatrix} \varepsilon^{MP}_t \\ \varepsilon^{INFO}_t \end{bmatrix} $$
The identification of the mixing matrix `A` is achieved by finding all rotations of an initial Cholesky decomposition that satisfy a set of theoretical sign restrictions on the impulse responses (e.g., a contractionary MP shock must raise rates and lower equity prices).

**2. Bayesian Vector Autoregression (BVAR):**
The primary workhorse model is a VAR-X, where the identified shocks are treated as exogenous variables. The model for a vector of endogenous variables $Y_t$ is:
$$ Y_t = c + \sum_{i=1}^{p} B_i Y_{t-i} + \Gamma_1 s_t^{ECB} + \Gamma_2 s_t^{Fed} + D_t + e_t $$
The model is estimated using Bayesian methods with a Normal-Wishart prior and a Minnesota-style specification for the prior hyperparameters. Inference is conducted by drawing from the posterior distribution using a Gibbs sampler.

**3. Local Projections (LP):**
As a robustness check, the impulse responses are also estimated using the Local Projection method of Jordà (2005). This involves running a separate regression for each forecast horizon `h`:
$$ y_{k, t+h} = \beta_h^{Shock} s_t^{Shock} + \text{controls}_t + \epsilon_{t+h} $$
The sequence of estimated coefficients $\{\hat{\beta}_h^{Shock}\}_{h=0}^H$ forms the impulse response function. This method is robust to misspecification but less efficient than a VAR. Inference requires HAC (Newey-West) standard errors.

## Features

The provided iPython Notebook (`between_transatlantic_monetary_disturbances_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Phase Architecture:** The entire pipeline is broken down into 17 distinct, modular tasks, from data validation to final robustness checks.
-   **Configuration-Driven Design:** All methodological and computational parameters are managed in an external `config.yaml` file, allowing for easy customization without code changes.
-   **Professional-Grade Data Pipeline:** A comprehensive validation, quality assessment, and cleansing suite for both high-frequency and low-frequency data, including robust handling of timezones and DST.
-   **High-Fidelity Shock Identification:** A precise, vectorized implementation of the rotational-angle decomposition method.
-   **Robust BVAR Estimation:** A complete Gibbs sampler for a BVAR with a Normal-Wishart prior, including intra-run convergence diagnostics.
-   **Complete Local Projections Estimator:** A full implementation of the LP method with HAC-robust standard errors.
-   **Advanced Robustness Toolkit:**
    -   A framework for testing alternative identification schemes (Poor Man's Sign Restriction).
    -   A parallelized framework for quantifying identification uncertainty by integrating over all admissible rotations.
    -   A framework for testing sensitivity to alternative sample periods (pre-GFC, pre-COVID).
    -   A framework for testing sensitivity to estimation choices (prior hyperparameters, lag length).

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Data Validation & Preprocessing (Tasks 1-3):** Ingests and rigorously validates all raw data and the `config.yaml` file, performs a deep data quality audit, and produces clean, analysis-ready data streams.
2.  **Shock Identification (Tasks 4-6):** Defines event windows, extracts high-frequency prices, calculates raw surprises, and performs the rotational decomposition to identify structural shocks.
3.  **Model Preparation (Tasks 7-8):** Aggregates the identified shocks to a monthly frequency and assembles the final, transformed dataset for econometric modeling.
4.  **Estimation (Tasks 9-11):** Sets up and estimates the baseline BVAR via Gibbs sampling and the Local Projections model via OLS with HAC errors.
5.  **Results & Validation (Tasks 12-14):** Calculates impulse response functions from the BVAR posterior and runs a full suite of in-sample and out-of-sample validation tests.
6.  **Robustness Analysis (Tasks 16-17):** Orchestrates the entire suite of robustness checks on the identification and estimation methods.

## Core Components (Notebook Structure)

The `between_transatlantic_monetary_disturbances_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: execute_full_study_pipeline

The central function in this project is `execute_full_study_pipeline`. It orchestrates the entire analytical workflow, providing a single entry point for running the baseline study and all associated robustness checks.

```python
def execute_full_study_pipeline(
    equity_tick_df: pd.DataFrame,
    rate_tick_df: pd.DataFrame,
    macro_df: pd.DataFrame,
    announcement_df: pd.DataFrame,
    target_market: str,
    study_config: Dict[str, Any],
    run_identification_robustness: bool = True,
    run_estimation_robustness: bool = True
) -> Dict[str, Any]:
    """
    Executes the entire research study, including the main analysis and all robustness checks.
    """
    # ... (implementation is in the notebook)
```

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `scipy`, `statsmodels`, `pyyaml`, `tqdm`, `joblib`, `pandas_market_calendars`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/between_transatlantic_monetary_disturbances.git
    cd between_transatlantic_monetary_disturbances
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy scipy statsmodels pyyaml tqdm joblib pandas_market_calendars
    ```

## Input Data Structure

The pipeline requires four `pandas.DataFrame`s and a configuration file as input. Mock data generation functions are provided in the main notebook to create valid examples for testing.
1.  **`equity_tick_df` / `rate_tick_df`:** Must contain columns `['timestamp_micros_utc', 'price', 'volume', 'type']`.
2.  **`macro_df`:** A long-format DataFrame with columns `['date', 'source_series_id', 'country', 'variable_name', 'value_raw']`.
3.  **`announcement_df`:** Must contain columns `['event_id', 'central_bank', 'announcement_date_local', 'announcement_time_local', 'local_timezone']`.

## Usage

The `between_transatlantic_monetary_disturbances_draft.ipynb` notebook provides a complete, step-by-step guide. The core workflow is:

1.  **Prepare Inputs:** Load your four raw `pandas.DataFrame`s. Ensure the `config.yaml` file is present in the same directory.
2.  **Execute Pipeline:** Call the grand orchestrator function.

    ```python
    # This single call runs the entire project.
    final_results = execute_full_study_pipeline(
        equity_tick_df=my_equity_data,
        rate_tick_df=my_rate_data,
        macro_df=my_macro_data,
        announcement_df=my_announcement_data,
        target_market='CAN',
        study_config=my_config_dict,
        run_identification_robustness=False,  # Set to True for the full analysis
        run_estimation_robustness=False
    )
    ```
3.  **Inspect Outputs:** The returned `final_results` dictionary contains all generated artifacts, including intermediate data, final IRFs, and validation reports.

## Output Structure

The `execute_full_study_pipeline` function returns a single, comprehensive dictionary containing all generated artifacts, structured by analytical phase. Key outputs include:
-   `benchmark_run`: The results of the main analysis.
    -   `phase_2_identification['structural_shocks']`: The identified monthly shock series.
    -   `phase_3_model_prep['analysis_ready_df']`: The final dataset used for estimation.
    -   `phase_5_results['bvar_irfs']`: The final impulse response functions from the BVAR.
    -   `phase_5_results['model_validation_reports']`: The full suite of diagnostic reports.
-   `identification_robustness_suite`: (If run) Contains the results of the PMSR, rotational uncertainty, and sub-sample analyses.
-   `estimation_robustness_suite`: (If run) Contains the results of the prior, lag, and specification sensitivity analyses.

## Project Structure

```
between_transatlantic_monetary_disturbances/
│
├── between_transatlantic_monetary_disturbances_draft.ipynb   # Main implementation notebook
├── config.yaml                                               # Master configuration file
├── requirements.txt                                          # Python package dependencies
├── LICENSE                                                   # MIT license file
└── README.md                                                 # This documentation file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can easily modify all methodological parameters, such as BVAR lags, prior hyperparameters, MCMC settings, and window definitions, without altering the core Python code.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Visualization Module:** Creating a function that takes the final IRF results and automatically generates publication-quality plots that replicate the figures in the paper.
-   **Automated Reporting:** Building a module that uses the generated results and validation reports to automatically create a full PDF or HTML summary report of the findings.
-   **Alternative Priors:** Implementing other BVAR prior structures, such as the Independent Normal-Wishart prior or stochastic volatility.
-   **Structural VAR Identification:** Adding modules for other SVAR identification schemes, such as Cholesky or long-run restrictions, for comparison.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{camara2025inbetween,
  title={{In-between Transatlantic (Monetary) Disturbances}},
  author={Camara, Santiago and Aublin, Jeanne},
  journal={arXiv preprint arXiv:2509.13578},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A High-Resolution Framework for Analyzing Transatlantic Monetary Spillovers.
GitHub repository: https://github.com/chirindaopensource/between_transatlantic_monetary_disturbances
```

## Acknowledgments

-   Credit to **Santiago Camara and Jeanne Aublin** for their foundational research, which forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, SciPy, Statsmodels, and Joblib**, whose work makes complex computational analysis accessible and robust.

--

*This README was generated based on the structure and content of `between_transatlantic_monetary_disturbances_draft.ipynb` and follows best practices for research software documentation.*

# Paper

Title: "*In-between Transatlantic (Monetary) Disturbances*"

Authors: Santiago Camara, Jeanne Aublin

E-Journal Submission Date: 16 September 2025

Link: https://arxiv.org/abs/2509.13578

Abstract:

This paper studies the spillovers of European Central Bank (ECB) interest rate shocks into the Canadian economy and compares them with those of the U.S. Federal Reserve (Fed). We combine a VAR model and local projection regressions with identification strategies that explicitly purge information effects around policy announcements. We find that an ECB rate hike leads to a depreciation of the Canadian dollar and a sharp contraction in economic activity. The main transmission channel is international trade: ECB shocks trigger a decline in oil prices and exports, while leaving domestic financial conditions largely unaffected. By contrast, Fed shocks tighten Canadian financial conditions significantly, with more limited effects on trade flows. These findings show that Canada is exposed to foreign monetary policy both directly and indirectly, through its integration in global financial and trade markets.


Keywords: Monetary policy; Canadian Economy; Federal Reserve; European Central Bank; International Spillovers; Exchange Rates, International Spillovers; “Central
Bank Information”. JEL Codes: F40, F41, E44, E51.


# Summary

Here is a step-by-step summary of the paper "In-between Transatlantic (Monetary) Disturbances" by Aublin and Camara.

**

### **The Central Research Question and Motivation**

The paper seeks to answer a fundamental question for small, open economies: How are they affected by monetary policy shocks from different major economic blocs?

Specifically, it investigates the spillovers of monetary policy from the **European Central Bank (ECB)** to the Canadian economy and systematically compares them to the well-documented spillovers from the **U.S. Federal Reserve (Fed)**. The motivation is to challenge the conventional "U.S.-centric" view of Canada's external environment, which often models the rest of the world using U.S. variables alone. The authors hypothesize that this view is incomplete and that shocks from other major players like the ECB transmit through different, yet equally important, channels.

### **The Core Methodological Engine**

The authors' empirical strategy is sophisticated and rests on two key pillars, which are crucial for the credibility of their findings.

**Pillar A: The Identification Strategy**
This is the most critical piece of their methodology. They need to isolate *pure* monetary policy shocks—unexpected changes in the policy rate—from other news revealed during central bank announcements.

1.  **The Problem:** A central bank rate hike could be a pure tightening action, or it could be a reaction to positive private information the bank has about future economic growth (an "information effect"). A standard high-frequency surprise, which just measures the change in asset prices around an announcement, conflates these two.
2.  **The Solution:** They employ the high-frequency identification strategy pioneered by Jarociński and Karadi (2020). The logic is as follows:
    *   A **pure contractionary monetary policy shock** should raise interest rates but lower equity prices (as future profits are discounted more heavily and economic activity is expected to slow). This implies a *negative* correlation between interest rate surprises and equity surprises.
    *   An **information shock** (e.g., the central bank signals a stronger economy) should cause both interest rates and equity prices to rise. This implies a *positive* correlation.
3.  **The Technique:** Using high-frequency data on interest rate and stock index futures around policy announcements, they use a "rotational-angle decomposition" with sign restrictions to separate the raw surprises into these two orthogonal shocks: a pure monetary policy shock and an information shock. Their benchmark results use the median rotation that satisfies these restrictions.

**Pillar B: The Estimation Framework**
Once the shocks are identified, the authors estimate their dynamic effects on the Canadian economy using two complementary econometric models to ensure robustness:

1.  **Bayesian Vector Autoregression (BVAR):** A standard, efficient tool in macroeconomics for estimating impulse response functions (IRFs). It models the joint dynamics of a system of variables.
2.  **Local Projections (LP):** A more flexible, single-equation approach that is less prone to misspecification bias than VARs, though often less precise.

Using both methods allows them to check if their results are an artifact of a specific modeling choice.

### **The Benchmark Empirical Findings**

The paper's central results reveal a stark asymmetry in how ECB and Fed shocks affect Canada.

*   **Response to an ECB Rate Hike (Contractionary Shock):**
    *   **Exchange Rate:** The Canadian dollar (CAD) depreciates significantly against the euro.
    *   **Domestic Policy:** The Bank of Canada *eases* policy, lowering its own interest rate to cushion the blow.
    *   **Real Economy:** Canadian GDP contracts sharply and immediately.
    *   **Financial Conditions:** Domestic financial conditions are largely unaffected or even ease slightly.

*   **Response to a Fed Rate Hike (Contractionary Shock):**
    *   **Exchange Rate:** The CAD depreciates, but much less than in the ECB case.
    *   **Domestic Policy:** The Bank of Canada *tightens* policy, raising its interest rate in lockstep with the Fed.
    *   **Real Economy:** Canadian GDP contracts more slowly, but the contraction is more persistent and lasts longer.
    *   **Financial Conditions:** Canadian financial conditions tighten significantly. Equity markets fall sharply and remain depressed.

In short, ECB shocks look like a foreign *real* shock to Canada, while Fed shocks act like a domestic *financial* shock.

### **Decomposing the Transmission Channels**

The authors then dig deeper to understand *why* these responses are so different.

*   **The ECB Channel: Trade and Commodities**
    *   An ECB tightening reduces demand in the large and open Euro Area, which lowers global commodity prices, particularly **oil**.
    *   The fall in oil prices directly hits Canada's terms of trade, oil production, and oil exports, leading to the immediate contraction in GDP.
    *   The effects are concentrated in commodity-related sectors. Non-oil exports are less affected.
    *   Interestingly, the paper shows that this effect is largely indirect: the ECB shock slows the U.S. economy, which in turn reduces U.S. demand for Canadian exports.

*   **The Fed Channel: Financial Linkages**
    *   Due to the deep integration of North American financial markets, a Fed tightening directly spills over into Canadian financial conditions.
    *   The Bank of Canada is compelled to follow the Fed to manage capital flows and exchange rate pressures.
    *   This leads to higher government bond yields, higher mortgage rates, and falling house prices in Canada.
    *   The economic contraction is therefore broad-based, affecting credit-sensitive sectors like construction and durable goods, consistent with a domestic financial tightening.

### **Robustness and Validation**

The paper conducts a battery of tests to confirm its findings are not spurious.

1.  **Alternative Identification:** They show the results hold using a simpler "Poor Man's Sign Restriction" method.
2.  **Bias from Ignoring Information:** They explicitly show that using the raw, conflated high-frequency surprise (as older studies did) produces biased and misleading results. For instance, it would wrongly suggest an ECB hike is *expansionary* for Canadian GDP on impact.
3.  **Sample Period:** The results are robust to excluding the volatile COVID-19 period.
4.  **Identification Uncertainty:** The findings hold even when considering the full range of possible shock decompositions ("admissible rotations"), not just the median one.

### **Conclusion and Implications**

The paper concludes with powerful implications for both researchers and policymakers:

*   **For Researchers:** The standard U.S.-centric model of the global economy is insufficient, even for a country as closely tied to the U.S. as Canada. The source of a global shock matters immensely, as different central banks transmit their policy through distinct channels (real vs. financial).
*   **For Canadian Policymakers:** The challenge of conducting monetary policy is multifaceted. Canada is caught "in-between" two types of transatlantic disturbances. It is exposed to **global demand and commodity shocks** originating from Europe and to **direct financial shocks** from the United States. The appropriate domestic policy response depends critically on diagnosing the origin of the external shock.

# Critique

## Critical Assessment and Implementation Implications

From a methodological standpoint, this work represents significant advancement in several dimensions:

* **Identification Innovation**: The rotational-angle decomposition addresses a fundamental confound in monetary policy identification. The demonstration that standard high-frequency identification conflates policy and information effects has broad implications for the spillovers literature.

* **Multi-source spillover framework**: Moving beyond US-centric analysis of Canadian external dependence represents an important theoretical and empirical contribution.

* **Channel decomposition**: The systematic comparison of trade vs. financial transmission mechanisms provides actionable insights for both researchers and policymakers.

However, several methodological considerations merit attention:

* **Sample limitations**: Monthly frequency may mask higher-frequency adjustment dynamics, particularly in financial markets.

* **Linear specification**: The VAR/LP framework assumes linear responses, potentially missing state-dependent transmission (e.g., during financial stress periods).

* **Exchange rate endogeneity**: While the authors control for policy responses, exchange rate movements themselves may feedback to affect the transmission of foreign shocks.

# Import Essential Modules

In [None]:
#!/usr/bin/env python3
# ==============================================================================#
#
#  High-Resolution Identification and Analysis of Transatlantic Monetary Spillovers
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "In-between Transatlantic (Monetary)
#  Disturbances" by Santiago Camara and Jeanne Aublin (2025). It delivers a
#  computationally intensive, high-fidelity system for identifying source-
#  dependent monetary policy shocks and tracing their international spillovers
#  through distinct trade and financial channels.
#
#  Core Methodological Components:
#  • High-frequency identification of monetary policy and information shocks
#  • Rotational-angle decomposition with sign restrictions for shock separation
#  • Bayesian Vector Autoregression (BVAR) with Minnesota-style Normal-Wishart priors
#  • Gibbs sampling for posterior inference of model parameters
#  • Local Projections (Jordà, 2005) for robust impulse response estimation
#  • Comprehensive robustness checks for identification, sample period, and priors
#
#  Technical Implementation Features:
#  • Granular high-frequency data processing and cleansing pipeline
#  • Robust time-series validation and anomaly detection
#  • Explicit handling of timezone conversions and Daylight Saving Time
#  • Vectorized, high-performance numerical linear algebra for identification
#  • Modular, multi-phase orchestration for full pipeline reproducibility
#  • Extensive logging, auditing, and structured results management
#
#  Paper Reference:
#  Camara, S., & Aublin, J. (2025). In-between Transatlantic (Monetary)
#  Disturbances. arXiv preprint arXiv:2509.13578.
#  https://arxiv.org/abs/2509.13578
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================#

# =============================================================================
# STANDARD LIBRARIES
# =============================================================================
import copy
import logging
import time
import warnings
from datetime import datetime, time

# =============================================================================
# THIRD-PARTY LIBRARIES
# =============================================================================

# Core data manipulation and numerical computation
import numpy as np
import pandas as pd

# Timezone and market calendar handling
import pandas_market_calendars as mcal
import pytz

# Statistical modeling and econometrics
import statsmodels.api as sm
from scipy import stats
from scipy.stats import f, multivariate_normal, t
from statsmodels.regression.linear_model import OLS
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.stats.sandwich_covariance import cov_hac
from statsmodels.tools.tools import add_constant

# Parallel processing and progress monitoring
from joblib import Parallel, delayed
from tqdm import tqdm

# =============================================================================
# TYPE HINTING
# =============================================================================
from typing import (Any, Dict, List, NoReturn, Optional, Set, Tuple, Union)


# Implementation

## Draft 1

### **Explication of Key Callables**

#### **1. `run_phase1_task1_validation`**

*   **Inputs:** Four raw `pd.DataFrame`s (`equity_tick_df`, `rate_tick_df`, `macro_df`, `announcement_df`), the `target_market` string, and the `study_config` dictionary.
*   **Processes:** This function orchestrates the foundational integrity checks of the entire project.
    1.  It calls `validate_study_configuration` to perform a deep validation of all critical parameters in the `study_config` dictionary against the paper's specified values (e.g., BVAR lags, prior hyperparameters).
    2.  It calls `validate_dataframe_schema` for each of the four input DataFrames to ensure their columns, data types (including timezone awareness), and categorical levels exactly match the required schema for the analysis.
    3.  It calls the remediated `validate_cross_dataset_alignment` to perform critical cross-dataset checks: ensuring the target market exists, that all announcement events are temporally covered by the high-frequency data (with a 24-hour buffer), and that the macroeconomic series for the target market do not have debilitating data gaps (> 3 months).
*   **Outputs:** A dictionary of `pd.DataFrame` reports detailing the results of every validation check. The function's primary purpose is to halt execution via a specific exception (`ConfigurationValidationError`, `SchemaError`, `ValueError`) if any critical check fails.
*   **Role in Research Pipeline:** This callable is the **Gatekeeper**. It ensures that the raw data and configuration are of sufficient quality and consistency to even begin the analysis. It programmatically enforces the basic data assumptions made on pages 6 ("Dataset") and 9 ("Bayesian VARs") of the paper, preventing the propagation of errors from corrupted or misconfigured inputs.

#### **2. `run_phase1_task2_quality_assessment`**

*   **Inputs:** The four raw `pd.DataFrame`s.
*   **Processes:** This function orchestrates a deep, statistical quality assessment of the raw data.
    1.  It calls `detect_tick_data_anomalies` to screen the high-frequency data for duplicates, price outliers (using the Modified Z-score), volume anomalies, and timestamp gaps.
    2.  It calls `verify_announcement_metadata` to check the integrity of the event schedule, validating timezone strings, checking for events on non-business days, and ensuring chronological consistency.
    3.  It calls the remediated `assess_macro_series_quality` to perform statistical checks on each macroeconomic time series, including a statistically correct Chow test for structural breaks and a Hampel filter for outliers.
*   **Outputs:** A dictionary of `pd.DataFrame` reports containing detailed, observation-level anomaly flags and per-series quality metrics.
*   **Role in Research Pipeline:** This callable is the **Microscope**. It moves beyond schema validation to inspect the statistical properties of the data itself. It is the practical implementation of the data scrutiny that is implicit in any high-quality empirical study, ensuring that the data used for the core analysis is clean and free of anomalies that could bias the results.

#### **3. `run_phase1_task3_preprocessing`**

*   **Inputs:** The four raw `pd.DataFrame`s, the anomaly flag reports from the previous task, and the `target_market` string.
*   **Processes:** This orchestrator manages the initial cleaning and structuring of the data.
    1.  It calls `cleanse_tick_data` to filter the raw tick data, removing duplicates, non-trade ticks, low-volume trades, and identified outliers.
    2.  It calls `standardize_announcement_timestamps` to convert all local announcement times to robust, unambiguous UTC timestamps.
    3.  It calls the amended `prepare_macroeconomic_data` to transform the raw long-format macro data into a clean, wide-format panel, indexed by date, with all series in their original levels. This includes limited interpolation and the exclusion of series with large gaps.
*   **Outputs:** A dictionary containing the cleansed DataFrames and their associated audit trails. The key output is the `prepared_macro_df_levels`.
*   **Role in Research Pipeline:** This callable is the **Refinery**. It takes the validated raw materials and transforms them into a set of clean, consistently structured, and aligned datasets that are the direct inputs for the core economic analysis. It separates the universal cleaning process from the model-specific transformations that come later.

#### **4. `run_phase2_task4_data_extraction`**

*   **Inputs:** The `clean_announcement_df`, `clean_equity_df`, `clean_rate_df`, and the `study_config`.
*   **Processes:** This function orchestrates the core high-frequency event study data extraction.
    1.  It calls `construct_event_windows` to define the precise `t_before` and `t_after` UTC timestamps for the analysis window around each announcement, correctly merging any overlapping windows.
    2.  It calls `extract_window_prices` twice (once for equity, once for rates) to find the last traded price before the window and the first *stable* price after the window, using a robust multi-tiered fallback search and an iterative stabilization check.
*   **Outputs:** A dictionary containing the event window definitions and the detailed price extraction results (including prices and metadata) for both equity and rate instruments.
*   **Role in Research Pipeline:** This callable implements the **Event Window Measurement**. It is the direct, high-fidelity implementation of the data extraction process required for the high-frequency identification strategy described on page 7 of the paper. It turns a list of event times into a set of pre- and post-event prices, which are the fundamental inputs for calculating surprises.

#### **5. `run_phase2_task5_surprise_calculation`**

*   **Inputs:** The price extraction results for equity and rates, and the `clean_announcement_df`.
*   **Processes:** This orchestrator converts the extracted prices into reduced-form surprises.
    1.  It calls the remediated `calculate_rate_surprises`, which uses an explicit configuration to apply the correct, instrument-specific formula (e.g., for `100 - R` contracts) to calculate the interest rate surprise in basis points.
    2.  It calls `calculate_equity_surprises` to compute the equity surprise using the log-return formula: $s^{equity}_{t} = \ln\left(\frac{P_{after}}{P_{before}}\right) \times 100$.
    3.  It calls `construct_surprise_vectors` to assemble these into 2x1 vectors, $S_t = \begin{bmatrix} s^{rate}_{t} \\ s^{equity}_{t} \end{bmatrix}$, and applies a 5-sigma outlier filter on a per-bank basis.
*   **Outputs:** A dictionary containing the final `(N, 2)` NumPy arrays of surprise vectors for each central bank, along with any identified outliers.
*   **Role in Research Pipeline:** This callable is the **Surprise Generator**. It implements the first step of the identification strategy on page 7, converting raw price changes into the economically meaningful surprise vectors that form the basis for the rotational decomposition.

#### **6. `run_phase2_task6_shock_identification`**

*   **Inputs:** The dictionary of surprise vectors and the `study_config`.
*   **Processes:** This is the core identification orchestrator. It calls `identify_structural_shocks`, which performs the full rotational-angle decomposition.
    1.  It generates a grid of rotation matrices $Q(\theta) = \begin{bmatrix} \cos(\theta) & -\sin(\theta) \\ \sin(\theta) & \cos(\theta) \end{bmatrix}$.
    2.  For each angle, it generates candidate structural shocks $\epsilon(\theta) = Q(\theta) S^T$.
    3.  It calls the remediated `_find_admissible_rotations` helper to check each angle against the sign restrictions (e.g., $\rho(\varepsilon^{MP}(\theta), s^{rate}) > 0$ and $\rho(\varepsilon^{MP}(\theta), s^{equity}) < 0$) to find the set of admissible angles.
    4.  It calculates the median of the admissible angles (handling circularity) and uses this to generate the final, normalized benchmark structural shock series.
*   **Outputs:** A dictionary containing the final, identified structural shock series for each central bank, along with metadata about the identification process (e.g., the set of admissible angles).
*   **Role in Research Pipeline:** This callable is the **Shock Disentangler**. It is the direct implementation of the sophisticated identification strategy from Jarociński & Karadi (2020) described on page 7 of the paper. Its purpose is to solve the "central bank information" puzzle by separating the raw surprises into a pure monetary policy shock ($\varepsilon^{MP}$) and an orthogonal information shock ($\varepsilon^{INFO}$).

#### **7. `run_phase3_task7_temporal_aggregation`**

*   **Inputs:** The identified structural shocks and the `clean_announcement_df`.
*   **Processes:** This orchestrator bridges the frequency gap between the high-frequency identification and the monthly models.
    1.  It calls the remediated `map_events_to_months` to assign each event to a specific calendar month, correctly implementing the "> 15th day" rule and the December edge case.
    2.  It calls the remediated `aggregate_shocks_to_monthly` to sum all shocks within each assigned month, $\varepsilon^{MP}_{m} = \sum_{t \in \text{month } m} \varepsilon^{MP}_{t}$, and to create a dense time series where non-announcement months have a shock value of zero.
    3.  It calls `validate_monthly_shock_series` to perform statistical checks (zero mean, no autocorrelation) on the final monthly series.
*   **Outputs:** A dictionary containing the final monthly shock `pd.DataFrame` and a statistical validation report.
*   **Role in Research Pipeline:** This callable is the **Frequency Bridge**. It converts the event-dated shock series into a monthly time series suitable for inclusion as an exogenous regressor in the VAR and LP models, as specified on page 9 ("...include the identified ECB and Fed monetary policy shocks as exogenous regressors.").

#### **8. `run_phase3_task8_final_data_assembly`**

*   **Inputs:** The `prepared_macro_df` (in levels), the `monthly_shocks_df`, and the `study_config`.
*   **Processes:** This orchestrator was refactored. Its new, correct role is to perform the final model-specific transformations and assemble the complete dataset.
    1.  It calls `transform_core_macro_variables` to apply log transformations ($y_t = 100 \times \ln(x_t)$) and calculate any necessary cross-rates.
    2.  It calls `construct_control_variables` to generate the time trend and COVID-19 dummy.
    3.  It calls the remediated `assemble_final_dataset` to merge the transformed macro data, the monthly shocks, and the control variables using the robust `outer join -> fill -> drop` procedure.
*   **Outputs:** The final, complete, analysis-ready `pd.DataFrame`.
*   **Role in Research Pipeline:** This callable is the **Model Assembler**. It creates the final, canonical dataset that is the direct input to all subsequent econometric models, ensuring all variables are correctly transformed and aligned.

#### **9. `run_phase4_task9_bvar_setup`**

*   **Inputs:** The `analysis_ready_df` and the `study_config`.
*   **Processes:** This orchestrator prepares the inputs for the BVAR estimation.
    1.  It calls `create_var_matrices` to transform the analysis DataFrame into the `Y` (endogenous) and `X` (regressors, including lags) NumPy matrices.
    2.  It calls `construct_minnesota_prior` to translate the high-level hyperparameters from the config into the precise prior mean vector ($\beta_0$) and covariance matrix ($V_0$) for the coefficients, and the parameters ($\nu_0, S_0$) for the Inverse-Wishart prior on the error covariance.
*   **Outputs:** A dictionary containing the `Y` and `X` matrices, the `priors` dictionary, and other model metadata.
*   **Role in Research Pipeline:** This callable is the **BVAR Architect**. It implements the specific model and prior structure described on page 9 ("Bayesian VARs"), including the Normal-Wishart prior with Minnesota-style hyperparameters ($\lambda_1=0.1$, etc.).

#### **10. `run_bvar_gibbs_sampler`**

*   **Inputs:** The `bvar_setup` dictionary and the `study_config`.
*   **Processes:** This is a core computational engine, not an orchestrator. It performs the Gibbs sampling algorithm to draw from the posterior distribution of the BVAR parameters by iteratively drawing from the two conditional posteriors:
    1.  Draw $\Sigma | Y, X, B \sim \text{Inverse-Wishart}(\nu_0+T, S_0 + U'U)$.
    2.  Draw $\text{vec}(B) | Y, X, \Sigma \sim \mathcal{N}(\bar{\beta}, \bar{V})$.
    It runs for the specified number of draws, discards the burn-in, and includes the remediated intra-run Geweke convergence diagnostic.
*   **Outputs:** A dictionary containing the posterior draws for the coefficient matrices (`B_draws`) and the error covariance matrices (`Sigma_draws`).
*   **Role in Research Pipeline:** This callable is the **Bayesian Engine**. It is the direct implementation of the MCMC estimation procedure required to fit the BVAR model as described on page 9.

#### **11. `run_phase4_task11_local_projections`**

*   **Inputs:** The `analysis_ready_df` and the `study_config`.
*   **Processes:** This orchestrator estimates the Local Projections model. It calls `estimate_local_projections`, which loops through each endogenous variable and each forecast horizon `h`. In each loop, it:
    1.  Constructs the regression $y_{t+h} = \beta_h \text{shock}_t + \text{controls}_t + \epsilon_{t+h}$.
    2.  Estimates the equation via OLS.
    3.  Calculates Newey-West HAC standard errors to account for the serial correlation induced by the overlapping horizons.
*   **Outputs:** A dictionary containing the impulse response functions (the sequence of $\hat{\beta}_h$) and their HAC-robust confidence intervals.
*   **Role in Research Pipeline:** This callable is the **Robustness Estimator**. It implements the Local Projections method of Jordà (2005), which is used as the primary robustness check for the BVAR impulse responses, as described on page 10 ("Local projections").

#### **12. `calculate_var_impulse_responses`**

*   **Inputs:** The `posterior_draws` and `bvar_setup` dictionaries, and the `study_config`.
*   **Processes:** This function translates the BVAR posterior draws into impulse responses. For each of the 15,000 posterior draws of the coefficients, it:
    1.  Constructs the VAR's companion matrix `F`.
    2.  Recursively calculates the impulse response functions up to the specified horizon using matrix powers: $\Psi_h = J F^h J' B_0$.
    3.  It then summarizes the resulting 15,000 IRF draws by calculating their posterior median (the point estimate) and percentiles (the credible interval).
*   **Outputs:** A dictionary containing the summarized IRFs (median, lower/upper bounds) and a significance indicator.
*   **Role in Research Pipeline:** This callable is the **Dynamic Interpreter**. It takes the static BVAR coefficient estimates and translates them into the key objects of interest for the paper's analysis: the dynamic impulse response functions that trace the effect of monetary policy shocks over time.

#### **13. `run_transmission_channel_analysis`**

*   **Inputs:** The `full_analysis_df` (containing all possible variables), the `study_config`, and a structured `channel_specifications` list.
*   **Processes:** This is a high-level "campaign manager" orchestrator. It iterates through the provided specifications. For each, it creates an augmented set of endogenous variables (either by adding a group or by iterating one-by-one) and re-runs the entire BVAR pipeline (Tasks 9, 10, 12) on this new model specification.
*   **Outputs:** A dictionary where keys are the channel names (e.g., `financial_channel`, `oil_exports_channel`) and values are the full IRF results for that specific augmented model.
*   **Role in Research Pipeline:** This callable is the **Channel Investigator**. It directly implements the analysis described in Section 4 of the paper, where the baseline model is augmented with trade, oil, and financial variables to investigate the specific transmission channels of the monetary policy shocks.

#### **14. `run_full_model_validation_suite`**

*   **Inputs:** The `analysis_ready_df`, the estimated BVAR `bvar_setup` and `posterior_draws`, and the `study_config`.
*   **Processes:** This orchestrator runs a suite of diagnostic and validation checks.
    1.  It calls `assess_in_sample_fit` to compute RMSE and the Log Marginal Likelihood for the BVAR, and compares the BVAR's in-sample fit to the LP's using a Diebold-Mariano test.
    2.  It calls the remediated `evaluate_forecasting_performance_full` to conduct a rigorous, computationally intensive, recursive out-of-sample forecasting horse race between the BVAR and LP models.
    3.  It orchestrates the re-estimation of the BVAR for multiple lag lengths to compute information criteria (AIC, BIC, HQ) for lag selection.
*   **Outputs:** A dictionary of `pd.DataFrame` reports summarizing all in-sample, out-of-sample, and diagnostic test results.
*   **Role in Research Pipeline:** This callable is the **Skeptic**. Its purpose is to rigorously challenge the validity of the baseline model by assessing its fit, its forecasting performance, and the appropriateness of its specification choices, as is standard practice in any high-quality empirical paper.

#### **15. `run_transatlantic_spillovers_analysis` (Master Orchestrator)**

*   **Inputs:** The four raw `pd.DataFrame`s, the `target_market` string, and the `study_config`.
*   **Processes:** This is the top-level master orchestrator. It executes the entire analytical workflow in the correct, logical sequence by calling the task-specific orchestrators from Task 1 through Task 14. Its refactored design ensures a clean separation of concerns between data cleaning (Phase 1) and model-specific data transformation (Phase 3).
*   **Outputs:** A single, comprehensive, nested dictionary containing all inputs, intermediate data products, final results, and validation reports generated during the entire pipeline run.
*   **Role in Research Pipeline:** This callable is the **Director**. It is the single entry point that executes the entire research paper from start to finish, ensuring perfect reproducibility.

#### **16. `run_identification_robustness_suite`**

*   **Inputs:** The results object from a benchmark run and the original raw data.
*   **Processes:** This orchestrator manages the suite of robustness checks related to the shock identification.
    1.  It calls `run_pmsr_robustness_analysis` to re-run the pipeline using the simpler PMSR identification.
    2.  It calls `run_rotational_uncertainty_analysis` to re-run the pipeline 1000s of times in parallel for different valid rotation angles to quantify identification uncertainty.
    3.  It calls `run_alternative_sample_analysis` to re-run the entire pipeline on truncated datasets (e.g., pre-COVID).
*   **Outputs:** A dictionary containing the detailed results from each of the three identification robustness analyses.
*   **Role in Research Pipeline:** This callable is the **Identification Validator**. It directly implements the robustness checks described in Section 5 of the paper, which are designed to ensure the main conclusions are not artifacts of the specific identification scheme or sample period.

#### **17. `run_estimation_robustness_suite`**

*   **Inputs:** The `analysis_ready_df` and the `study_config`.
*   **Processes:** This orchestrator manages the suite of robustness checks related to the BVAR model's specification.
    1.  It calls `run_prior_sensitivity_analysis` to re-estimate the model with different prior hyperparameter values.
    2.  It calls `run_lag_length_robustness_analysis` to re-estimate the model with different lag lengths.
    3.  It calls `run_specification_robustness_analysis` to re-estimate the model with different data transformations or deterministic components.
*   **Outputs:** A dictionary containing the detailed results from each of the three estimation robustness analyses.
*   **Role in Research Pipeline:** This callable is the **Model Validator**. It implements the final set of robustness checks, ensuring the results are not overly sensitive to specific modeling choices like the tightness of the prior or the number of lags included.


<br> <br>

### **Usage Example**

### **Example Usage of the End-to-End Pipeline**

This document provides a complete, implementation-grade example of how to use the top-level orchestrator function, `execute_full_study_pipeline`. It covers the setup of all required inputs and the interpretation of the structured output.

#### **Prerequisites**

Before executing the pipeline, ensure the following conditions are met:

1.  **Environment:** A Python environment with all necessary libraries installed is active. The consolidated import block provided previously contains the full list of dependencies (e.g., `pandas`, `numpy`, `pyyaml`, `statsmodels`, `joblib`, `tqdm`, `pandas_market_calendars`).
2.  **Code Availability:** All the modular functions and orchestrators developed and remediated throughout Tasks 1-17 are available in the Python session, either in the same script or imported from their respective modules.
3.  **Configuration File:** The `config.yaml` file, as previously generated, is present in the working directory.

#### **Step 1: Load the Study Configuration from YAML**

The first step in any run is to load the study parameters from the external configuration file. This is a critical best practice that separates the model's specification from its implementation, allowing for easy modification and experimentation without altering the source code.

```python
import yaml
from typing import Dict, Any

# Define the path to the configuration file.
config_path = "config.yaml"

# Open and parse the YAML file into a Python dictionary.
# The `yaml.safe_load` function is used to prevent arbitrary code execution.
with open(config_path, 'r') as f:
    study_config: Dict[str, Any] = yaml.safe_load(f)

# At this point, `study_config` is a nested dictionary containing all
# parameters needed to control the pipeline's execution.
```

#### **Step 2: Prepare Input Data Structures**

The pipeline requires four raw `pandas` DataFrames as input. For this example, we will generate **illustrative** synthetic but structurally correct data for each. In a real-world scenario, this data would be loaded from a database, CSV files, or a data vendor API.

**A. Announcement Metadata (`announcement_df`)**

This DataFrame contains the schedule of monetary policy announcements.

```python
import pandas as pd

# Create a sample DataFrame for two announcements.
announcement_data = {
    'event_id': [101, 201],
    'central_bank': ['FED', 'ECB'],
    'announcement_date_local': pd.to_datetime(['2022-03-16', '2022-03-10']),
    'announcement_time_local': ['14:00:00', '13:45:00'],
    'local_timezone': ['America/New_York', 'Europe/Berlin']
}
announcement_df = pd.DataFrame(announcement_data)

# Ensure categorical data types are correctly set.
announcement_df['central_bank'] = announcement_df['central_bank'].astype('category')
```

**B. High-Frequency Tick Data (`equity_tick_df` and `rate_tick_df`)**

This data represents the raw, tick-by-tick trades for the relevant financial instruments. We generate a small window of data around each announcement.

```python
import numpy as np

# --- Generate synthetic tick data for the FED announcement ---
fed_ann_time_utc = pd.Timestamp('2022-03-16 18:00:00', tz='UTC')
fed_time_range = pd.date_range(
    start=fed_ann_time_utc - pd.Timedelta(hours=1),
    end=fed_ann_time_utc + pd.Timedelta(hours=1),
    freq='1S' # Generate one tick per second for simplicity
)
fed_ticks = pd.DataFrame({
    'timestamp_micros_utc': fed_time_range,
    'price': 100 + np.random.randn(len(fed_time_range)).cumsum() * 0.01,
    'volume': np.random.randint(1, 100, size=len(fed_time_range)),
    'type': 'TRADE'
})

# --- Generate synthetic tick data for the ECB announcement ---
ecb_ann_time_utc = pd.Timestamp('2022-03-10 12:45:00', tz='UTC')
ecb_time_range = pd.date_range(
    start=ecb_ann_time_utc - pd.Timedelta(hours=1),
    end=ecb_ann_time_utc + pd.Timedelta(hours=1),
    freq='1S'
)
ecb_ticks = pd.DataFrame({
    'timestamp_micros_utc': ecb_time_range,
    'price': 98.5 + np.random.randn(len(ecb_time_range)).cumsum() * 0.005,
    'volume': np.random.randint(1, 100, size=len(ecb_time_range)),
    'type': 'TRADE'
})

# For this example, we use the same data for equity and rate futures.
# In a real scenario, these would be two distinct datasets.
equity_tick_df = pd.concat([fed_ticks, ecb_ticks]).reset_index(drop=True)
rate_tick_df = equity_tick_df.copy() # Use a copy
rate_tick_df['price'] = 100 - (rate_tick_df['price'] / 50) # Simulate a rate-like price

# Ensure categorical data types are correctly set.
equity_tick_df['type'] = equity_tick_df['type'].astype('category')
rate_tick_df['type'] = rate_tick_df['type'].astype('category')
```

**C. Macroeconomic Time Series Data (`macro_df`)**

This DataFrame contains the monthly macroeconomic data in a long ("tidy") format.

```python
# Create a monthly date range for the sample period.
date_rng = pd.date_range(start='2020-01-01', end='2023-12-31', freq='MS')
n_months = len(date_rng)

# Generate synthetic data for Canada (the target market) and the US (a foreign country).
macro_data_list = []
for country in ['CAN', 'USA']:
    for var in ['gdp', 'cpi', 'policy_rate', 'equity_index', 'exchange_rate']:
        base_val = 1000 if var == 'gdp' else 100
        data = {
            'date': date_rng,
            'source_series_id': f'{country}_{var.upper()}_M',
            'country': country,
            'variable_name': var,
            'value_raw': base_val + np.random.randn(n_months).cumsum()
        }
        macro_data_list.append(pd.DataFrame(data))

macro_df = pd.concat(macro_data_list, ignore_index=True)

# Ensure categorical data types are correctly set.
macro_df['country'] = macro_df['country'].astype('category')
```

#### **Step 3: Define Other Inputs**

The remaining inputs are the `target_market` string and the boolean flags to control the execution of the computationally intensive robustness checks. For a demonstration, it is prudent to disable them.

```python
# Define the target market for the analysis.
target_market = 'CAN'

# Define flags to control the execution of the robustness suites.
# Set to False for a faster initial run. Set to True for a full, rigorous analysis.
run_identification_robustness = False
run_estimation_robustness = False
```

#### **Step 4: Execute the Full Pipeline**

With all inputs prepared, the entire research pipeline can be executed with a single function call.

```python
# This is the single entry point that runs the entire study.
# It is assumed that `execute_full_study_pipeline` and all its dependencies
# have been defined in the current session.

master_results = execute_full_study_pipeline(
    equity_tick_df=equity_tick_df,
    rate_tick_df=rate_tick_df,
    macro_df=macro_df,
    announcement_df=announcement_df,
    target_market=target_market,
    study_config=study_config,
    run_identification_robustness=run_identification_robustness,
    run_estimation_robustness=run_estimation_robustness
)
```

#### **Step 5: Inspect the Output**

The `master_results` object is a deeply nested dictionary containing every artifact generated during the run. This structure allows for a complete and transparent audit of the entire analysis.

```python
# --- Check the overall status and metadata of the run ---
print("--- Run Metadata ---")
print(f"Final Status: {master_results['metadata']['final_status']}")
print(f"Total Runtime: {master_results['metadata']['total_runtime_seconds']} seconds")
if master_results['metadata']['final_status'] == 'FAIL':
    print(f"Error Message: {master_results['metadata']['error_message']}")

# --- Inspect a key intermediate output: the identified shocks ---
if master_results['metadata']['final_status'] == 'SUCCESS':
    print("\n--- Identified Structural Shocks (FED) ---")
    fed_shocks = master_results['phase_2_identification']['structural_shocks']['FED']
    # In the remediated version, this is a DataFrame indexed by event_id
    print(fed_shocks.head())

    # --- Inspect a key final output: the BVAR impulse responses ---
    print("\n--- BVAR Impulse Response Functions (Median Estimates) ---")
    bvar_irfs_median = master_results['phase_5_results']['bvar_irfs']['irf_median']
    # Shape is (num_endog_vars, num_shocks, horizon + 1)
    print(f"Shape of IRF tensor: {bvar_irfs_median.shape}")

    # Example: Get the response of the first endogenous variable to the first shock
    endog_vars = study_config['bvar_specification']['endogenous_variables']
    shock_vars = [v for v in study_config['bvar_specification']['exogenous_variables'] if 'shock' in v]
    
    print(f"\nResponse of '{endog_vars[0]}' to a '{shock_vars[0]}' shock:")
    print(bvar_irfs_median[0, 0, :])

    # --- Inspect a validation report ---
    print("\n--- Lag Selection Report ---")
    # This report is only generated if the estimation robustness suite is run.
    if 'estimation_robustness_suite' in master_results:
        lag_report = master_results['estimation_robustness_suite']['lag_selection_report']
        print(lag_report)
    else:
        print("Estimation robustness suite was not run.")
```

In [None]:
# Task 1: Parameter and Data Structure Validation

# =============================================================================
# CUSTOM EXCEPTION CLASSES
# =============================================================================

class ConfigurationValidationError(ValueError):
    """
    Custom exception raised for critical errors in the study configuration.

    Purpose:
        This exception is specifically designed to be raised when the main
        `study_configuration` dictionary fails validation checks. It signals
        that the foundational parameters of the analysis are incorrect,
        preventing the pipeline from proceeding with potentially invalid
        assumptions.

    Attributes:
        report (pd.DataFrame): A detailed DataFrame containing the results of
                               every validation check performed on the
                               configuration. This report includes timestamps,
                               the parameter path, expected vs. actual values,
                               and a pass/fail status for each check, providing
                               a comprehensive audit trail for debugging.

    Process:
        The exception is initialized with an error message and the validation
        report DataFrame. It extends Python's built-in `ValueError` and
        overrides the constructor to store the report. The error message
        presented to the user is automatically augmented with a string
        representation of this report, ensuring that the context of the
        failure is immediately clear.
    """
    def __init__(self, message: str, report: pd.DataFrame) -> NoReturn:
        # Store the detailed validation report as an attribute of the exception instance.
        # This allows programmatic access to the validation results in an error handling block.
        self.report: pd.DataFrame = report

        # Construct a comprehensive error message that includes the high-level summary
        # and the full, detailed report for immediate diagnosis.
        full_message: str = f"{message}\nValidation Report:\n{report.to_string()}"

        # Call the parent class (ValueError) constructor to properly initialize the exception.
        super().__init__(full_message)


class SchemaError(ValueError):
    """
    Custom exception raised for DataFrame schema and data type violations.

    Purpose:
        This exception indicates that an input `pd.DataFrame` does not conform
        to its required schema. This could involve missing columns, unexpected
        extra columns, incorrect data types, or invalid categorical levels.
        It is a critical data integrity check that prevents downstream
        processing errors.

    Process:
        This exception is raised by schema validation functions when a
        discrepancy is detected. The error message should be specific,
        detailing the exact nature of the schema mismatch (e.g., listing
        the missing columns) to facilitate rapid correction of the input data.
        It extends `ValueError` without adding new attributes, serving as a
        semantically distinct error type for targeted `try...except` blocks.
    """
    pass


class EmptyDataError(ValueError):
    """
    Custom exception raised when a required input DataFrame is empty or None.

    Purpose:
        This exception is raised as a preliminary check to ensure that essential
        input `pd.DataFrame` objects (e.g., tick data, macro data) contain
        data before any processing is attempted. Running the pipeline on empty
        data would lead to cryptic errors or incorrect results.

    Process:
        Validation functions should check if a DataFrame is `None` or if its
        `empty` attribute is `True` at the very beginning of their execution.
        If so, this exception should be raised with a message identifying the
        problematic DataFrame. It extends `ValueError` and provides a clear,
        early-exit signal that a required data source is missing or invalid.
    """
    pass

# =============================================================================
# TASK 1, STEP 1: CONFIGURATION DICTIONARY DEEP VALIDATION
# =============================================================================

def _validate_config_parameter(
    config: Dict[str, Any],
    path: List[str],
    expected_value: Any,
    validation_type: str,
    report_data: List[Dict[str, Any]]
) -> None:
    """
    Helper function to validate a single parameter within the configuration.

    This function navigates a nested dictionary using a path, compares the
    actual value to an expected value, and appends the result to a report list.
    It handles missing keys and type errors gracefully.

    Args:
        config (Dict[str, Any]): The configuration dictionary to validate.
        path (List[str]): A list of keys representing the path to the parameter.
        expected_value (Any): The expected value of the parameter.
        validation_type (str): The type of validation ('exact' or 'isclose').
        report_data (List[Dict[str, Any]]): A list to which the validation
                                            result dictionary will be appended.
    """
    # Initialize report entry with common fields
    path_str = ".".join(path)
    result_entry = {
        "timestamp": pd.Timestamp.now(tz='UTC'),
        "validation_type": "Configuration Parameter",
        "parameter_path": path_str,
        "expected_value": str(expected_value),
        "actual_value": "N/A",
        "pass_fail": "FAIL",
        "error_message": ""
    }

    try:
        # Traverse the nested dictionary to get the actual value
        actual_value = config
        for key in path:
            actual_value = actual_value[key]
        result_entry["actual_value"] = str(actual_value)

        # Perform validation based on the specified type
        is_pass = False
        if validation_type == 'isclose':
            # Use numpy.isclose for robust floating-point comparison
            if not isinstance(actual_value, (int, float)):
                raise TypeError("Value is not a number.")
            is_pass = np.isclose(
                actual_value,
                expected_value,
                rtol=1e-9,
                atol=1e-12
            )
        elif validation_type == 'exact':
            # Use exact string matching, stripping whitespace
            if isinstance(actual_value, str):
                is_pass = actual_value.strip() == expected_value
            else:
                is_pass = actual_value == expected_value

        # Update the report entry based on the validation outcome
        if is_pass:
            result_entry["pass_fail"] = "PASS"
        else:
            result_entry["error_message"] = "Actual value does not match expected value."

    except KeyError:
        # Handle cases where a key in the path does not exist
        result_entry["error_message"] = f"Key not found in configuration: {path_str}"
    except TypeError as e:
        # Handle type errors during comparison (e.g., comparing float with string)
        result_entry["error_message"] = f"Type error during validation: {e}"

    # Append the detailed result to the main report list
    report_data.append(result_entry)


def validate_study_configuration(
    study_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Performs a deep validation of the study_configuration dictionary.

    This function meticulously checks for the presence and correctness of all
    critical parameters specified in the project plan. It verifies nested keys,
    data types, and specific values required for the analysis to proceed
    correctly, using numpy.isclose for floating-point comparisons.

    Args:
        study_config (Dict[str, Any]): The configuration dictionary for the
                                       entire study.

    Returns:
        pd.DataFrame: A detailed report of the validation process, with each
                      row representing a specific check.

    Raises:
        ConfigurationValidationError: If any critical parameter fails validation,
                                      this exception is raised, containing the
                                      full validation report.
    """
    # Initialize a list to store dictionaries, which is more efficient for appending
    report_data: List[Dict[str, Any]] = []

    # Define the checklist of parameters to validate
    # Each tuple contains: (path_list, expected_value, validation_type)
    validation_checklist = [
        # Top-level keys
        (["identification_params"], None, 'exists'),
        (["bvar_specification"], None, 'exists'),
        (["bvar_priors"], None, 'exists'),
        (["local_projection_specification"], None, 'exists'),
        (["computation_settings"], None, 'exists'),
        # Nested parameters with specific values
        (["identification_params", "method"], "rotational_angle_decomposition", 'exact'),
        (["identification_params", "benchmark_selection"], "median_rotation", 'exact'),
        (["bvar_specification", "lags"], 3, 'exact'),
        (["bvar_priors", "hyperparameters", "ar_coeff_prior"], 0.8, 'isclose'),
        (["bvar_priors", "hyperparameters", "lambda_1_overall_tightness"], 0.1, 'isclose'),
        (["bvar_priors", "hyperparameters", "lambda_3_lag_decay"], 1.0, 'isclose'),
        (["local_projection_specification", "lags_dependent_variable"], 3, 'exact'),
        (["computation_settings", "impulse_response_functions", "credible_interval_level"], 0.90, 'isclose'),
    ]

    # Iterate through the checklist and validate each parameter
    for path, expected, v_type in validation_checklist:
        # The 'exists' type is a special case to check for key presence
        if v_type == 'exists':
            # Use dict.get() to safely check for the existence of top-level keys
            if study_config.get(path[0]) is None:
                report_data.append({
                    "timestamp": pd.Timestamp.now(tz='UTC'),
                    "validation_type": "Configuration Key Presence",
                    "parameter_path": path[0],
                    "expected_value": "Exists",
                    "actual_value": "Missing",
                    "pass_fail": "FAIL",
                    "error_message": f"Required top-level key '{path[0]}' is missing."
                })
        else:
            # Use the helper function for standard value validation
            _validate_config_parameter(study_config, path, expected, v_type, report_data)

    # Convert the list of results into a pandas DataFrame
    validation_report = pd.DataFrame(report_data)

    # Check if any validation failed
    if "FAIL" in validation_report["pass_fail"].values:
        # If any check fails, raise a custom exception with the full report
        raise ConfigurationValidationError(
            "Critical error in study configuration.",
            report=validation_report
        )

    # If all checks pass, return the successful validation report
    return validation_report


# =============================================================================
# TASK 1, STEP 2: DATAFRAME SCHEMA AND DATA TYPE VERIFICATION
# =============================================================================

def validate_dataframe_schema(
    df: pd.DataFrame,
    df_name: str,
    schema: Dict[str, Union[type, Tuple[type, Set[str]]]],
    expected_frequency: Optional[str] = None
) -> pd.DataFrame:
    """
    Validates the schema of a single DataFrame against a detailed definition.

    Purpose:
        This function provides a rigorous and comprehensive validation of a
        DataFrame's structure. It serves as a critical data integrity gateway,
        ensuring that input data conforms to all expected column names, data
        types (including timezone awareness), categorical levels, and,
        optionally, time-series frequency.

    Process:
        1.  **Initial Checks**: Verifies that the input DataFrame is not None or empty.
        2.  **Column Conformance**: Checks for an exact match between the
            DataFrame's columns and the required columns defined in the schema.
            It reports both missing and unexpected extra columns.
        3.  **Column-wise Validation**: For each column specified in the schema:
            - **Data Type**: Uses the precise `pandas.api.types` module to
              verify the data type. For datetimes, it also validates timezone
              awareness against the schema.
            - **Categorical Levels**: For categorical columns, it verifies that
              the set of categories is an exact match to the expected set.
        4.  **Frequency Validation (Optional)**: If `expected_frequency` is
            provided (e.g., 'MS' for monthly), it performs a check to ensure
            the time series conforms to this frequency by analyzing the modal
            difference between consecutive dates.
        5.  **Reporting**: Compiles the results of all checks into a single,
            detailed compliance report DataFrame.

    Args:
        df (pd.DataFrame): The DataFrame to validate.
        df_name (str): The name of the DataFrame (for reporting purposes).
        schema (Dict[str, Union[type, Tuple[type, Set[str]]]]):
            A dictionary defining the required schema. Keys are column names.
            Values are either a type (e.g., `np.float64`) or a tuple of
            (type, set_of_valid_categories) for categorical columns.
        expected_frequency (Optional[str]): If not None, specifies the expected
            pandas frequency string (e.g., 'MS'). Triggers a time-series
            frequency check.

    Returns:
        pd.DataFrame: A report detailing the compliance of each column and the
                      overall frequency check.

    Raises:
        EmptyDataError: If the input DataFrame is empty or None.
        SchemaError: If the set of columns does not exactly match the schema.
    """
    # --- 1. Initial Checks ---
    # A validation function must first ensure it has valid data to operate on.
    if df is None or df.empty:
        raise EmptyDataError(f"Input DataFrame '{df_name}' is None or empty.")

    # --- 2. Column Conformance ---
    # Check for an exact match between the required and actual column sets.
    required_cols = set(schema.keys())
    actual_cols = set(df.columns)

    if required_cols != actual_cols:
        # If they don't match, provide a detailed error message.
        missing_cols = required_cols - actual_cols
        extra_cols = actual_cols - required_cols
        error_msg = f"Schema mismatch for DataFrame '{df_name}'."
        if missing_cols:
            error_msg += f" Missing columns: {sorted(list(missing_cols))}."
        if extra_cols:
            error_msg += f" Extra columns found: {sorted(list(extra_cols))}."
        raise SchemaError(error_msg)

    # --- 3. Column-wise Validation ---
    # Initialize a list to store compliance results for each column.
    compliance_report_data: List[Dict[str, Any]] = []

    # Iterate through the defined schema to check each column individually.
    for col, spec in schema.items():
        status = "PASS"
        message = ""
        expected_type = spec[0] if isinstance(spec, tuple) else spec
        actual_type = df[col].dtype

        # Use the precise pandas.api.types module for robust dtype checking.
        type_match = False
        if pd.api.types.is_datetime64_ns_dtype(expected_type):
            if pd.api.types.is_datetime64_ns_dtype(actual_type):
                # For datetimes, timezone awareness is a critical part of the type.
                if getattr(expected_type, 'tz', None) is not None:
                    if df[col].dt.tz is None:
                        status, message = "FAIL", "Timestamp column is timezone-naive; expected aware."
                    elif str(df[col].dt.tz) != str(expected_type.tz):
                        status, message = "FAIL", f"Incorrect timezone; expected {expected_type.tz}, got {df[col].dt.tz}."
                    else:
                        type_match = True
                else: # Expected is timezone-naive
                    if df[col].dt.tz is not None:
                        status, message = "FAIL", "Timestamp column is timezone-aware; expected naive."
                    else:
                        type_match = True
            else:
                status, message = "FAIL", "Expected datetime64[ns] dtype."
        # Check other common dtypes.
        elif pd.api.types.is_categorical_dtype(expected_type):
            type_match = pd.api.types.is_categorical_dtype(actual_type)
        elif pd.api.types.is_float_dtype(expected_type):
            type_match = pd.api.types.is_float_dtype(actual_type)
        elif pd.api.types.is_integer_dtype(expected_type):
            type_match = pd.api.types.is_integer_dtype(actual_type)
        elif pd.api.types.is_object_dtype(expected_type):
            type_match = pd.api.types.is_object_dtype(actual_type)

        if not type_match and status == "PASS":
            status, message = "FAIL", f"Type mismatch. Expected: {expected_type}, Actual: {actual_type}."

        # If the column is categorical, validate its defined levels.
        if status == "PASS" and isinstance(spec, tuple):
            expected_categories = spec[1]
            actual_categories = set(df[col].cat.categories)
            if actual_categories != expected_categories:
                status, message = "FAIL", f"Category mismatch. Expected: {expected_categories}, Actual: {actual_categories}."

        # Append the detailed result for the current column to the report list.
        compliance_report_data.append({
            "dataframe_name": df_name, "column_name": col, "expected_spec": str(spec),
            "actual_dtype": str(actual_type), "compliance_status": status, "message": message
        })

    # --- 4. Frequency Validation (Optional) ---
    # This new block implements the previously omitted frequency check.
    if expected_frequency:
        freq_status = "PASS"
        freq_message = ""
        # The check requires a 'date' column of a datetime type.
        if 'date' not in df.columns or not pd.api.types.is_datetime64_ns_dtype(df['date'].dtype):
            freq_status, freq_message = "FAIL", "Frequency check requires a 'date' column of datetime dtype."
        else:
            # Sort by date to ensure differences are chronological.
            day_diffs = df['date'].sort_values().diff().dt.days.dropna()

            if day_diffs.empty:
                freq_status, freq_message = "WARN", "Not enough data points to check frequency."
            else:
                modal_diff = day_diffs.mode()
                # Handle the edge case of an empty mode result.
                if modal_diff.empty:
                    freq_status, freq_message = "FAIL", "Could not determine a modal frequency."
                elif expected_frequency == 'MS': # Monthly check
                    # Check if the most common difference is within the range for a month.
                    if not (28 <= modal_diff[0] <= 31):
                        freq_status, freq_message = "FAIL", f"Expected monthly frequency, but modal day difference is {modal_diff[0]}."

        # Append the frequency check result to the report.
        compliance_report_data.append({
            "dataframe_name": df_name, "column_name": "_FREQUENCY_CHECK_", "expected_spec": expected_frequency,
            "actual_dtype": "N/A", "compliance_status": freq_status, "message": freq_message
        })

    # --- 5. Reporting ---
    # Convert the list of results into the final pandas DataFrame report.
    return pd.DataFrame(compliance_report_data)


# =============================================================================
# TASK 1, STEP 3: CROSS-DATASET ALIGNMENT VERIFICATION
# =============================================================================

def validate_cross_dataset_alignment(
    equity_tick_df: pd.DataFrame,
    rate_tick_df: pd.DataFrame,
    macro_df: pd.DataFrame,
    announcement_df: pd.DataFrame,
    target_market: str
) -> Dict[str, Any]:
    """
    Performs comprehensive alignment checks across all input datasets.

    Purpose:
        This function serves as a critical data integrity gateway. It verifies
        that all disparate datasets (high-frequency, low-frequency, event metadata)
        are logically consistent and temporally aligned before any analysis is
        attempted. It checks for content (variable coverage), time (event
        coverage), and data quality (macroeconomic series gaps).

    Process:
        1.  **Content Validation**:
            - Verifies that the specified `target_market` exists in the macro data.
            - Ensures that all required macroeconomic variables for that market
              are present in the dataset.
        2.  **Temporal Alignment**:
            - **UTC Conversion**: Calls the robust `standardize_announcement_timestamps`
              function to accurately convert all local announcement times to UTC,
              correctly handling timezones and Daylight Saving Time.
            - **Range Check**: Determines the full temporal range of the high-
              frequency tick data.
            - **Buffer Implementation**: Adds a 24-hour buffer to each end of
              the tick data range.
            - **Verification**: Checks if all UTC announcement timestamps fall
              within this buffered range. Any events outside this range are
              flagged as critical errors.
        3.  **Macroeconomic Gap Analysis**:
            - **Pivoting**: Transforms the long-format macro data for the target
              market into a wide-format DataFrame.
            - **Densification**: Reindexes the data to a complete, uninterrupted
              monthly time grid.
            - **Gap Detection**: Identifies any gaps in the series and flags
              series with gaps exceeding 3 consecutive months as critical failures.

    Args:
        equity_tick_df (pd.DataFrame): High-frequency equity data.
        rate_tick_df (pd.DataFrame): High-frequency interest rate data.
        macro_df (pd.DataFrame): Monthly macroeconomic time series data.
        announcement_df (pd.DataFrame): Central bank announcement metadata.
        target_market (str): The market to study (e.g., 'CAN').

    Returns:
        Dict[str, Any]: A dictionary containing a summary status and detailed
                        report DataFrames for each alignment check.

    Raises:
        ValueError: If a critical alignment check fails (e.g., target market
                    not found, announcements outside tick data range, or macro
                    gaps > 3 months).
    """
    # --- Initialization of the results and reporting structure ---
    # This dictionary will hold the final status and any generated reports.
    results: Dict[str, Any] = {
        "overall_status": "PASS",
        "reports": {}
    }

    # --- 1. Content Validation: Target Market and Variable Coverage ---
    # Check if the specified market exists in the 'country' column.
    if target_market not in macro_df['country'].unique():
        # If not, this is a fatal configuration error.
        raise ValueError(f"Target market '{target_market}' not found in macroeconomic data's 'country' column.")

    # Filter the macro data to only the target market.
    target_macro_df = macro_df[macro_df['country'] == target_market]

    # Define the set of essential variables for the core VAR model.
    required_vars = {'exchange_rate', 'policy_rate', 'cpi', 'equity_index', 'gdp'}

    # Check if all required variables are present for the target market.
    present_vars = set(target_macro_df['variable_name'].unique())
    if not required_vars.issubset(present_vars):
        # If any are missing, this is a fatal data error.
        missing_vars = required_vars - present_vars
        raise ValueError(f"Target market '{target_market}' is missing required variables: {sorted(list(missing_vars))}")

    # --- 2. Temporal Alignment Validation ---
    # Determine the absolute start and end times of the available tick data.
    min_tick_time: pd.Timestamp = min(equity_tick_df['timestamp_micros_utc'].min(), rate_tick_df['timestamp_micros_utc'].min())
    max_tick_time: pd.Timestamp = max(equity_tick_df['timestamp_micros_utc'].max(), rate_tick_df['timestamp_micros_utc'].max())

    # Define the specified 24-hour buffer.
    buffer: pd.Timedelta = pd.Timedelta(hours=24)

    # Calculate the valid temporal range, inclusive of the buffer.
    valid_start_time: pd.Timestamp = min_tick_time - buffer
    valid_end_time: pd.Timestamp = max_tick_time + buffer

    try:
        # **CRITICAL STEP**: Use the robust, DST-aware function from Task 3 to get accurate UTC timestamps.
        # This replaces the previous naive and incorrect conversion logic.
        ann_df_with_utc, _ = standardize_announcement_timestamps(announcement_df)
        ann_utc_timestamps: pd.Series = ann_df_with_utc['announcement_timestamp_utc']
    except Exception as e:
        # If the robust conversion itself fails, it's a critical data quality issue.
        raise ValueError(f"Failed to standardize announcement timestamps for alignment check. Error: {e}")

    # Create a boolean mask to identify any announcements that fall outside the valid range.
    uncovered_mask: pd.Series = (ann_utc_timestamps < valid_start_time) | (ann_utc_timestamps > valid_end_time)

    # Check if any announcements were flagged.
    if uncovered_mask.any():
        # If so, this is a critical failure.
        results['overall_status'] = "FAIL"
        # Isolate the specific events that are uncovered for clear reporting.
        uncovered_announcements: pd.DataFrame = ann_df_with_utc[uncovered_mask]
        results['reports']['temporal_alignment_failures'] = uncovered_announcements
        # Raise an error with a detailed message and context.
        raise ValueError(
            f"{uncovered_mask.sum()} announcements are outside the buffered high-frequency data time range "
            f"({valid_start_time} to {valid_end_time}). See report for details."
        )

    # --- 3. Macroeconomic Data Gap Analysis ---
    # Pivot the data to wide format to make missing values explicit.
    macro_wide: pd.DataFrame = target_macro_df.pivot_table(
        index='date', columns='variable_name', values='value_raw'
    )

    # Create a complete monthly date range for the sample period of the macro data.
    full_date_range: pd.DatetimeIndex = pd.date_range(
        start=macro_wide.index.min(), end=macro_wide.index.max(), freq='MS'
    )
    # Reindex to the full grid to make any implicit gaps explicit NaNs.
    macro_wide = macro_wide.reindex(full_date_range)

    # Identify gaps and check for consecutive missing months exceeding the threshold.
    gap_report_data: List[Dict[str, Any]] = []
    for var in required_vars:
        # Isolate the series for the current variable.
        series: pd.Series = macro_wide[var]
        # Create a boolean series indicating where data is missing.
        is_na: pd.Series = series.isna()

        if is_na.any():
            # Identify contiguous blocks of NaNs.
            blocks: pd.Series = (is_na.diff() != 0).cumsum()
            nan_blocks = series[is_na].groupby(blocks[is_na])

            for _, block_series in nan_blocks:
                # For each block of missing data, calculate its length.
                gap_length = len(block_series)
                # If the gap is longer than 3 months, it's a critical failure.
                status = "FAIL" if gap_length > 3 else "WARN"
                gap_report_data.append({
                    "variable": var,
                    "gap_start": block_series.index.min(),
                    "gap_end": block_series.index.max(),
                    "gap_months": gap_length,
                    "status": status
                })

    # Check if any gaps were detected.
    if gap_report_data:
        # Create a report DataFrame from the collected gap information.
        gap_report = pd.DataFrame(gap_report_data)
        results['reports']['macro_gap_report'] = gap_report
        # If any of the gaps were marked as a failure, this is a critical issue.
        if "FAIL" in gap_report['status'].values:
            results['overall_status'] = "FAIL"
            raise ValueError(
                "Critical Error: Macroeconomic data contains gaps exceeding 3 consecutive months. "
                "See 'macro_gap_report' in the returned dictionary for details."
            )

    # If all checks passed, return the success status and any warning reports.
    return results

# =============================================================================
# ORCHESTRATOR FUNCTION FOR TASK 1
# =============================================================================

def run_phase1_task1_validation(
    equity_tick_df: pd.DataFrame,
    rate_tick_df: pd.DataFrame,
    macro_df: pd.DataFrame,
    announcement_df: pd.DataFrame,
    target_market: str,
    study_config: Dict[str, Any]
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the execution of all validation steps in Task 1.

    This master function sequentially calls the validation functions for the
    configuration dictionary, DataFrame schemas, and cross-dataset alignment.
    It aggregates the results into a final dictionary of reports.

    Args:
        equity_tick_df (pd.DataFrame): Raw high-frequency equity data.
        rate_tick_df (pd.DataFrame): Raw high-frequency rate data.
        macro_df (pd.DataFrame): Raw macroeconomic time series data.
        announcement_df (pd.DataFrame): Raw announcement metadata.
        target_market (str): The market to study (e.g., 'CAN').
        study_config (Dict[str, Any]): The main study configuration dictionary.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary containing the detailed validation
                                 reports from each step.
    """
    # --- Step 1: Configuration Dictionary Validation ---
    config_report = validate_study_configuration(study_config)

    # --- Step 2: DataFrame Schema Validation ---
    # Define the required schemas for each DataFrame
    tick_schema = {
        "timestamp_micros_utc": pd.DatetimeTZDtype(unit='ns', tz=pytz.UTC),
        "price": np.float64,
        "volume": np.int64,
        "type": (pd.CategoricalDtype, {'TRADE', 'BID', 'ASK'})
    }
    macro_schema = {
        "date": pd.to_datetime("2000-01-01").__class__, # A way to get datetime64[ns] type
        "source_series_id": np.object_,
        "country": pd.CategoricalDtype,
        "variable_name": np.object_,
        "value_raw": np.float64
    }
    announcement_schema = {
        "event_id": np.int64,
        "central_bank": (pd.CategoricalDtype, {'ECB', 'FED'}),
        "announcement_date_local": pd.to_datetime("2000-01-01").__class__,
        "announcement_time_local": np.object_,
        "local_timezone": np.object_
    }

    # Validate each DataFrame and collect reports
    equity_schema_report = validate_dataframe_schema(equity_tick_df, "equity_tick_df", tick_schema)
    rate_schema_report = validate_dataframe_schema(rate_tick_df, "rate_tick_df", tick_schema)
    macro_schema_report = validate_dataframe_schema(macro_df, "macro_df", macro_schema)
    announcement_schema_report = validate_dataframe_schema(announcement_df, "announcement_df", announcement_schema)

    # Combine schema reports into a single DataFrame
    full_schema_report = pd.concat([
        equity_schema_report,
        rate_schema_report,
        macro_schema_report,
        announcement_schema_report
    ], ignore_index=True)

    if "FAIL" in full_schema_report["compliance_status"].values:
        raise SchemaError(f"One or more DataFrames failed schema validation.\n{full_schema_report.to_string()}")

    # --- Step 3: Cross-Dataset Alignment Validation ---
    alignment_results = validate_cross_dataset_alignment(
        equity_tick_df, rate_tick_df, macro_df, announcement_df, target_market
    )

    # --- Aggregate and return all reports ---
    final_reports = {
        "config_validation_report": config_report,
        "schema_validation_report": full_schema_report,
        "alignment_validation_results": alignment_results
    }

    return final_reports


In [None]:
# Task 2: Data Quality Assessment and Anomaly Detection

# =============================================================================
# TASK 2, STEP 1: HIGH-FREQUENCY TICK DATA ANOMALY DETECTION
# =============================================================================

def detect_tick_data_anomalies(
    tick_df: pd.DataFrame,
    market_hours: Dict[str, time],
    local_timezone_str: str
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Performs a comprehensive quality assessment on high-frequency tick data.

    Purpose:
        This function identifies common anomalies in tick-level financial data
        to ensure its integrity before it is used for surprise calculation. It
        flags duplicates, price outliers, volume irregularities, timestamp gaps,
        and other issues.

    Process:
        1.  **Duplicate Timestamps**: Identifies ticks with identical timestamps.
        2.  **Price Outliers**: Uses the robust Modified Z-score method to flag
            prices that deviate significantly from the local median. The formula is:
            Z_i = (0.6745 * (price_i - median_price)) / MAD
            where MAD is the Median Absolute Deviation. A threshold of 3.5 is used.
        3.  **Volume Anomalies**: Flags trades with volume less than 1 or
            exceeding the 99.9th percentile of the sample.
        4.  **Timestamp Gaps**: Detects time gaps greater than 5 minutes that
            occur during specified market hours.
        5.  **Invalid Prices**: Flags any ticks with a price less than or equal to zero.
        6.  **Monotonicity**: Checks if the entire timestamp series is strictly
            increasing.

    Args:
        tick_df (pd.DataFrame): DataFrame containing tick data with columns
                                ['timestamp_micros_utc', 'price', 'volume'].
        market_hours (Dict[str, time]): A dictionary with 'start' and 'end'
                                        keys defining market hours.
        local_timezone_str (str): The IANA timezone string for the market
                                  (e.g., 'America/New_York').

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]:
        - A DataFrame of boolean flags with the same index as `tick_df`, where
          each column corresponds to a specific anomaly type.
        - A summary DataFrame quantifying the number and percentage of each
          detected anomaly.
    """
    # Input validation
    if not isinstance(tick_df, pd.DataFrame) or tick_df.empty:
        raise ValueError("tick_df must be a non-empty pandas DataFrame.")
    required_cols = {'timestamp_micros_utc', 'price', 'volume'}
    if not required_cols.issubset(tick_df.columns):
        raise ValueError(f"tick_df missing required columns: {required_cols - set(tick_df.columns)}")

    # Initialize a DataFrame to store anomaly flags, sharing the same index
    n_ticks = len(tick_df)
    anomalies = pd.DataFrame(index=tick_df.index)

    # --- Anomaly 1: Duplicate Timestamps ---
    # Find all occurrences of timestamps that are not unique.
    anomalies['duplicate_timestamp'] = tick_df['timestamp_micros_utc'].duplicated(keep=False)

    # --- Anomaly 2: Price Outliers (Modified Z-Score) ---
    # Calculate the median price, which is robust to outliers.
    median_price = tick_df['price'].median()
    # Calculate the Median Absolute Deviation (MAD) from the median.
    mad = np.median(np.abs(tick_df['price'] - median_price))
    # Define a minimum MAD to avoid division by zero if all prices are identical.
    mad_safe = mad if mad > 1e-9 else 1.0
    # Calculate the Modified Z-Score for each price point.
    # The constant 0.6745 is norm.ppf(0.75), scaling MAD to be a consistent
    # estimator of the standard deviation.
    modified_z_score = (norm.ppf(0.75) * (tick_df['price'] - median_price)) / mad_safe
    # Flag prices where the absolute Modified Z-Score exceeds the threshold of 3.5.
    anomalies['price_outlier'] = np.abs(modified_z_score) > 3.5

    # --- Anomaly 3: Volume Anomalies ---
    # Calculate the 99.9th percentile for volume to identify extreme trades.
    volume_q999 = tick_df['volume'].quantile(0.999)
    # Flag trades with zero or negative volume.
    anomalies['low_volume'] = tick_df['volume'] < 1
    # Flag trades with volume exceeding the high threshold.
    anomalies['high_volume'] = tick_df['volume'] > volume_q999

    # --- Anomaly 4: Timestamp Gaps during Market Hours ---
    # Calculate the time difference between consecutive ticks in seconds.
    time_diff_seconds = tick_df['timestamp_micros_utc'].diff().dt.total_seconds()
    # Get the local timezone object.
    local_tz = pytz.timezone(local_timezone_str)
    # Convert UTC timestamps to the market's local time.
    local_time = tick_df['timestamp_micros_utc'].dt.tz_convert(local_tz).dt.time
    # Create a boolean mask for ticks occurring within market hours.
    in_market_hours = (local_time >= market_hours['start']) & (local_time <= market_hours['end'])
    # Flag gaps greater than 5 minutes (300 seconds) that occur within market hours.
    anomalies['timestamp_gap'] = (time_diff_seconds > 300) & in_market_hours

    # --- Anomaly 5: Invalid Prices ---
    # Flag any ticks with a price that is not strictly positive.
    anomalies['invalid_price'] = tick_df['price'] <= 0

    # --- Anomaly 6: Timestamp Monotonicity ---
    # Check if the entire timestamp series is monotonically increasing.
    is_monotonic = tick_df['timestamp_micros_utc'].is_monotonic_increasing
    # If not monotonic, this is a serious data quality issue.
    # This flag will be False for all rows if the check fails globally.
    anomalies['non_monotonic'] = not is_monotonic

    # --- Generate Summary Report ---
    summary_data = []
    for col in anomalies.columns:
        count = anomalies[col].sum()
        percentage = (count / n_ticks) * 100 if n_ticks > 0 else 0
        summary_data.append({
            "anomaly_type": col,
            "count": count,
            "percentage": percentage
        })
    summary_df = pd.DataFrame(summary_data)

    return anomalies, summary_df


# =============================================================================
# TASK 2, STEP 2: ANNOUNCEMENT METADATA INTEGRITY VERIFICATION
# =============================================================================

def verify_announcement_metadata(
    announcement_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Performs a rigorous integrity check on the announcement metadata.

    Purpose:
        This function validates the announcement schedule data to ensure it is
        reliable. It checks for unique identifiers, valid timezones, parseable
        times, and flags announcements on non-business days.

    Process:
        1.  **Unique Event IDs**: Verifies that every `event_id` is unique.
        2.  **Timezone Validity**: Checks `local_timezone` strings against the
            IANA database using `pytz`.
        3.  **Time Format**: Attempts to parse `announcement_time_local`.
        4.  **Weekend/Holiday Announcements**: Flags events scheduled on weekends
            or on known market holidays for the relevant exchanges (CME for FED,
            EUREX for ECB).
        5.  **Chronological Order**: Ensures announcements for each central bank
            are sorted chronologically.
        6.  **Duplicate Announcements**: Detects logical duplicates based on
            central bank, date, and time.

    Args:
        announcement_df (pd.DataFrame): DataFrame with announcement metadata.

    Returns:
        pd.DataFrame: A detailed report DataFrame, indexed by `event_id`,
                      containing the status and messages for each validation check.
    """
    # Input validation
    if not isinstance(announcement_df, pd.DataFrame) or announcement_df.empty:
        raise ValueError("announcement_df must be a non-empty pandas DataFrame.")

    # Create a report DataFrame indexed by the event_id for clear mapping
    report = pd.DataFrame(index=announcement_df['event_id'])
    df = announcement_df.set_index('event_id')

    # --- Check 1: Unique Event IDs ---
    report['is_unique_id'] = ~df.index.duplicated(keep=False)

    # --- Check 2: Timezone Validity ---
    def is_valid_timezone(tz_str):
        try:
            pytz.timezone(tz_str)
            return True
        except pytz.UnknownTimeZoneError:
            return False
    report['is_valid_timezone'] = df['local_timezone'].apply(is_valid_timezone)

    # --- Check 3: Time Format Consistency ---
    # Attempt to parse the time string; unparseable formats become NaT.
    parsed_time = pd.to_datetime(df['announcement_time_local'], format='%H:%M:%S', errors='coerce').dt.time
    report['is_valid_time_format'] = parsed_time.notna()

    # --- Check 4: Weekend/Holiday Announcements ---
    # Check for weekends (Saturday=5, Sunday=6).
    report['is_weekday'] = df['announcement_date_local'].dt.dayofweek < 5
    # Check for market holidays using pandas_market_calendars.
    cme_cal = mcal.get_calendar('CME')
    eurex_cal = mcal.get_calendar('EUREX')
    cme_holidays = cme_cal.holidays().holidays
    eurex_holidays = eurex_cal.holidays().holidays

    def is_not_holiday(row):
        if row['central_bank'] == 'FED':
            return row['announcement_date_local'].date() not in cme_holidays
        elif row['central_bank'] == 'ECB':
            return row['announcement_date_local'].date() not in eurex_holidays
        return True
    report['is_not_holiday'] = df.apply(is_not_holiday, axis=1)

    # --- Check 5: Chronological Ordering ---
    # Check if dates are sorted within each central bank group.
    is_sorted = df.groupby('central_bank')['announcement_date_local'].is_monotonic_increasing.all()
    report['is_chronological'] = is_sorted # Broadcasts the global result to all rows

    # --- Check 6: Duplicate Announcements ---
    # Detect duplicates based on a composite key.
    key_cols = ['central_bank', 'announcement_date_local', 'announcement_time_local']
    report['is_not_duplicate_event'] = ~df.duplicated(subset=key_cols, keep=False)

    return report.reset_index()


# =============================================================================
# TASK 2, STEP 3: MACROECONOMIC TIME SERIES STATISTICAL QUALITY CONTROL
# =============================================================================

def _chow_test(
    series: pd.Series,
    breakpoint_date: pd.Timestamp
) -> float:
    """
    Performs a Chow test for a structural break at a specific date.

    This helper function tests the null hypothesis that the coefficients of a
    linear regression model are stable across a breakpoint.

    Args:
        series (pd.Series): The time series data to test.
        breakpoint_date (pd.Timestamp): The date of the potential structural break.

    Returns:
        float: The p-value of the Chow test. Returns np.nan if the test cannot be performed.
    """
    # Define the regression model: y_t = beta_0 + beta_1 * t + e_t
    y = series
    X = sm.add_constant(np.arange(len(series)))
    X.index = y.index
    k = X.shape[1]  # Number of regressors (2: constant and trend)
    N = len(y)

    # Check if the breakpoint is valid and allows for estimation in both sub-samples.
    if breakpoint_date not in y.index or y.index.get_loc(breakpoint_date) < k:
        return np.nan

    # Split the data into two sub-samples at the breakpoint.
    X1, y1 = X[X.index < breakpoint_date], y[y.index < breakpoint_date]
    X2, y2 = X[X.index >= breakpoint_date], y[y.index >= breakpoint_date]

    # Check if each sub-sample is large enough to estimate the model.
    if len(y1) < k or len(y2) < k or (N - 2 * k) <= 0:
        return np.nan

    # --- Perform Regressions ---
    # 1. Restricted model (pooled data)
    rss_restricted = sm.OLS(y, X).fit().ssr

    # 2. Unrestricted models (separate sub-samples)
    rss_unrestricted_1 = sm.OLS(y1, X1).fit().ssr
    rss_unrestricted_2 = sm.OLS(y2, X2).fit().ssr
    rss_unrestricted_total = rss_unrestricted_1 + rss_unrestricted_2

    # --- Calculate F-statistic ---
    # Equation: F = [(RSS_R - RSS_UR) / k] / [RSS_UR / (N - 2k)]
    f_statistic = ((rss_restricted - rss_unrestricted_total) / k) / (rss_unrestricted_total / (N - 2 * k))

    # --- Calculate p-value ---
    # The p-value is derived from the F-distribution's survival function (1 - CDF).
    # Numerator degrees of freedom: dfn = k
    # Denominator degrees of freedom: dfd = N - 2k
    p_value = f.sf(f_statistic, dfn=k, dfd=N - 2 * k)

    return p_value


def _assess_single_series(
    series_group: pd.DataFrame
) -> pd.Series:
    """
    Worker function to perform all quality checks on a single time series.

    This function is designed to be used with `groupby().apply()`.

    Args:
        series_group (pd.DataFrame): A DataFrame for a single source_series_id,
                                     with 'date' as index and 'value_raw' as the column.

    Returns:
        pd.Series: A Series containing all calculated quality metrics for the input series.
    """
    # Extract the series and its original size for missing value calculation.
    original_size = len(series_group)
    series = series_group['value_raw'].dropna()

    # If the series has too few observations after dropping NaNs, return a partial report.
    if len(series) < 24:  # Require at least 2 years of data for meaningful tests.
        return pd.Series({
            'missing_pct': (1 - len(series) / original_size) * 100 if original_size > 0 else 100,
            'chow_p_val_gfc': np.nan,
            'chow_p_val_covid': np.nan,
            'outlier_pct': np.nan,
            'is_monthly': False,
            'is_strictly_positive': False,
            'notes': 'Series too short for full assessment.'
        })

    # --- 1. Missing Value Percentage ---
    missing_pct = (1 - len(series) / original_size) * 100

    # --- 2. Structural Break (Chow Test) at two key dates ---
    chow_p_gfc = _chow_test(series, pd.Timestamp('2008-09-15'))
    chow_p_covid = _chow_test(series, pd.Timestamp('2020-03-11'))

    # --- 3. Outliers (Hampel Filter) ---
    # Use a 7-month centered window (k=3) to calculate local median and MAD.
    rolling_median = series.rolling(window=7, center=True, min_periods=4).median()
    # Calculate Median Absolute Deviation from the rolling median.
    rolling_mad = (series - rolling_median).abs().rolling(window=7, center=True, min_periods=4).median()
    # An observation is an outlier if it's more than 3 MADs from the local median.
    is_outlier = (series - rolling_median).abs() > (3 * rolling_mad)
    outlier_pct = is_outlier.sum() / len(series) * 100

    # --- 4. Temporal Consistency (Monthly Frequency Check) ---
    # Calculate the difference in days between consecutive observations.
    day_diffs = series.index.to_series().diff().dt.days
    # The mode of these differences should be between 28 and 31 for a monthly series.
    modal_diff = day_diffs.mode()
    is_monthly = (modal_diff >= 28).all() and (modal_diff <= 31).all() if not modal_diff.empty else False

    # --- 5. Value Range Check (Strictly Positive) ---
    # A common requirement for variables like GDP, CPI, etc.
    is_positive = (series > 0).all()

    # --- Assemble and return the results Series ---
    return pd.Series({
        'missing_pct': missing_pct,
        'chow_p_val_gfc': chow_p_gfc,
        'chow_p_val_covid': chow_p_covid,
        'outlier_pct': outlier_pct,
        'is_monthly': is_monthly,
        'is_strictly_positive': is_positive,
        'notes': ''
    })


def assess_macro_series_quality(
    macro_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Performs statistical quality control on macroeconomic time series.

    Purpose:
        This function assesses each time series in the provided long-format
        DataFrame for common issues like missing data, structural breaks,
        outliers, and inconsistent frequency. It produces a comprehensive
        report to guide data cleaning and model specification.

    Process:
        1.  **Group Data**: Groups the input DataFrame by `source_series_id`.
        2.  **Apply Checks**: Uses the `_assess_single_series` worker function
            on each group to perform the full suite of tests in a vectorized manner.
            - **Missing Values**: Calculates the percentage of missing observations.
            - **Structural Breaks**: Performs a statistically correct Chow test
              for breaks at the GFC (2008-09-15) and COVID-19 (2020-03-11) dates.
            - **Outliers**: Applies a Hampel filter with a 3-month rolling window.
            - **Temporal Consistency**: Verifies a consistent monthly frequency.
            - **Value Ranges**: Checks if values are strictly positive.
        3.  **Compile Report**: Assembles the results from all series into a
            single, clean DataFrame, indexed by `source_series_id`.

    Args:
        macro_df (pd.DataFrame): Long-format DataFrame of macro series with
                                 columns ['source_series_id', 'date', 'value_raw'].

    Returns:
        pd.DataFrame: A summary report DataFrame, indexed by `source_series_id`,
                      detailing the quality metrics for each time series.
    """
    # --- Input Validation ---
    if not isinstance(macro_df, pd.DataFrame) or macro_df.empty:
        raise ValueError("macro_df must be a non-empty pandas DataFrame.")
    required_cols = {'source_series_id', 'date', 'value_raw'}
    if not required_cols.issubset(macro_df.columns):
        raise ValueError(f"macro_df missing required columns: {required_cols - set(macro_df.columns)}")

    # --- Group by series and apply the assessment function ---
    # Set 'date' as the index to enable time-series operations within the helper.
    quality_report = macro_df.set_index('date').groupby('source_series_id').apply(_assess_single_series)

    # Return the final report.
    return quality_report


# =============================================================================
# ORCHESTRATOR FUNCTION FOR TASK 2
# =============================================================================

def run_phase1_task2_quality_assessment(
    equity_tick_df: pd.DataFrame,
    rate_tick_df: pd.DataFrame,
    announcement_df: pd.DataFrame,
    macro_df: pd.DataFrame
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the execution of all data quality assessment steps in Task 2.

    This master function calls the specialized assessment functions for each
    type of input data (tick, announcement, macro) and aggregates the
    resulting quality reports into a single dictionary.

    Args:
        equity_tick_df (pd.DataFrame): Raw high-frequency equity data.
        rate_tick_df (pd.DataFrame): Raw high-frequency rate data.
        announcement_df (pd.DataFrame): Raw announcement metadata.
        macro_df (pd.DataFrame): Raw macroeconomic time series data.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary containing the detailed data
                                 quality and anomaly reports from each step.
    """
    # Define market hours for anomaly detection
    fed_market_hours = {'start': time(9, 30), 'end': time(16, 0)}
    ecb_market_hours = {'start': time(9, 0), 'end': time(17, 30)}

    # --- Step 1: High-Frequency Data Quality Checks ---
    # Note: Assuming equity and rate futures trade on corresponding exchanges
    equity_anomalies, equity_summary = detect_tick_data_anomalies(
        equity_tick_df, fed_market_hours, 'America/New_York'
    )
    rate_anomalies, rate_summary = detect_tick_data_anomalies(
        rate_tick_df, ecb_market_hours, 'Europe/Berlin' # Example for ECB
    )

    # --- Step 2: Announcement Metadata Quality Assessment ---
    announcement_report = verify_announcement_metadata(announcement_df)

    # --- Step 3: Macroeconomic Time Series Quality Control ---
    macro_quality_report = assess_macro_series_quality(macro_df)

    # --- Aggregate and return all reports ---
    final_reports = {
        "equity_anomaly_flags": equity_anomalies,
        "equity_anomaly_summary": equity_summary,
        "rate_anomaly_flags": rate_anomalies,
        "rate_anomaly_summary": rate_summary,
        "announcement_integrity_report": announcement_report,
        "macro_series_quality_report": macro_quality_report
    }

    return final_reports


In [None]:
# Task 3: Data Cleansing and Preprocessing

# =============================================================================
# TASK 3, STEP 1: HIGH-FREQUENCY TICK DATA CLEANSING PIPELINE
# =============================================================================

def cleanse_tick_data(
    raw_tick_df: pd.DataFrame,
    anomaly_flags: pd.DataFrame,
    min_volume_threshold: int = 10
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Applies a rigorous cleansing pipeline to raw high-frequency tick data.

    Purpose:
        This function transforms raw tick data into a clean, analysis-ready
        dataset by systematically removing invalid, anomalous, or irrelevant
        observations. It generates a detailed audit trail of the entire process.

    Process:
        1.  **Input Validation**: Ensures the raw data and anomaly flags are valid.
        2.  **Duplicate Removal**: Removes records with identical timestamp and price.
        3.  **Type Filtering**: Retains only 'TRADE' type ticks, as these
            represent actual transactions.
        4.  **Volume Filtering**: Excludes trades with volume below a specified
            liquidity threshold (default 10).
        5.  **Outlier Removal**: Removes ticks previously flagged as price
            outliers by the anomaly detection process.
        6.  **Gap Filling**: This step is omitted here as per the instructions,
            which specify forward-filling for small gaps. This is better handled
            during the price extraction phase (Task 4) where the context of an
            event window is known, rather than on the entire tick history.
            Applying a global forward-fill on tick data is generally incorrect.

    Args:
        raw_tick_df (pd.DataFrame): The raw tick data, including 'type' column.
        anomaly_flags (pd.DataFrame): Boolean flags for outliers from Task 2.
        min_volume_threshold (int): The minimum trade volume to be considered valid.

    Returns:
        Tuple[pd.DataFrame, Dict[str, Any]]:
        - A new DataFrame containing the cleansed tick data.
        - A dictionary serving as an audit trail, detailing the number of
          records at the start and end, and records removed at each step.
    """
    # --- Input Validation ---
    if raw_tick_df.empty:
        raise ValueError("Input 'raw_tick_df' cannot be empty.")
    if anomaly_flags.empty or not raw_tick_df.index.equals(anomaly_flags.index):
        raise ValueError("Anomaly flags must be a non-empty DataFrame with an index matching the raw tick data.")

    # --- Initialization of Audit Trail ---
    initial_count = len(raw_tick_df)
    audit_trail = {
        "initial_record_count": initial_count,
        "processing_timestamp_utc": pd.Timestamp.now(tz='UTC'),
        "steps": []
    }

    # --- Step 1: Remove Duplicates ---
    # A duplicate is defined as a record with the same timestamp and price.
    df = raw_tick_df.drop_duplicates(subset=['timestamp_micros_utc', 'price'], keep='first')
    count_after_dedup = len(df)
    audit_trail["steps"].append({
        "step_name": "Remove Duplicates",
        "records_removed": initial_count - count_after_dedup,
        "records_remaining": count_after_dedup
    })

    # --- Step 2: Filter for 'TRADE' type ticks ---
    # Only actual trades are relevant for calculating price changes.
    trade_mask = df['type'] == 'TRADE'
    df = df[trade_mask]
    count_after_trade_filter = len(df)
    audit_trail["steps"].append({
        "step_name": "Filter for TRADE type",
        "records_removed": count_after_dedup - count_after_trade_filter,
        "records_remaining": count_after_trade_filter
    })

    # --- Step 3: Apply Minimum Volume Filter ---
    # Exclude trades with insufficient volume, which may not be representative.
    volume_mask = df['volume'] >= min_volume_threshold
    df = df[volume_mask]
    count_after_volume_filter = len(df)
    audit_trail["steps"].append({
        "step_name": f"Filter for Volume >= {min_volume_threshold}",
        "records_removed": count_after_trade_filter - count_after_volume_filter,
        "records_remaining": count_after_volume_filter
    })

    # --- Step 4: Remove Price Outliers ---
    # Use the pre-computed outlier flags from Task 2 for consistency.
    # Align the outlier flags with the current state of the DataFrame.
    outlier_mask = anomaly_flags.loc[df.index, 'price_outlier']
    df = df[~outlier_mask]
    count_after_outlier_removal = len(df)
    audit_trail["steps"].append({
        "step_name": "Remove Price Outliers",
        "records_removed": count_after_volume_filter - count_after_outlier_removal,
        "records_remaining": count_after_outlier_removal
    })

    # --- Finalization ---
    audit_trail["final_record_count"] = count_after_outlier_removal

    # Return the cleansed DataFrame and the detailed audit trail.
    return df.copy(), audit_trail


# =============================================================================
# TASK 3, STEP 2: ANNOUNCEMENT TIMESTAMP UTC STANDARDIZATION
# =============================================================================

def standardize_announcement_timestamps(
    announcement_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Converts local announcement times to standardized UTC timestamps.

    Purpose:
        This function creates a single, unambiguous UTC timestamp for each
        announcement, which is essential for accurately aligning events with
        the high-frequency tick data.

    Process:
        1.  **Combine Date and Time**: Merges the local date and time columns.
        2.  **Localize Timestamp**: Converts the naive datetime to a timezone-
            aware datetime using the specified `local_timezone`.
        3.  **Handle DST Transitions**: Critically, it uses `is_dst=None` to
            force `pytz` to raise errors on ambiguous or non-existent times
            during DST changes. It then catches these specific errors and
            applies documented fallback logic.
        4.  **Convert to UTC**: Converts the final localized timestamp to UTC.
        5.  **Validate**: Checks if the resulting UTC timestamps fall within
            a reasonable window (08:00-20:00 UTC).
        6.  **Audit**: Produces a detailed diagnostic report of the conversion.

    Args:
        announcement_df (pd.DataFrame): The announcement metadata DataFrame.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]:
        - The input DataFrame with a new 'announcement_timestamp_utc' column.
        - A diagnostic DataFrame auditing the conversion for each event.
    """
    # --- Input Validation ---
    if announcement_df.empty:
        raise ValueError("Input 'announcement_df' cannot be empty.")

    # --- Initialization ---
    diagnostics = []
    utc_timestamps = []

    # --- Iterate through each announcement for precise DST handling ---
    for _, row in announcement_df.iterrows():
        event_id = row['event_id']
        tz_str = row['local_timezone']
        status = "OK"
        notes = ""

        try:
            # Combine date and time strings into a naive datetime object.
            naive_dt = pd.to_datetime(
                f"{row['announcement_date_local'].date()} {row['announcement_time_local']}"
            )

            # Get the pytz timezone object.
            local_tz = pytz.timezone(tz_str)

            # Localize the naive datetime. is_dst=None is CRITICAL.
            # It forces an error on ambiguous/non-existent times instead of guessing.
            local_dt = local_tz.localize(naive_dt, is_dst=None)

        except pytz.AmbiguousTimeError:
            # Occurs during "fall back" DST transition (e.g., 1:30 AM happens twice).
            # The standard resolution is to choose the first occurrence (daylight time).
            local_dt = local_tz.localize(naive_dt, is_dst=True)
            status = "Warning"
            notes = "AmbiguousTimeError: Resolved by choosing first occurrence (DST)."

        except pytz.NonExistentTimeError:
            # Occurs during "spring forward" DST transition (e.g., 2:30 AM does not exist).
            # The standard resolution is to shift forward by the DST offset (typically 1 hour).
            local_dt = local_tz.localize(naive_dt + pd.Timedelta(hours=1), is_dst=None)
            status = "Warning"
            notes = "NonExistentTimeError: Resolved by shifting time forward by 1 hour."

        except Exception as e:
            # Catch any other parsing or timezone errors.
            local_dt = pd.NaT
            status = "FAIL"
            notes = f"Unhandled exception: {str(e)}"

        # Convert the final localized datetime to UTC.
        utc_dt = pd.to_datetime(local_dt).tz_convert('UTC') if pd.notna(local_dt) else pd.NaT
        utc_timestamps.append(utc_dt)

        # Append detailed diagnostics for this event.
        diagnostics.append({
            "event_id": event_id,
            "original_local_datetime": naive_dt,
            "local_timezone": tz_str,
            "final_utc_timestamp": utc_dt,
            "conversion_status": status,
            "notes": notes
        })

    # --- Finalize and Validate ---
    output_df = announcement_df.copy()
    output_df['announcement_timestamp_utc'] = utc_timestamps

    # Validate that timestamps are not NaT.
    if output_df['announcement_timestamp_utc'].isna().any():
        raise ValueError("Failed to convert one or more timestamps to UTC.")

    # Create and return the diagnostic report.
    diagnostic_report = pd.DataFrame(diagnostics).set_index("event_id")

    return output_df, diagnostic_report


# =============================================================================
# TASK 3, STEP 3: MACROECONOMIC SERIES TRANSFORMATION PIPELINE
# =============================================================================

def prepare_macroeconomic_data(
    macro_df: pd.DataFrame,
    target_market: str
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Cleans and prepares raw macroeconomic data into a wide-format panel.

    Purpose:
        This function serves as the primary cleaning and structuring tool for
        raw, long-format macroeconomic data. Its sole responsibility is to
        produce a clean, validated, wide-format DataFrame where each series is
        in its original, untransformed level. It explicitly does NOT perform
        model-specific transformations like logarithms.

    Process:
        1.  **Filter**: Selects data for the specified `target_market`.
        2.  **Pivot**: Converts the data from long to wide format, with dates
            as the index and variables as columns.
        3.  **Interpolate**: Fills small gaps (<= 2 consecutive months) using
            linear interpolation to handle minor, sporadic missing data.
        4.  **Validate and Exclude**: Identifies any series that still contain
            gaps after interpolation. It issues a warning, excludes these
            incomplete series, and proceeds with the remaining valid data.
        5.  **Reindex**: Ensures the final DataFrame has a complete, uninterrupted
            monthly frequency from start to end.
        6.  **Standardize Names**: Renames columns to a consistent format,
            e.g., `CAN_gdp`, ensuring clarity for downstream tasks.

    Args:
        macro_df (pd.DataFrame): The raw, long-format macroeconomic data.
        target_market (str): The country/market code to process (e.g., 'CAN').

    Returns:
        Tuple[pd.DataFrame, List[str]]:
        - An analysis-ready, wide-format DataFrame containing only the valid
          series in their original levels.
        - A list of the names of any series that were dropped due to large gaps.
    """
    # --- 1. Input Validation and Filtering ---
    # Ensure the input DataFrame is not empty.
    if macro_df.empty:
        raise ValueError("Input 'macro_df' cannot be empty.")

    # Filter the dataset for the specified target market.
    df_market = macro_df[macro_df['country'] == target_market].copy()
    if df_market.empty:
        raise ValueError(f"No data found for target market '{target_market}'.")

    # --- 2. Pivot from Long to Wide Format ---
    # This transformation makes each time series a column.
    df_wide = df_market.pivot_table(
        index='date', columns='variable_name', values='value_raw'
    )
    # Ensure the index is a proper DatetimeIndex.
    df_wide.index = pd.to_datetime(df_wide.index)

    # --- 3. Interpolate Limited Gaps ---
    # Use linear interpolation to fill at most 2 consecutive missing months.
    df_interpolated = df_wide.interpolate(method='linear', limit=2, axis=0)

    # --- 4. Validate and Exclude Incomplete Series ---
    # Identify any series that still have missing values after the limited fill.
    incomplete_series: List[str] = df_interpolated.columns[df_interpolated.isna().any()].tolist()

    # If any series are found to be incomplete:
    if incomplete_series:
        # Issue a clear, actionable warning to the user.
        warnings.warn(
            f"The following series for market '{target_market}' contain gaps > 2 months and will be excluded "
            f"from the final dataset: {incomplete_series}",
            UserWarning
        )
        # Exclude the problematic series from the dataset.
        df_interpolated.drop(columns=incomplete_series, inplace=True)

    # Check if any columns remain after potentially dropping some.
    if df_interpolated.empty:
        raise ValueError(f"No complete time series remained for market '{target_market}' after validation.")

    # --- 5. Reindex to a Complete Monthly Date Range ---
    # This step ensures the time series has a consistent frequency with no missing rows.
    full_date_index = pd.date_range(
        start=df_interpolated.index.min(),
        end=df_interpolated.index.max(),
        freq='MS' # 'MS' for Month Start frequency is the standard convention.
    )
    df_reindexed = df_interpolated.reindex(full_date_index)

    # A final check for any NaNs that might have been introduced by reindexing.
    if df_reindexed.isna().sum().sum() > 0:
        raise ValueError("Data is not on a consistent monthly frequency; gaps detected after reindexing.")

    # --- 6. Standardize Column Naming ---
    # Create clean, predictable column names for the output DataFrame.
    # Example: 'gdp' becomes 'CAN_gdp'.
    df_reindexed.columns = [f"{target_market}_{col}" for col in df_reindexed.columns]

    # Return the final prepared DataFrame (in levels) and the list of any dropped series.
    return df_reindexed, incomplete_series


# =============================================================================
# IMPLEMENTATION OF `run_phase1_task3_preprocessing` (Task 3 Orchestrator)
# =============================================================================

def run_phase1_task3_preprocessing(
    raw_equity_df: pd.DataFrame,
    raw_rate_df: pd.DataFrame,
    raw_announcement_df: pd.DataFrame,
    raw_macro_df: pd.DataFrame,
    equity_anomaly_flags: pd.DataFrame,
    rate_anomaly_flags: pd.DataFrame,
    target_market: str
) -> Dict[str, Any]:
    """
    Orchestrates the data cleansing and preparation pipeline for Phase 1.

    Purpose:
        This master function for Phase 1 executes the complete data cleansing
        and preprocessing workflow. Its sole responsibility is to transform raw,
        validated input data into clean, consistently structured datasets ready
        for the subsequent analytical phases. It strictly separates this cleaning
        process from any model-specific numerical transformations.

    Process:
        1.  **Cleanse Tick Data**: Calls the `cleanse_tick_data` function for
            both the equity and rate futures data streams. This involves
            removing duplicates, filtering by trade type and volume, and removing
            identified outliers.
        2.  **Standardize Timestamps**: Calls the `standardize_announcement_timestamps`
            function to convert all local announcement times to robust, timezone-
            aware UTC timestamps, correctly handling all DST-related edge cases.
        3.  **Prepare Macro Data**: Calls the `prepare_macroeconomic_data` function
            to convert the long-format raw macro data into a clean, wide-format,
            interpolated DataFrame. This version of the function prepares the
            data in its original levels, without applying any log transformations.
        4.  **Compile Outputs**: Gathers all the cleansed DataFrames and their
            associated audit trail metadata into a single, well-structured
            dictionary for handoff to the next phase of the pipeline.

    Args:
        raw_equity_df (pd.DataFrame): Raw high-frequency equity data.
        raw_rate_df (pd.DataFrame): Raw high-frequency rate data.
        raw_announcement_df (pd.DataFrame): Raw announcement metadata.
        raw_macro_df (pd.DataFrame): Raw macroeconomic time series data.
        equity_anomaly_flags (pd.DataFrame): Anomaly flags for equity data from Task 2.
        rate_anomaly_flags (pd.DataFrame): Anomaly flags for rate data from Task 2.
        target_market (str): The market to study (e.g., 'CAN').

    Returns:
        Dict[str, Any]: A dictionary containing all the cleansed and prepared
                        DataFrames and their associated audit trails. The macro
                        data is explicitly marked as being in 'levels'.
    """
    # --- 1. Cleanse High-Frequency Tick Data ---
    # Execute the cleansing pipeline for the equity tick data.
    clean_equity_df, equity_audit = cleanse_tick_data(
        raw_tick_df=raw_equity_df,
        anomaly_flags=equity_anomaly_flags
    )

    # Execute the cleansing pipeline for the interest rate tick data.
    clean_rate_df, rate_audit = cleanse_tick_data(
        raw_tick_df=raw_rate_df,
        anomaly_flags=rate_anomaly_flags
    )

    # --- 2. Standardize Announcement Timestamps ---
    # Convert all local announcement times to unambiguous UTC timestamps.
    clean_announcement_df, timestamp_audit = standardize_announcement_timestamps(
        announcement_df=raw_announcement_df
    )

    # --- 3. Prepare Macroeconomic Data (to Levels) ---
    # This call now correctly omits the transformation map. The function's
    # responsibility is only to clean, pivot, interpolate, and validate the macro data.
    prepared_macro_df_levels, dropped_macro_series = prepare_macroeconomic_data(
        macro_df=raw_macro_df,
        target_market=target_market
    )

    # --- 4. Compile and Return All Results ---
    # The output dictionary is structured to clearly distinguish between the
    # cleansed data and the audit trails generated during the process.
    results = {
        "clean_equity_df": clean_equity_df,
        "equity_cleaning_audit": equity_audit,
        "clean_rate_df": clean_rate_df,
        "rate_cleaning_audit": rate_audit,
        "clean_announcement_df": clean_announcement_df,
        "timestamp_conversion_audit": timestamp_audit,
        "prepared_macro_df_levels": prepared_macro_df_levels,
        "dropped_macro_series": dropped_macro_series
    }

    return results


In [None]:
# Task 4: Announcement Window Definition and Data Extraction

# =============================================================================
# TASK 4, STEP 1: PRECISE EVENT WINDOW TEMPORAL BOUNDARY CONSTRUCTION
# =============================================================================

def construct_event_windows(
    clean_announcement_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Constructs and merges temporal boundaries for monetary policy announcements.

    Purpose:
        This function defines the precise start (`t_before`) and end (`t_after`)
        of the high-frequency analysis window around each event. It is a
        critical step for isolating the market reaction. This implementation
        includes a robust mechanism to handle rare cases where event windows
        for different announcements overlap, merging them into a single,
        continuous window to avoid analytical ambiguity.

    Process:
        1.  **Parameter Extraction**: Retrieves `window_minutes_before` and
            `window_minutes_after` from the configuration dictionary.
        2.  **Initial Boundary Calculation**: For each event's UTC timestamp, it
            vectorially calculates the initial `t_before` and `t_after` boundaries.
        3.  **Overlap Group Identification**:
            - The events are sorted chronologically.
            - It identifies the start of each new, non-overlapping block of events.
              A new block begins if an event's start time (`t_before`) is after
              the previous event's end time (`t_after`).
            - A cumulative sum over this "new block" indicator creates a unique
              group ID for each set of contiguous, overlapping windows.
        4.  **Window Union**:
            - It groups the events by this new overlap group ID.
            - Within each group, it calculates the union of the windows by
              finding the minimum of all `t_before` times and the maximum of
              all `t_after` times.
            - `groupby().transform()` is used to broadcast these unionized
              boundaries back to every event within the overlapping group.
        5.  **Validation and Finalization**: The final DataFrame is returned,
            indexed by `event_id`, with consistent and non-overlapping windows.

    Args:
        clean_announcement_df (pd.DataFrame): The processed announcement data
                                              containing the essential
                                              'announcement_timestamp_utc' and
                                              'event_id' columns.
        config (Dict[str, Any]): The study's main configuration dictionary.

    Returns:
        pd.DataFrame: A new DataFrame indexed by 'event_id' containing the
                      announcement timestamp and its final, potentially merged,
                      `t_before` and `t_after` boundaries.
    """
    # --- 1. Input Validation and Parameter Extraction ---
    # Ensure the required column is present.
    if 'announcement_timestamp_utc' not in clean_announcement_df.columns:
        raise KeyError("The 'announcement_timestamp_utc' column is missing from the announcement DataFrame.")

    # Retrieve window parameters from the configuration dictionary.
    params: Dict[str, Any] = config.get("identification_params", {})
    minutes_before: int = params.get("window_minutes_before", 10)
    minutes_after: int = params.get("window_minutes_after", 20)

    # --- 2. Initial Boundary Calculation ---
    # Create a working copy with the necessary columns.
    windows_df = clean_announcement_df[['event_id', 'announcement_timestamp_utc']].copy()

    # Calculate the 'before' timestamp boundary for the window.
    windows_df['t_before'] = windows_df['announcement_timestamp_utc'] - pd.Timedelta(minutes=minutes_before)

    # Calculate the 'after' timestamp boundary for the window.
    windows_df['t_after'] = windows_df['announcement_timestamp_utc'] + pd.Timedelta(minutes=minutes_after)

    # --- 3. Overlap Group Identification ---
    # Sort the DataFrame chronologically to compare adjacent events.
    windows_df.sort_values('announcement_timestamp_utc', inplace=True)

    # An overlap occurs if the start of the current window is before the end of the previous one.
    # We identify the START of a NEW, non-overlapping block.
    is_new_block: pd.Series = windows_df['t_before'] > windows_df['t_after'].shift(1)

    # The very first event always starts a new block.
    is_new_block.iloc[0] = True

    # The cumulative sum of this boolean series creates a unique ID for each contiguous block of overlapping events.
    overlap_group_id: pd.Series = is_new_block.cumsum()

    # --- 4. Window Union ---
    # If there are any overlaps, the number of unique groups will be less than the number of events.
    if overlap_group_id.nunique() < len(windows_df):
        # For each group, find the earliest start time and latest end time.
        # `groupby().transform()` broadcasts the result of the aggregation (min/max)
        # back to every row within the group, effectively applying the union.
        union_t_before = windows_df.groupby(overlap_group_id)['t_before'].transform('min')
        union_t_after = windows_df.groupby(overlap_group_id)['t_after'].transform('max')

        # Overwrite the initial boundaries with the new, merged boundaries.
        windows_df['t_before'] = union_t_before
        windows_df['t_after'] = union_t_after

    # --- 5. Validation and Finalization ---
    # Final check for logical consistency.
    if not (windows_df['t_before'] < windows_df['announcement_timestamp_utc']).all() or \
       not (windows_df['announcement_timestamp_utc'] < windows_df['t_after']).all():
        raise ValueError("Window boundary inconsistency detected after processing overlaps.")

    # Set event_id as the index for efficient lookups and return.
    # The DataFrame is re-indexed to its original order if sorting changed it.
    return windows_df.set_index('event_id').reindex(clean_announcement_df['event_id'])


# =============================================================================
# TASK 4, STEPS 2 & 3: PRE/POST-EVENT PRICE EXTRACTION
# =============================================================================

def _extract_price_with_fallback(
    event_timestamps: pd.Series,
    tick_df: pd.DataFrame,
    direction: str,
    primary_tolerance: pd.Timedelta,
    secondary_tolerance: pd.Timedelta
) -> pd.DataFrame:
    """
    Extracts prices using a two-tiered fallback search with `pd.merge_asof`.

    Purpose:
        This is a high-performance helper function designed to find the nearest
        tick to a series of event timestamps. It is the core engine for the
        pre- and post-event price extraction. Its key feature is a robust,
        two-tiered search strategy to handle cases of low liquidity around an
        event.

    Process:
        1.  **Validation**: It first performs a critical check to ensure the
            input `tick_df` is sorted by timestamp, as this is a prerequisite
            for `pd.merge_asof` to function correctly and efficiently.
        2.  **Tier 1 (Primary Search)**: It executes a `pd.merge_asof` to find
            the nearest tick for each event timestamp within the narrow
            `primary_tolerance`. This is the ideal case, finding a price very
            close to the window boundary.
        3.  **Identify Failures**: It identifies all events for which the
            primary search failed to find a tick (resulting in a `NaN` price).
        4.  **Tier 2 (Secondary Search)**: If any failures occurred, it re-runs
            `pd.merge_asof` *only on the failed events* using the much wider
            `secondary_tolerance`. This is the fallback mechanism.
        5.  **Combine Results**: The results from the successful secondary
            search are used to update the records that failed the primary search.
        6.  **Audit**: A 'search_method' column is populated to explicitly
            document which search tier was successful ('primary' or 'secondary')
            or if both failed ('failed') for each event.

    Args:
        event_timestamps (pd.Series): A Series of UTC timestamps representing
                                      the search points (e.g., all `t_before`
                                      or `t_after` values).
        tick_df (pd.DataFrame): The cleansed tick data, containing at least
                                'timestamp_micros_utc' and 'price' columns.
                                MUST be pre-sorted by timestamp.
        direction (str): The direction for the as-of search. Must be one of
                         'backward' (for pre-event prices), 'forward' (for
                         post-event), or 'nearest'.
        primary_tolerance (pd.Timedelta): The maximum time gap allowed for the
                                          initial, high-precision search.
        secondary_tolerance (pd.Timedelta): The wider time gap allowed for the
                                            fallback search if the primary
                                            search fails.

    Returns:
        pd.DataFrame: A DataFrame with the same index as `event_timestamps`,
                      containing the extracted price, the timestamp of that
                      price, and metadata about the search method used.

    Raises:
        ValueError: If the input `tick_df` is not sorted by timestamp.
    """
    # --- Input Validation ---
    # `pd.merge_asof` relies on sorted data for its binary search algorithm.
    # This check prevents incorrect results and is a critical safeguard.
    if not tick_df['timestamp_micros_utc'].is_monotonic_increasing:
        # Raise an error if the precondition is not met.
        raise ValueError("Tick data must be sorted by timestamp for extraction.")

    # --- Data Preparation ---
    # Convert the input Series of event timestamps into a DataFrame, which is the
    # required format for the 'left' table in `pd.merge_asof`.
    events: pd.DataFrame = event_timestamps.to_frame(name='timestamp_micros_utc')

    # --- Tier 1: Primary Search Window ---
    # Execute the primary as-of merge. This is a highly optimized operation.
    # 'direction' controls whether we look for the last tick before ('backward')
    # or the first tick after ('forward') the event timestamp.
    # 'tolerance' defines the maximum distance in time to look.
    merged_primary: pd.DataFrame = pd.merge_asof(
        left=events,
        right=tick_df,
        on='timestamp_micros_utc',
        direction=direction,
        tolerance=primary_tolerance
    )
    # Annotate the results with the search method used for auditing.
    merged_primary['search_method'] = 'primary'

    # --- Tier 2: Secondary (Fallback) Search Window ---
    # Identify events where the primary search yielded no result (price is NaN).
    failed_primary_mask: pd.Series = merged_primary['price'].isna()

    # Proceed with the fallback only if there were failures to avoid unnecessary work.
    if failed_primary_mask.any():
        # Isolate the events that need a second search attempt.
        failed_events: pd.DataFrame = merged_primary.loc[failed_primary_mask, ['timestamp_micros_utc']]

        # Rerun `pd.merge_asof` on the subset of failed events, but with the wider
        # secondary tolerance, increasing the chance of finding a match.
        merged_secondary: pd.DataFrame = pd.merge_asof(
            left=failed_events,
            right=tick_df,
            on='timestamp_micros_utc',
            direction=direction,
            tolerance=secondary_tolerance
        )
        # Annotate these fallback results as 'secondary'.
        merged_secondary['search_method'] = 'secondary'

        # Use the index of the failed events to align and update the original
        # `merged_primary` DataFrame with the new results from the secondary search.
        # This efficiently patches the missing values without a slow loop.
        merged_primary.update(merged_secondary)

    # --- Final Auditing ---
    # For any events that still have a NaN price after both attempts, mark their
    # search method as 'failed' for a complete and clear audit trail.
    merged_primary['search_method'] = merged_primary['search_method'].where(
        merged_primary['price'].notna(), 'failed'
    )

    # --- Rename Columns for Clarity ---
    # Rename the merged columns to be specific about their origin (pre- or post-event)
    # to prevent name collisions when this function's output is joined later.
    # The original event timestamp is now redundant and is implicitly the index.
    # The timestamp from the tick data is renamed to `timestamp_{direction}`.
    return merged_primary.rename(
        columns={
            'price': f'price_{direction}',
            'timestamp_micros_utc_y': f'timestamp_{direction}'
        }
    )


def extract_window_prices(
    event_windows_df: pd.DataFrame,
    clean_tick_df: pd.DataFrame,
    stabilization_ticks: int = 4,
    stabilization_threshold: float = 0.01,
    max_stabilization_attempts: int = 100
) -> pd.DataFrame:
    """
    Extracts pre- and post-announcement prices from high-frequency data.

    Purpose:
        For each event, this function finds the last traded price just before
        the analysis window (`price_before`) and the first stable price just
        after the window (`price_after`). This is the core data extraction step
        for calculating high-frequency surprises.

    Process:
        1.  **Pre-Event Price**:
            - Uses `pd.merge_asof` for a highly efficient "as-of" join to find
              the last trade at or before `t_before`.
            - Implements a two-tiered fallback: first searches within the
              standard window, then extends the search to 2 hours before if
              no trade is found.
        2.  **Post-Event Price**:
            - Finds the first trade at or after `t_after`.
            - **Stabilization Check**: After finding an initial price, it
              inspects the next N-1 trades (default 3). If the price range
              (max/min - 1) of these N trades exceeds a threshold (default 1%),
              the initial price is discarded, and the check is repeated starting
              from the next trade. This continues until a stable price is found
              or a limit is reached.
        3.  **Metadata**: Returns a rich DataFrame with extracted prices and
            detailed metadata on the search method, stabilization status, and
            timestamps for full auditability.

    Args:
        event_windows_df (pd.DataFrame): DataFrame of event windows from
                                         `construct_event_windows`.
        clean_tick_df (pd.DataFrame): The cleansed high-frequency tick data.
        stabilization_ticks (int): Number of ticks (N) to check for stability.
        stabilization_threshold (float): Max allowed price range for stability.
        max_stabilization_attempts (int): Max iterations for the stabilization search.

    Returns:
        pd.DataFrame: A DataFrame indexed by `event_id` with columns for pre-
                      and post-event prices and extensive metadata.
    """
    # --- Pre-computation and Sorting ---
    # Sorting is critical for the performance of merge_asof and searchsorted.
    ticks = clean_tick_df.sort_values('timestamp_micros_utc').reset_index(drop=True)

    # --- Pre-Event Price Extraction ---
    pre_event_prices = _extract_price_with_fallback(
        event_timestamps=event_windows_df['t_before'],
        tick_df=ticks[['timestamp_micros_utc', 'price']],
        direction='backward',
        primary_tolerance=pd.Timedelta(minutes=10), # Standard window
        secondary_tolerance=pd.Timedelta(hours=2)   # Extended fallback
    )
    pre_event_prices = pre_event_prices.set_index(event_windows_df.index)

    # --- Post-Event Price Extraction with Stabilization ---
    post_event_results = []
    # Find the index of the first tick at or after each t_after.
    # searchsorted is a highly optimized binary search.
    start_indices = ticks['timestamp_micros_utc'].searchsorted(event_windows_df['t_after'], side='left')

    for event_id, start_idx in zip(event_windows_df.index, start_indices):
        attempts = 0
        stable_price_found = False

        current_idx = start_idx

        # Iteratively search for a stable price.
        while not stable_price_found and attempts < max_stabilization_attempts and (current_idx + stabilization_ticks) <= len(ticks):
            # Define the slice of ticks to check for stability.
            window_slice = ticks.iloc[current_idx : current_idx + stabilization_ticks]

            # Calculate the price range within the stabilization window.
            min_p, max_p = window_slice['price'].min(), window_slice['price'].max()
            price_range = (max_p / min_p - 1) if min_p > 0 else float('inf')

            # Check if the price is stable.
            if price_range < stabilization_threshold:
                stable_price_found = True
                post_event_results.append({
                    'event_id': event_id,
                    'price_forward': window_slice['price'].iloc[0],
                    'timestamp_forward': window_slice['timestamp_micros_utc'].iloc[0],
                    'stabilization_status': 'stable',
                    'stabilization_attempts': attempts + 1
                })
            else:
                # If not stable, advance to the next tick and try again.
                current_idx += 1
                attempts += 1

        # If the loop finishes without finding a stable price, record the failure.
        if not stable_price_found:
            price = ticks['price'].iloc[start_idx] if start_idx < len(ticks) else np.nan
            timestamp = ticks['timestamp_micros_utc'].iloc[start_idx] if start_idx < len(ticks) else pd.NaT
            post_event_results.append({
                'event_id': event_id,
                'price_forward': price,
                'timestamp_forward': timestamp,
                'stabilization_status': 'unstable' if start_idx < len(ticks) else 'no_trade_found',
                'stabilization_attempts': attempts
            })

    post_event_prices = pd.DataFrame(post_event_results).set_index('event_id')

    # --- Combine Results ---
    # Join the pre- and post-event results back to the main event window frame.
    final_results = event_windows_df.join(pre_event_prices, how='left').join(post_event_prices, how='left')

    return final_results


# =============================================================================
# ORCHESTRATOR FUNCTION FOR TASK 4
# =============================================================================

def run_phase2_task4_data_extraction(
    clean_announcement_df: pd.DataFrame,
    clean_equity_df: pd.DataFrame,
    clean_rate_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the full data extraction pipeline for Task 4.

    This master function first defines the event windows and then extracts the
    pre- and post-event prices for both the equity and interest rate series,
    returning a dictionary of the results.

    Args:
        clean_announcement_df (pd.DataFrame): Cleansed announcement metadata.
        clean_equity_df (pd.DataFrame): Cleansed high-frequency equity data.
        clean_rate_df (pd.DataFrame): Cleansed high-frequency rate data.
        config (Dict[str, Any]): The main study configuration dictionary.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary containing the event windows
                                 and the price extraction results for both
                                 equity and rate instruments.
    """
    # --- Step 1: Construct Event Windows ---
    # This is done once and serves as the basis for all extractions.
    event_windows = construct_event_windows(clean_announcement_df, config)

    # --- Step 2 & 3: Extract Prices for Equity Series ---
    equity_prices = extract_window_prices(
        event_windows_df=event_windows,
        clean_tick_df=clean_equity_df,
        # Parameters can be exposed or read from config if needed
        stabilization_ticks=4,
        stabilization_threshold=0.01,
        max_stabilization_attempts=100
    )

    # --- Step 2 & 3: Extract Prices for Rate Series ---
    rate_prices = extract_window_prices(
        event_windows_df=event_windows,
        clean_tick_df=clean_rate_df,
        stabilization_ticks=4,
        stabilization_threshold=0.01, # Threshold might differ for rates
        max_stabilization_attempts=100
    )

    # --- Aggregate and return all results ---
    results = {
        "event_windows": event_windows,
        "equity_price_extraction_results": equity_prices,
        "rate_price_extraction_results": rate_prices
    }

    return results


In [None]:
# Task 5: Raw Surprise Calculation

# =============================================================================
# TASK 5, STEP 1: INTEREST RATE SURPRISE COMPUTATION
# =============================================================================

def calculate_rate_surprises(
    price_extraction_results: pd.DataFrame,
    announcement_df: pd.DataFrame,
    contract_specifications: Dict[str, Dict[str, Any]]
) -> pd.Series:
    """
    Computes interest rate surprises using explicit, configuration-driven logic.

    Purpose:
        This function translates the pre- and post-event prices of interest
        rate futures into standardized monetary policy surprises, measured in
        basis points. It is designed to be robust and extensible by decoupling
        the calculation logic from the specific quoting conventions of financial
        instruments, which are provided via a configuration dictionary.

    Process:
        1.  **Input Validation**: Checks that the contract specification dictionary
            contains entries for all central banks present in the data.
        2.  **Merge Data**: Joins price data with announcement metadata to access
            the `central_bank` for each event.
        3.  **Map Specifications**: Maps the `central_bank` of each event to its
            corresponding `convention` and `multiplier` from the provided
            `contract_specifications` dictionary.
        4.  **Conditional Calculation**: Uses vectorized operations (`np.select`)
            to apply the correct formula based on the mapped 'convention':
            - **'100_minus_rate'**: For contracts like Fed Funds Futures, where
              Price = 100 - Rate. The surprise is calculated as:
              `s_rate = -multiplier * (P_after - P_before)`
            - **'yield_based'**: For contracts priced as a yield or index. The
              surprise is calculated as a basis point change:
              `s_rate = multiplier * (P_after - P_before) / P_before`
        5.  **Safety Checks**: Ensures division-by-zero is avoided for the
            'yield_based' convention.

    Args:
        price_extraction_results (pd.DataFrame): DataFrame from Task 4 with
                                                  'price_backward' and
                                                  'price_forward' columns, indexed
                                                  by 'event_id'.
        announcement_df (pd.DataFrame): The clean announcement metadata, which
                                        contains the 'central_bank' and 'event_id'
                                        columns.
        contract_specifications (Dict[str, Dict[str, Any]]): A dictionary
            mapping each central bank to its instrument's properties.
            Example: {'FED': {'convention': '100_minus_rate', 'multiplier': 100}}

    Returns:
        pd.Series: A Series of calculated interest rate surprises in basis
                   points, indexed by 'event_id'.

    Raises:
        KeyError: If a central bank in the data is not found in the
                  `contract_specifications` dictionary.
        ValueError: If input DataFrames are empty.
    """
    # --- Input Validation ---
    if price_extraction_results.empty or announcement_df.empty:
        raise ValueError("Input DataFrames cannot be empty.")

    # Check that all central banks in the data have a defined specification.
    banks_in_data = set(announcement_df['central_bank'].unique())
    banks_in_spec = set(contract_specifications.keys())
    if not banks_in_data.issubset(banks_in_spec):
        missing_banks = banks_in_data - banks_in_spec
        raise KeyError(f"Contract specifications are missing for the following central banks: {sorted(list(missing_banks))}")

    # --- Data Preparation ---
    # Merge central bank information onto the price data using the event_id index.
    data = price_extraction_results.join(
        announcement_df.set_index('event_id')[['central_bank']],
        how='left'
    )

    # --- Map Specifications to each event ---
    # Create new columns in the DataFrame for convention and multiplier.
    data['convention'] = data['central_bank'].map(lambda x: contract_specifications[x]['convention'])
    data['multiplier'] = data['central_bank'].map(lambda x: contract_specifications[x]['multiplier'])

    # --- Conditional Surprise Calculation ---
    # Define the conditions based on the mapped 'convention'.
    conditions = [
        data['convention'] == '100_minus_rate',
        data['convention'] == 'yield_based'
    ]

    # Define the calculation corresponding to each condition.
    # Calculation for '100_minus_rate' convention.
    calc_100_minus_rate = -data['multiplier'] * (data['price_forward'] - data['price_backward'])

    # Calculation for 'yield_based' convention, with a safety check for division by zero.
    safe_price_backward = data['price_backward'].where(data['price_backward'] > 1e-9, np.nan)
    calc_yield_based = data['multiplier'] * (data['price_forward'] - safe_price_backward) / safe_price_backward

    # Create the list of calculation vectors.
    calculations = [
        calc_100_minus_rate,
        calc_yield_based
    ]

    # Use np.select for efficient, vectorized conditional computation.
    # If an event's convention matches none of the conditions, the result will be NaN.
    surprises = np.select(conditions, calculations, default=np.nan)

    # Return the final result as a pandas Series with the correct name and index.
    return pd.Series(surprises, index=data.index, name='rate_surprise_bps')


# =============================================================================
# TASK 5, STEP 2: EQUITY PRICE SURPRISE LOG RETURN CALCULATION
# =============================================================================

def calculate_equity_surprises(
    price_extraction_results: pd.DataFrame,
    outlier_threshold_pct: float = 20.0
) -> Tuple[pd.Series, pd.Series]:
    """
    Computes equity price surprises using log returns.

    Purpose:
        This function calculates the continuously compounded return on an
        equity index future across the event window. This serves as the measure
        of the equity market's reaction to the announcement.

    Process:
        1.  **Log Return Formula**: Implements the standard formula for log returns:
            `s_equity = 100 * log(price_after / price_before)`
        2.  **Safety Checks**: The calculation is wrapped in a `np.where` clause
            to ensure both pre- and post-event prices are strictly positive,
            preventing mathematical errors with the logarithm.
        3.  **Outlier Flagging**: It identifies and flags any calculated surprises
            whose absolute value exceeds a large, pre-defined threshold,
            indicating a potential data error or an extremely volatile event
            that may warrant manual review.

    Args:
        price_extraction_results (pd.DataFrame): DataFrame from Task 4 with
                                                  'price_backward' and
                                                  'price_forward' columns.
        outlier_threshold_pct (float): The absolute percentage change above
                                       which a surprise is flagged as an outlier.

    Returns:
        Tuple[pd.Series, pd.Series]:
        - A Series of calculated equity surprises in percentage points,
          indexed by 'event_id'.
        - A boolean Series of the same index, where `True` indicates a
          surprise that exceeded the outlier threshold.
    """
    # --- Input Validation ---
    if price_extraction_results.empty:
        raise ValueError("Input DataFrame cannot be empty.")

    # --- Log Return Calculation ---
    # Alias prices for clarity.
    price_before = price_extraction_results['price_backward']
    price_after = price_extraction_results['price_forward']

    # Create a mask to identify valid prices for the log operation.
    valid_prices_mask = (price_before > 0) & (price_after > 0) & price_before.notna() & price_after.notna()

    # Initialize surprise series with NaNs.
    surprises = pd.Series(np.nan, index=price_extraction_results.index)

    # Equation: s_equity = 100 * ln(P_after / P_before)
    # Apply the log return formula only where prices are valid.
    surprises.loc[valid_prices_mask] = 100 * np.log(
        price_after[valid_prices_mask] / price_before[valid_prices_mask]
    )

    # --- Outlier Flagging ---
    # Identify surprises with an absolute magnitude greater than the threshold.
    outlier_flags = surprises.abs() > outlier_threshold_pct

    return surprises.rename('equity_surprise_pct'), outlier_flags.rename('is_outlier')


# =============================================================================
# TASK 5, STEP 3: SURPRISE VECTOR CONSTRUCTION AND QUALITY CONTROL
# =============================================================================

def construct_surprise_vectors(
    rate_surprises: pd.Series,
    equity_surprises: pd.Series,
    announcement_df: pd.DataFrame
) -> Tuple[Dict[str, np.ndarray], pd.DataFrame]:
    """
    Assembles surprise vectors and performs a final quality control filter.

    Purpose:
        This function combines the individual rate and equity surprises into
        2x1 vectors for each event. It then applies a statistical outlier
        filter and structures the final, clean data into NumPy arrays ready
        for the rotational decomposition analysis.

    Process:
        1.  **Combine Surprises**: Merges the rate and equity surprise Series
            into a single DataFrame, aligned by `event_id`.
        2.  **Handle Missing Data**: Drops any events for which either surprise
            could not be calculated.
        3.  **5-Sigma Filter**: For each central bank's set of announcements
            separately, it calculates the standard deviation of each surprise
            type. It then removes any event where *either* the rate or equity
            surprise exceeds 5 times its respective standard deviation.
        4.  **Structure Output**: Groups the cleaned surprises by central bank
            and converts them into `(N, 2)` NumPy arrays, stored in a dictionary.
        5.  **Audit**: Returns a DataFrame containing all the outliers that
            were removed during the filtering process for full transparency.

    Args:
        rate_surprises (pd.Series): Series of rate surprises.
        equity_surprises (pd.Series): Series of equity surprises.
        announcement_df (pd.DataFrame): Clean announcement metadata to get
                                        `central_bank` info.

    Returns:
        Tuple[Dict[str, np.ndarray], pd.DataFrame]:
        - A dictionary where keys are central bank names ('FED', 'ECB') and
          values are (N, 2) NumPy arrays of their surprise vectors.
        - A DataFrame containing the surprise data for all events that were
          filtered out as outliers.
    """
    # --- Combine Surprises into a single DataFrame ---
    df = pd.concat([rate_surprises, equity_surprises], axis=1)
    df = df.join(announcement_df.set_index('event_id')[['central_bank']], how='left')

    # --- Handle Missing Data ---
    # Drop events where either surprise is missing before statistical calculations.
    clean_df = df.dropna()
    if clean_df.empty:
        raise ValueError("No complete surprise observations remaining after dropping NaNs.")

    # --- 5-Sigma Outlier Filter (per Central Bank) ---
    # Calculate the standard deviation for each surprise type, grouped by central bank.
    # .transform() broadcasts the result back to the original shape for easy comparison.
    std_devs = clean_df.groupby('central_bank')[['rate_surprise_bps', 'equity_surprise_pct']].transform('std')

    # Define the outlier condition: absolute surprise > 5 * standard deviation.
    is_outlier = (clean_df[['rate_surprise_bps', 'equity_surprise_pct']].abs() > 5 * std_devs).any(axis=1)

    # --- Separate clean data from outliers ---
    outliers_df = clean_df[is_outlier].copy()
    final_df = clean_df[~is_outlier].copy()

    # --- Structure Output ---
    surprise_vectors = {}
    # Group the final clean DataFrame by central bank.
    for bank, group in final_df.groupby('central_bank'):
        # For each bank, extract the two surprise columns and convert to a NumPy array.
        # This creates the S_t = [s_rate, s_equity]' vectors.
        surprise_vectors[bank] = group[['rate_surprise_bps', 'equity_surprise_pct']].values

    return surprise_vectors, outliers_df


# =============================================================================
# ORCHESTRATOR FUNCTION FOR TASK 5
# =============================================================================

def run_phase2_task5_surprise_calculation(
    equity_price_results: pd.DataFrame,
    rate_price_results: pd.DataFrame,
    clean_announcement_df: pd.DataFrame
) -> Dict[str, Any]:
    """
    Orchestrates the full surprise calculation pipeline for Task 5.

    This master function executes the calculation and quality control steps
    to produce the final, analysis-ready surprise vectors.

    Args:
        equity_price_results (pd.DataFrame): Price extraction results for equity.
        rate_price_results (pd.DataFrame): Price extraction results for rates.
        clean_announcement_df (pd.DataFrame): Cleansed announcement metadata.

    Returns:
        Dict[str, Any]: A dictionary containing the final surprise vectors,
                        the outliers removed, and intermediate results for
                        auditing.
    """
    # --- Step 1: Calculate Interest Rate Surprises ---
    rate_surprises = calculate_rate_surprises(
        price_extraction_results=rate_price_results,
        announcement_df=clean_announcement_df
    )

    # --- Step 2: Calculate Equity Surprises ---
    equity_surprises, equity_outlier_flags = calculate_equity_surprises(
        price_extraction_results=equity_price_results
    )

    # --- Step 3: Construct Final Surprise Vectors ---
    surprise_vectors, statistical_outliers = construct_surprise_vectors(
        rate_surprises=rate_surprises,
        equity_surprises=equity_surprises,
        announcement_df=clean_announcement_df
    )

    # --- Aggregate and return all results ---
    results = {
        "surprise_vectors": surprise_vectors,
        "statistical_outliers_removed": statistical_outliers,
        "intermediate_results": {
            "rate_surprises_raw": rate_surprises,
            "equity_surprises_raw": equity_surprises,
            "equity_extreme_value_flags": equity_outlier_flags
        }
    }

    return results


In [None]:
# Task 6: Rotational-Angle Decomposition Implementation

# =============================================================================
# CUSTOM EXCEPTION FOR THIS TASK
# =============================================================================

class IdentificationError(Exception):
    """
    Custom exception raised for failures in the structural shock identification.

    Purpose:
        This exception is raised when the rotational-angle decomposition
        method fails to identify a set of structural shocks that satisfy the
        theoretical sign restrictions. This is a critical failure in the
        econometric methodology, indicating that the data and the theoretical
        assumptions are inconsistent.

    Process:
        This exception should be raised by the `identify_structural_shocks`
        function if, after searching through all possible rotation angles, no
        angle produces a candidate monetary policy shock that is simultaneously
        positively correlated with interest rate surprises and negatively
        correlated with equity surprises. The error message should clearly
        state which central bank's data failed the identification procedure.
        It extends Python's base `Exception` class and serves as a specific,
        catchable error that signals a fundamental problem with the analysis,
        requiring researcher intervention.
    """
    pass

# =============================================================================
# TASK 6, STEPS 1-3: ROTATIONAL-ANGLE DECOMPOSITION
# =============================================================================

def _generate_rotation_matrices(
    num_angles: int
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generates a grid of angles and their corresponding 2D rotation matrices.

    This helper creates a 3D array where each slice along the first axis is a
    2x2 orthogonal rotation matrix for an angle in the grid.

    Args:
        num_angles (int): The number of angles to generate on the grid
                          (e.g., 999).

    Returns:
        Tuple[np.ndarray, np.ndarray]:
        - A 1D array of angles `theta` of shape (num_angles,).
        - A 3D array of rotation matrices `Q` of shape (num_angles, 2, 2).
    """
    # Create a grid of `num_angles` equally spaced angles from 0 to 2*pi.
    theta_grid = np.linspace(0, 2 * np.pi, num_angles)

    # Compute cosines and sines for all angles in a single vectorized operation.
    cos_theta = np.cos(theta_grid)
    sin_theta = np.sin(theta_grid)

    # Pre-allocate the 3D array for all rotation matrices for efficiency.
    rotation_matrices = np.zeros((num_angles, 2, 2))

    # Assemble the rotation matrices using the pre-computed trig values.
    # Equation: Q(theta) = [[cos(theta), -sin(theta)], [sin(theta), cos(theta)]]
    rotation_matrices[:, 0, 0] = cos_theta
    rotation_matrices[:, 0, 1] = -sin_theta
    rotation_matrices[:, 1, 0] = sin_theta
    rotation_matrices[:, 1, 1] = cos_theta

    return theta_grid, rotation_matrices


def _find_admissible_rotations(
    surprise_vectors: np.ndarray,
    rotation_matrices: np.ndarray,
    method: str = 'correlation',
    pass_rate_threshold: float = 0.60
) -> np.ndarray:
    """
    Identifies admissible rotation angles based on specified sign restrictions.

    Purpose:
        This is the core identification function. It tests each possible rotation
        of the reduced-form surprises against a set of theoretical sign
        restrictions to find the subset of rotations that are consistent with a
        structural monetary policy shock.

    Process:
        1.  **Rotate Surprises**: It first performs a batched matrix multiplication
            to apply every rotation matrix to the surprise vectors, generating a
            full set of candidate structural shocks.
            Equation: `epsilon(theta) = Q(theta) * S'`
        2.  **Select Method**: Based on the `method` parameter, it proceeds with
            one of two validation algorithms:
            a. **'correlation' (Default)**:
               - It calculates the Pearson correlation between the candidate
                 monetary policy shock and each of the original surprise series
                 in a fully vectorized and exact manner.
               - An angle is admissible if:
                 `rho(epsilon_mp, s_rate) > 0` AND `rho(epsilon_mp, s_equity) < 0`.
            b. **'observation_count'**:
               - For each observation, it checks if the co-movement between the
                 candidate shock and the surprises has the correct sign.
               - It calculates the percentage of observations for which both
                 sign restrictions hold simultaneously.
               - An angle is admissible if this "pass rate" is greater than or
                 equal to `pass_rate_threshold` (e.g., 60%).
        3.  **Return Mask**: It returns a boolean array indicating which angles
            in the original grid are admissible.

    Args:
        surprise_vectors (np.ndarray): An (N, 2) array of [rate, equity] surprises.
        rotation_matrices (np.ndarray): A (G, 2, 2) array of rotation matrices.
        method (str): The identification method to use. Must be either
                      'correlation' or 'observation_count'.
        pass_rate_threshold (float): The minimum pass rate for the
                                     'observation_count' method.

    Returns:
        np.ndarray: A 1D boolean array of length G, where `True` indicates an
                    admissible rotation angle.

    Raises:
        ValueError: If an unsupported `method` is provided.
    """
    # --- 1. Input Validation ---
    if method not in ['correlation', 'observation_count']:
        raise ValueError(f"Unsupported method '{method}'. Must be 'correlation' or 'observation_count'.")

    # --- 2. Vectorized Rotation of Surprises ---
    # Equation: epsilon(theta) = Q(theta) * S'
    # Use np.einsum for efficient batched matrix multiplication.
    # 'gij,jk->gik' means: for each grid point g, multiply the (i,j) matrix Q
    # with the (j,k) matrix S.T, resulting in a (g,i,k) matrix.
    # Result shape: (num_angles, 2, num_events)
    rotated_shocks: np.ndarray = np.einsum('gij,jk->gik', rotation_matrices, surprise_vectors.T)

    # The candidate monetary policy shock is the first row of each rotated matrix.
    mp_shocks: np.ndarray = rotated_shocks[:, 0, :]  # Shape: (num_angles, num_events)

    # --- 3. Apply Identification Method ---
    if method == 'correlation':
        # --- 3a. Exact Vectorized Correlation Method ---
        # De-mean all series. N = number of events.
        N = surprise_vectors.shape[0]
        mp_shocks_demeaned = mp_shocks - mp_shocks.mean(axis=1, keepdims=True)
        surprises_demeaned = surprise_vectors - surprise_vectors.mean(axis=0)

        # Calculate the covariance term (numerator of correlation).
        # einsum 'gn,nj->gj' computes the dot product of each of the G candidate shocks
        # with the 2 original surprise series, resulting in a (G, 2) covariance matrix.
        covariance = np.einsum('gn,nj->gj', mp_shocks_demeaned, surprises_demeaned) / (N - 1)

        # Calculate the standard deviations (for the denominator).
        mp_shocks_std = mp_shocks_demeaned.std(axis=1)
        surprises_std = surprises_demeaned.std(axis=0)

        # Calculate the correlation matrix by dividing covariance by the product of stds.
        # Add a small epsilon to prevent division by zero.
        correlations = covariance / (mp_shocks_std[:, np.newaxis] * surprises_std + 1e-12)

        # Apply sign restrictions on the correlations.
        # Restriction 1: rho(epsilon_mp, s_rate) > 0
        corr_rate_ok = correlations[:, 0] > 0
        # Restriction 2: rho(epsilon_mp, s_equity) < 0
        corr_equity_ok = correlations[:, 1] < 0

        # An angle is admissible if both correlation restrictions hold.
        is_admissible = corr_rate_ok & corr_equity_ok

    elif method == 'observation_count':
        # --- 3b. Observation Count Method ---
        # De-mean the series to analyze co-movements around the mean.
        mp_shocks_demeaned = mp_shocks - mp_shocks.mean(axis=1, keepdims=True)
        surprises_demeaned = surprise_vectors - surprise_vectors.mean(axis=0)

        # The sign of the product of demeaned series gives the sign of the co-movement for each observation.
        # Shape: (num_angles, num_events, 2)
        co_movements = mp_shocks_demeaned[:, :, np.newaxis] * surprises_demeaned

        # Check if the sign restrictions hold for each individual observation.
        # Restriction 1: Co-movement with rate surprise must be positive.
        rate_co_movement_ok = co_movements[:, :, 0] > 0
        # Restriction 2: Co-movement with equity surprise must be negative.
        equity_co_movement_ok = co_movements[:, :, 1] < 0

        # A successful observation is one where both restrictions hold.
        both_restrictions_ok = rate_co_movement_ok & equity_co_movement_ok

        # The pass rate for each angle is the mean of the boolean mask along the event axis.
        pass_rate = np.mean(both_restrictions_ok, axis=1)

        # An angle is admissible if its pass rate meets the threshold.
        is_admissible = pass_rate >= pass_rate_threshold

    # --- 4. Return the final boolean mask ---
    return is_admissible


def identify_structural_shocks(
    surprise_vectors: Dict[str, np.ndarray],
    num_angles: int = 999
) -> Dict[str, Dict[str, Any]]:
    """
    Identifies structural shocks from reduced-form surprises via rotational decomposition.

    Purpose:
        This is the main function for Task 6. It implements the Jarociński &
        Karadi (2020) methodology to disentangle pure monetary policy shocks
        from central bank information shocks.

    Process:
        For each central bank's surprise vectors:
        1.  **Setup**: Generates a grid of rotation angles and corresponding
            rotation matrices.
        2.  **Identification**: Applies each rotation and identifies the subset of
            "admissible" angles that satisfy the theoretical sign restrictions
            on the correlations between shocks and surprises.
        3.  **Benchmark Selection**: Calculates the median of the admissible
            angles, carefully handling cases where the set of angles "wraps
            around" the 2-pi circle.
        4.  **Shock Construction**: Applies the median rotation to the raw
            surprises to generate the benchmark structural shock series.
        5.  **Normalization**: Normalizes the structural shocks to have unit
            standard deviation and ensures the monetary policy shock is signed
            as contractionary (i.e., positively correlated with interest rates).

    Args:
        surprise_vectors (Dict[str, np.ndarray]): A dictionary where keys are
            central bank names and values are (N, 2) NumPy arrays of their
            [rate, equity] surprise vectors.
        num_angles (int): The number of angles to use in the grid search for
                          the rotation.

    Returns:
        Dict[str, Dict[str, Any]]: A nested dictionary where top-level keys are
        central bank names. Each inner dictionary contains:
        - 'structural_shocks': The (N, 2) array of normalized shocks.
        - 'median_angle': The benchmark rotation angle (in radians).
        - 'admissible_angles': The array of all angles satisfying the restrictions.
        - 'admissibility_mask': The boolean mask for the angle grid.

    Raises:
        IdentificationError: If no admissible rotation angles can be found for
                             a given central bank.
    """
    # --- Initialization ---
    # Generate the angle grid and rotation matrices once.
    theta_grid, rotation_matrices = _generate_rotation_matrices(num_angles)

    # Prepare the dictionary to store results for each central bank.
    results = {}

    # --- Process each central bank's surprises independently ---
    for bank, surprises in surprise_vectors.items():
        # --- Step 2: Find Admissible Rotations ---
        is_admissible = _find_admissible_rotations(surprises, rotation_matrices, theta_grid)

        # Extract the angles that satisfy the restrictions.
        admissible_angles = theta_grid[is_admissible]

        # --- Error Handling: Check if identification was successful ---
        if len(admissible_angles) == 0:
            raise IdentificationError(f"Identification failed for {bank}: No admissible rotation angles found.")

        # --- Step 3: Calculate Benchmark Median Rotation (handling circularity) ---
        # Sort angles to check for "wrap-around" issues on the 0-2pi circle.
        sorted_angles = np.sort(admissible_angles)
        # A large jump in sorted angles suggests the set wraps around 2pi.
        if (np.diff(sorted_angles) > np.pi).any():
            # Shift angles < pi by adding 2pi to "unwrap" the set.
            adjusted_angles = np.where(sorted_angles < np.pi, sorted_angles + 2 * np.pi, sorted_angles)
            # Calculate the median on the adjusted set.
            median_angle = np.median(adjusted_angles)
            # Map the median back to the [0, 2pi] range if it was shifted.
            median_angle = median_angle % (2 * np.pi)
        else:
            # If no wrap-around, the standard median is correct.
            median_angle = np.median(admissible_angles)

        # --- Step 4: Construct Benchmark Shocks ---
        # Create the single benchmark rotation matrix from the median angle.
        q_median = np.array([
            [np.cos(median_angle), -np.sin(median_angle)],
            [np.sin(median_angle), np.cos(median_angle)]
        ])

        # Apply the rotation to get the raw structural shocks.
        raw_structural_shocks = (q_median @ surprises.T).T

        # --- Step 5: Normalization and Sign Convention ---
        # Ensure the monetary policy shock (column 0) is signed as contractionary.
        # It must be positively correlated with the interest rate surprise (column 0).
        if np.corrcoef(raw_structural_shocks[:, 0], surprises[:, 0])[0, 1] < 0:
            # If the correlation is negative, flip the sign of BOTH structural shocks
            # to maintain their orthogonality and relative interpretation.
            raw_structural_shocks *= -1

        # Normalize both shock series to have a standard deviation of 1.
        std_devs = raw_structural_shocks.std(axis=0)
        # Avoid division by zero if a shock series has zero variance.
        safe_std_devs = np.where(std_devs > 1e-9, std_devs, 1.0)
        normalized_shocks = raw_structural_shocks / safe_std_devs

        # --- Store Results ---
        results[bank] = {
            'structural_shocks': normalized_shocks,
            'median_angle': median_angle,
            'admissible_angles': admissible_angles,
            'admissibility_mask': is_admissible
        }

    return results

# =============================================================================
# ORCHESTRATOR FUNCTION FOR TASK 6
# =============================================================================

def run_phase2_task6_shock_identification(
    surprise_vectors: Dict[str, np.ndarray],
    config: Dict[str, Any]
) -> Dict[str, Dict[str, Any]]:
    """
    Orchestrates the full rotational decomposition pipeline for Task 6.

    This is a simple wrapper around the main `identify_structural_shocks`
    function, intended to fit into the broader project pipeline structure. It
    retrieves any necessary parameters from the configuration and calls the
    core implementation.

    Args:
        surprise_vectors (Dict[str, np.ndarray]): The dictionary of raw surprise
                                                  vectors from Task 5.
        config (Dict[str, Any]): The main study configuration dictionary.

    Returns:
        Dict[str, Dict[str, Any]]: The nested dictionary containing the
                                   identified structural shocks and metadata
                                   for each central bank.
    """
    # Extract computational settings from the configuration.
    num_angles = config.get("computation_settings", {}).get("robustness_checks", {}).get("identification_uncertainty_draws", 999)

    # Call the main identification function with the specified parameters.
    structural_shock_results = identify_structural_shocks(
        surprise_vectors=surprise_vectors,
        num_angles=num_angles
    )

    return structural_shock_results


In [None]:
# Task 7: Monthly Shock Aggregation

# =============================================================================
# TASK 7, STEP 1: EVENT-TO-MONTH MAPPING
# =============================================================================

def map_events_to_months(
    announcement_df: pd.DataFrame,
    day_of_month_threshold: int = 15
) -> pd.DataFrame:
    """
    Maps high-frequency events to their corresponding modeling month.

    Purpose:
        This function translates the precise timestamp of each announcement into
        a standardized monthly bucket for aggregation. It implements a specific
        rule to account for information lags, assigning events that occur late
        in a month to the subsequent month, with a special exception for December.

    Process:
        1.  **Input Validation**: Ensures the input DataFrame contains the required
            `event_id` and `announcement_timestamp_utc` columns.
        2.  **Base Month Calculation**: For each event, it determines the
            calendar month in which it occurred and standardizes this to the
            first day of that month (e.g., '2022-01-17' -> '2022-01-01').
        3.  **Apply Threshold Rule with Edge Case**: It creates a boolean mask to
            identify events that should be shifted to the next month. An event
            is shifted only if both of the following conditions are true:
            a. Its day is after the `day_of_month_threshold` (e.g., > 15).
            b. Its month is NOT December.
        4.  **Assign Final Month**: Using the mask, it adds a one-month offset
            to all identified events. Events not matching the criteria (i.e.,
            early-in-month events or any December event) retain their original
            base month.

    Args:
        announcement_df (pd.DataFrame): The clean announcement metadata with an
                                        'event_id' index or column and a valid
                                        'announcement_timestamp_utc' column.
        day_of_month_threshold (int): The day of the month used as the cutoff.
                                      Events after this day are pushed to the
                                      next month (unless in December).

    Returns:
        pd.DataFrame: A DataFrame with 'event_id' as the index and a single
                      'assigned_month' column containing the standardized
                      monthly timestamp for each event.
    """
    # --- 1. Input Validation ---
    # Ensure the required timestamp column exists.
    if 'announcement_timestamp_utc' not in announcement_df.columns:
        raise KeyError("Input DataFrame must contain the 'announcement_timestamp_utc' column.")
    # Ensure the event identifier is present.
    if 'event_id' not in announcement_df.columns:
        raise KeyError("Input DataFrame must contain the 'event_id' column.")

    # --- 2. Data Preparation ---
    # Work on a copy to avoid modifying the original DataFrame.
    df = announcement_df[['event_id', 'announcement_timestamp_utc']].copy()

    # Set event_id as the index for clean joins and a well-structured output.
    df.set_index('event_id', inplace=True)

    # Get the timestamp series for easier access.
    timestamps = df['announcement_timestamp_utc']

    # Determine the base calendar month for all events, standardized to the first day.
    # Using .dt.to_period('M').dt.to_timestamp() is a robust way to achieve this.
    base_month = timestamps.dt.to_period('M').dt.to_timestamp()

    # --- 3. Apply Threshold Rule with December Edge Case ---
    # Condition A: The event occurs after the specified day-of-month threshold.
    is_late_in_month: pd.Series = timestamps.dt.day > day_of_month_threshold

    # Condition B: The event's month is not December (month == 12).
    is_not_december: pd.Series = timestamps.dt.month != 12

    # Combine the conditions: an event is shifted only if it is both late in the month AND not in December.
    mask_for_shift: pd.Series = is_late_in_month & is_not_december

    # --- 4. Assign Final Month ---
    # Initialize the assigned_month series with the base month values.
    assigned_month = base_month.copy()

    # For the events identified by the mask, add a one-month offset.
    # pd.DateOffset is the correct tool for robust date arithmetic that handles year-ends correctly.
    assigned_month.loc[mask_for_shift] = base_month.loc[mask_for_shift] + pd.DateOffset(months=1)

    # Return the final mapping as a DataFrame.
    return assigned_month.to_frame(name='assigned_month')


# =============================================================================
# TASK 7, STEP 2: WITHIN-MONTH SHOCK AGGREGATION
# =============================================================================

def aggregate_shocks_to_monthly(
    structural_shocks: Dict[str, pd.DataFrame],
    event_to_month_map: pd.DataFrame
) -> pd.DataFrame:
    """
    Aggregates event-level structural shocks into a monthly time series.

    Purpose:
        This function robustly converts the high-frequency, event-level structural
        shocks into the monthly frequency required for macroeconomic models like
        BVARs. It ensures perfect alignment between shocks and their assigned
        aggregation month by using an explicit `event_id` index.

    Process:
        1.  **Input Validation**: Verifies that the input `structural_shocks` is a
            dictionary of DataFrames, each indexed by `event_id`.
        2.  **Combine Shocks**: Concatenates the shock DataFrames for all central
            banks into a single DataFrame, preserving the `event_id` index. This
            creates a unified dataset of all identified shocks.
        3.  **Join Mapping**: Merges the combined shocks with the event-to-month
            mapping DataFrame using their shared and explicit `event_id` index.
            This step is critical for ensuring each shock is assigned to the
            correct month without relying on fragile row ordering.
        4.  **Aggregate**: Groups the merged data by the `assigned_month` column
            and then sums the shock values within each monthly bucket.
            Equation: `shock_m = sum(shock_t for t in month m)`
        5.  **Densify**: Creates a complete, uninterrupted monthly date range that
            spans the entire sample period. It then reindexes the aggregated
            shock data to this full range, filling any months that had no
            announcements with a value of 0.0, as is appropriate for a shock series.

    Args:
        structural_shocks (Dict[str, pd.DataFrame]): The output from the shock
            identification task. This must be a dictionary where keys are
            central bank names and values are DataFrames containing the
            structural shocks, indexed by `event_id`.
        event_to_month_map (pd.DataFrame): The output from `map_events_to_months`,
                                           a DataFrame with an `event_id` index
                                           and an 'assigned_month' column.

    Returns:
        pd.DataFrame: A DataFrame indexed by a complete monthly DatetimeIndex,
                      with columns for each aggregated shock series (e.g.,
                      'shock_FED_MP', 'shock_ECB_INFO').
    """
    # --- 1. Input Validation ---
    # Check that the primary input is a dictionary.
    if not isinstance(structural_shocks, dict):
        raise TypeError("Input 'structural_shocks' must be a dictionary of DataFrames.")
    # Check that the DataFrames inside the dictionary are indexed correctly.
    for bank, df in structural_shocks.items():
        if not isinstance(df.index, pd.Index) or df.index.name != 'event_id':
             raise ValueError(f"DataFrame for bank '{bank}' must be indexed by 'event_id'.")
    # Check that the month map is indexed correctly.
    if not isinstance(event_to_month_map.index, pd.Index) or event_to_month_map.index.name != 'event_id':
        raise ValueError("Input 'event_to_month_map' must be indexed by 'event_id'.")

    # --- 2. Combine Shocks into a Single DataFrame ---
    # Concatenate all shock DataFrames horizontally. The join is implicitly on the
    # event_id index. This is robust and efficient.
    try:
        event_level_shocks = pd.concat(structural_shocks.values(), axis=1)
    except Exception as e:
        raise ValueError(f"Failed to concatenate shock DataFrames. Ensure they have unique columns and a consistent index. Error: {e}")

    # --- 3. Join with Month Mapping ---
    # Join the shock data with the month mapping on their shared, explicit event_id index.
    # This is the core step that remedies the flaw of the previous implementation.
    df_to_agg = event_to_month_map.join(event_level_shocks, how='left')

    # --- 4. Aggregate by Summation ---
    # Group the DataFrame by the 'assigned_month' column and sum all shock values
    # that fall into that monthly bucket.
    monthly_aggregated = df_to_agg.groupby('assigned_month').sum()

    # --- 5. Densify the Time Series ---
    # Determine the full date range required for the final time series.
    start_date = event_to_month_map['assigned_month'].min()
    end_date = event_to_month_map['assigned_month'].max()

    # Create a complete monthly date range with 'MS' (Month Start) frequency.
    full_date_range = pd.date_range(start=start_date, end=end_date, freq='MS')

    # Reindex the aggregated data to this full range. Any months that did not have
    # announcements (and thus are missing from `monthly_aggregated`) will be
    # filled with 0.0, which is the correct representation for a shock series.
    monthly_shocks_final = monthly_aggregated.reindex(full_date_range, fill_value=0.0)

    # Return the final, dense, monthly shock DataFrame.
    return monthly_shocks_final


# =============================================================================
# TASK 7, STEP 3: SHOCK SERIES STATISTICAL VALIDATION
# =============================================================================

def validate_monthly_shock_series(
    monthly_shocks_df: pd.DataFrame,
    ljung_box_lags: List[int] = [1, 6, 12]
) -> pd.DataFrame:
    """
    Performs statistical validation on the aggregated monthly shock series.

    Purpose:
        This function provides a quantitative quality check on the final shock
        series. It verifies key statistical properties that are expected of a
        well-identified, exogenous shock series, such as a zero mean and a lack
        of significant autocorrelation.

    Process (for each shock series):
        1.  **Descriptive Stats**: Calculates the first four moments (mean,
            standard deviation, skewness, kurtosis).
        2.  **Zero-Mean Test**: Performs a one-sample t-test to check if the
            mean of the series is statistically different from zero.
        3.  **Autocorrelation Test**: Conducts a Ljung-Box test for serial
            correlation at multiple specified lags.

    Args:
        monthly_shocks_df (pd.DataFrame): The final monthly shock series DataFrame.
        ljung_box_lags (List[int]): A list of lags to use for the Ljung-Box test.

    Returns:
        pd.DataFrame: A DataFrame report where each row corresponds to a shock
                      series and columns contain the results of the statistical tests.
    """
    # --- Initialization ---
    validation_results = []

    # --- Iterate and Validate Each Shock Series ---
    for shock_name in monthly_shocks_df.columns:
        series = monthly_shocks_df[shock_name]

        # Basic descriptive statistics
        results = {
            'shock_series': shock_name,
            'mean': series.mean(),
            'std_dev': series.std(),
            'skewness': series.skew(),
            'kurtosis': series.kurtosis()
        }

        # T-test for H0: mean = 0
        # A high p-value is desired, indicating we cannot reject the zero-mean hypothesis.
        t_stat, p_val_mean = stats.ttest_1samp(series, 0.0, nan_policy='omit')
        results['t_test_p_value'] = p_val_mean

        # Ljung-Box test for H0: no autocorrelation
        # High p-values are desired, indicating no significant serial correlation.
        try:
            # Use the pandas wrapper for a cleaner output format.
            lb_results = acorr_ljungbox(series, lags=ljung_box_lags, return_df=True)
            for lag in ljung_box_lags:
                results[f'lb_p_value_lag_{lag}'] = lb_results.loc[lag, 'lb_pvalue']
        except Exception:
            # Handle cases where the test fails (e.g., zero variance series).
            for lag in ljung_box_lags:
                results[f'lb_p_value_lag_{lag}'] = np.nan

        validation_results.append(results)

    return pd.DataFrame(validation_results).set_index('shock_series')


# =============================================================================
# ORCHESTRATOR FUNCTION FOR TASK 7
# =============================================================================

def run_phase3_task7_temporal_aggregation(
    structural_shock_results: Dict[str, Dict[str, Any]],
    clean_announcement_df: pd.DataFrame
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the full temporal aggregation pipeline for Task 7.

    This master function transforms the event-level structural shocks into a
    validated, monthly time series ready for econometric modeling.

    Args:
        structural_shock_results (Dict[str, Dict[str, Any]]): The output from
            Task 6, containing the identified structural shocks.
        clean_announcement_df (pd.DataFrame): The cleansed announcement metadata.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary containing the final monthly
                                 shock series and its statistical validation report.
    """
    # Extract the shock arrays and corresponding event_ids.
    # This requires careful reconstruction from the Task 6 output and the announcement df.
    shocks_to_aggregate = {
        bank: results['structural_shocks']
        for bank, results in structural_shock_results.items()
    }

    # --- Step 1: Map Events to Months ---
    event_month_map = map_events_to_months(
        announcement_df=clean_announcement_df
    )

    # --- Step 2: Aggregate Shocks to Monthly Frequency ---
    monthly_shocks = aggregate_shocks_to_monthly(
        structural_shocks=shocks_to_aggregate,
        event_to_month_map=event_month_map,
        announcement_df=clean_announcement_df
    )

    # --- Step 3: Perform Statistical Validation ---
    validation_report = validate_monthly_shock_series(
        monthly_shocks_df=monthly_shocks
    )

    # --- Aggregate and return all results ---
    results = {
        "monthly_shocks_df": monthly_shocks,
        "validation_report": validation_report
    }

    return results


In [None]:
# Task 8: Macroeconomic Data Transformation and Variable Construction

# =============================================================================
# TASK 8, STEP 1: CORE VARIABLE TRANSFORMATION
# =============================================================================

def transform_core_macro_variables(
    prepared_macro_df: pd.DataFrame,
    transformation_map: Dict[str, str],
    target_market: str
) -> pd.DataFrame:
    """
    Applies specified transformations and computes required cross-rates.

    Purpose:
        This function converts the prepared (interpolated, wide-format) macro
        data into the final form required for econometric modeling. It handles
        both direct transformations (e.g., log) and derived transformations
        like currency cross-rates.

    Process:
        1.  **Direct Transformations**: It iterates through a mapping dictionary
            to apply specified transformations ('log' or 'level') to each core
            variable. It includes validation to ensure positivity for log transforms.
            Equation: `y_t = 100 * ln(x_t)`
        2.  **Cross-Rate Calculation**: It explicitly checks for the presence of
            the necessary base exchange rate series (e.g., vs. USD for both
            Canada and Euro Area) needed to compute the CAD/EUR cross-rate.
        3.  **Cross-Rate Formula**: If the base series are present, it calculates
            the cross-rate according to standard financial arithmetic.
            Equation: `CAD/EUR = (USD/EUR) / (USD/CAD)`
        4.  **Cross-Rate Transformation**: It then applies the standard log
            transformation to the newly calculated cross-rate series.
        5.  **Standardize Naming**: It renames all output columns to a consistent
            convention, e.g., `CAN_gdp_log`, `CAN_exchange_rate_vs_EUR_log`.

    Args:
        prepared_macro_df (pd.DataFrame): A wide-format DataFrame of macro
                                          series, indexed by date. This data
                                          should already be cleaned and
                                          interpolated.
        transformation_map (Dict[str, str]): A dictionary mapping column names
                                             in `prepared_macro_df` to the
                                             desired transformation ('log' or 'level').
        target_market (str): The country/market code (e.g., 'CAN'), used for
                             standardizing output column names.

    Returns:
        pd.DataFrame: A new DataFrame with the same index but containing the
                      transformed and newly created variables.

    Raises:
        TypeError: If the input DataFrame does not have a DatetimeIndex.
        KeyError: If a variable in the transformation map or a required base
                  exchange rate is not found in the input DataFrame.
        ValueError: If a series marked for log-transformation contains non-positive values.
    """
    # --- 1. Input Validation ---
    # Ensure the input is a time-series DataFrame.
    if not isinstance(prepared_macro_df.index, pd.DatetimeIndex):
        raise TypeError("Input DataFrame must have a DatetimeIndex.")

    # --- 2. Direct Transformations ---
    # Create a dictionary to hold the final, transformed columns.
    transformed_cols: Dict[str, pd.Series] = {}

    # Iterate through the provided transformation map.
    for var_name, transform_type in transformation_map.items():
        # Ensure the variable exists in the input DataFrame.
        if var_name not in prepared_macro_df.columns:
            raise KeyError(f"Variable '{var_name}' from transformation map not found in DataFrame.")

        series = prepared_macro_df[var_name]

        # Generate the new, standardized column name.
        # Remove any country prefix from the original name to avoid duplication.
        clean_var_name = var_name.replace(f"{target_market}_", "")
        new_name = f"{target_market}_{clean_var_name}"

        if transform_type == 'log':
            # Validate that all values are positive before taking the log.
            if (series <= 0).any():
                raise ValueError(f"Series '{var_name}' contains non-positive values and cannot be log-transformed.")
            # Equation: y_t = 100 * ln(x_t)
            transformed_cols[new_name + '_log'] = 100 * np.log(series)
        elif transform_type == 'level':
            # Keep the series in its original level form.
            transformed_cols[new_name] = series
        else:
            raise ValueError(f"Unknown transformation type '{transform_type}' for variable '{var_name}'.")

    # --- 3. Cross-Rate Calculation (CAD/EUR) ---
    # Define the required base series for the cross-rate calculation.
    # Assuming the convention is USD per foreign currency (e.g., USD/CAD).
    cad_vs_usd_col = f"{target_market}_exchange_rate_vs_USD"
    eur_vs_usd_col = "EUR_exchange_rate_vs_USD" # Assuming a standard name for the Euro Area series

    # Check if both required base series are present in the original data.
    if cad_vs_usd_col in prepared_macro_df.columns and eur_vs_usd_col in prepared_macro_df.columns:
        # Extract the base series.
        usd_cad_series = prepared_macro_df[cad_vs_usd_col]
        usd_eur_series = prepared_macro_df[eur_vs_usd_col]

        # Equation: CAD/EUR = (USD/EUR) / (USD/CAD)
        cad_eur_series = usd_eur_series / usd_cad_series

        # Validate the new series for positivity before log-transformation.
        if (cad_eur_series <= 0).any():
            raise ValueError("Calculated CAD/EUR cross-rate contains non-positive values.")

        # Transform and add the new cross-rate to the results dictionary.
        new_cross_rate_name = f"{target_market}_exchange_rate_vs_EUR_log"
        transformed_cols[new_cross_rate_name] = 100 * np.log(cad_eur_series)

    # --- 4. Assemble Final DataFrame ---
    # Create the final DataFrame from the dictionary of transformed series.
    df_final = pd.DataFrame(transformed_cols)

    # Ensure the column order is predictable for downstream tasks.
    return df_final.sort_index(axis=1)

# =============================================================================
# TASK 8, STEP 2: CONTROL VARIABLE CONSTRUCTION
# =============================================================================

def construct_control_variables(
    time_index: pd.DatetimeIndex,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Constructs deterministic control variables for the model.

    Purpose:
        This function generates standard control variables, such as a time
        trend and dummy variables for specific periods, that are required for
        the BVAR and Local Projection models.

    Process:
        1.  **Linear Trend**: Creates a linear time trend series that starts at
            1 and increments by 1 for each period in the provided `time_index`.
        2.  **COVID-19 Dummy**: Reads the start and end dates for the COVID-19
            period from the configuration dictionary. It then creates a binary
            dummy variable that is 1 during this period (inclusive) and 0
            otherwise.

    Args:
        time_index (pd.DatetimeIndex): The monthly DatetimeIndex from the
                                       macroeconomic dataset, which defines the
                                       sample period.
        config (Dict[str, Any]): The main study configuration dictionary, used
                                 to get the COVID period definition.

    Returns:
        pd.DataFrame: A DataFrame with the same `time_index`, containing
                      columns for each generated control variable.
    """
    # --- Input Validation ---
    if not isinstance(time_index, pd.DatetimeIndex):
        raise TypeError("Input 'time_index' must be a pandas DatetimeIndex.")

    # --- Initialization ---
    num_observations = len(time_index)
    controls_df = pd.DataFrame(index=time_index)

    # --- 1. Construct Linear Time Trend ---
    # The trend starts at 1 and increments for each observation.
    controls_df['trend'] = np.arange(1, num_observations + 1)

    # --- 2. Construct COVID-19 Dummy Variable ---
    # Extract the period definition from the configuration.
    try:
        covid_period = config['bvar_specification']['covid_period']
        start_date = pd.to_datetime(covid_period['start_date'])
        end_date = pd.to_datetime(covid_period['end_date'])
    except KeyError as e:
        raise KeyError(f"Could not find COVID period definition in config: {e}")
    except Exception as e:
        raise ValueError(f"Could not parse COVID period dates from config: {e}")

    # Create a boolean mask for the date range.
    covid_mask = (time_index >= start_date) & (time_index <= end_date)

    # Convert the boolean mask to an integer (0 or 1) dummy variable.
    controls_df['covid_dummy'] = covid_mask.astype(int)

    return controls_df


# =============================================================================
# TASK 8, STEP 3: FINAL DATASET ASSEMBLY
# =============================================================================

def assemble_final_dataset(
    transformed_macro_df: pd.DataFrame,
    monthly_shocks_df: pd.DataFrame,
    control_variables_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Assembles the final, analysis-ready dataset for econometric modeling.

    Purpose:
        This function serves as the final step in the data preparation phase.
        It merges the transformed endogenous macroeconomic variables, the
        aggregated monthly shock series, and the deterministic control
        variables into a single, unified, and clean DataFrame. It follows a
        robust procedure to handle any minor misalignments in the time indices
        of the input datasets.

    Process:
        1.  **Outer Join**: It first performs an outer join on all three input
            DataFrames. This creates a superset of all dates present in any
            input, explicitly showing any misalignments as `NaN` values.
        2.  **Limited Forward Fill**: It then applies a conservative forward-fill
            with a limit of 1. This is designed to impute single, sporadic
            missing observations, which might arise from minor data reporting
            lags, without filling large, structural gaps in the data.
        3.  **Drop Remaining NaNs**: Finally, it drops any rows that still
            contain `NaN` values. This step effectively truncates the sample to
            the contiguous time period where all variables are present and valid,
            ensuring a clean, balanced panel for modeling.
        4.  **Validation**: It performs a final check to assert that the
            resulting DataFrame is completely free of any missing values before
            returning it.

    Args:
        transformed_macro_df (pd.DataFrame): Transformed endogenous variables,
                                             indexed by date.
        monthly_shocks_df (pd.DataFrame): Aggregated monthly shock series,
                                          indexed by date.
        control_variables_df (pd.DataFrame): Deterministic control variables,
                                             indexed by date.

    Returns:
        pd.DataFrame: The complete, analysis-ready dataset with a consistent
                      and complete monthly DatetimeIndex.

    Raises:
        ValueError: If the final assembled dataset still contains missing values
                    after all processing steps, indicating a more severe data
                    problem than the function is designed to handle.
    """
    # --- 1. Input Validation ---
    # Check that all inputs are DataFrames with a DatetimeIndex.
    for df_name, df in zip(['transformed_macro_df', 'monthly_shocks_df', 'control_variables_df'],
                           [transformed_macro_df, monthly_shocks_df, control_variables_df]):
        if not isinstance(df, pd.DataFrame):
            raise TypeError(f"Input '{df_name}' must be a pandas DataFrame.")
        if not isinstance(df.index, pd.DatetimeIndex):
            raise TypeError(f"Input '{df_name}' must have a DatetimeIndex.")

    # --- 2. Outer Join to Combine All Data Sources ---
    # Start with the macro data as the base.
    combined_df = transformed_macro_df

    # Join the shocks using an outer join to retain all dates from both datasets.
    # This makes any date misalignments explicit as rows/columns with NaNs.
    combined_df = combined_df.join(monthly_shocks_df, how='outer')

    # Join the control variables using an outer join.
    combined_df = combined_df.join(control_variables_df, how='outer')

    # Sort the index to ensure chronological order before filling.
    combined_df.sort_index(inplace=True)

    # --- 3. Apply Limited Forward Fill ---
    # This step handles minor, isolated missing values (e.g., a single month lag
    # in a data release) without imputing data across large structural gaps.
    # The `limit=1` is a critical parameter for this conservative approach.
    filled_df = combined_df.fillna(method='ffill', limit=1)

    # --- 4. Drop Remaining NaNs to Get Final Sample ---
    # After the limited fill, any remaining NaNs represent either gaps larger
    # than one month or NaNs at the beginning of a series' history.
    # Dropping these rows truncates the dataset to the final, clean, balanced
    # sample period where all data is present.
    final_df = filled_df.dropna()

    # --- 5. Final Validation ---
    # This is a critical assertion to guarantee the output is clean.
    # If this check fails, it indicates a deeper problem in the upstream data
    # that the standard cleaning procedure could not resolve.
    if final_df.isna().sum().sum() > 0:
        missing_info = final_df.isna().sum()
        raise ValueError(
            "Final dataset contains unexpected missing values after assembly and imputation. "
            f"Problematic columns:\n{missing_info[missing_info > 0]}"
        )

    # Ensure the data types are numeric, as joins can sometimes change them.
    final_df = final_df.astype(float)

    # Return the final, analysis-ready DataFrame.
    return final_df


# =============================================================================
# ORCHESTRATOR FUNCTION FOR TASK 8
# =============================================================================

def run_phase3_task8_final_data_assembly(
    prepared_macro_df: pd.DataFrame,
    monthly_shocks_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the full data transformation and assembly pipeline for Task 8.

    This master function takes the prepared macro data and monthly shocks,
    generates the necessary control variables, applies final transformations,
    and assembles the complete dataset ready for modeling.

    Args:
        prepared_macro_df (pd.DataFrame): The wide-format, interpolated macro
                                          data from the preprocessing phase.
        monthly_shocks_df (pd.DataFrame): The aggregated monthly shock series
                                          from Task 7.
        config (Dict[str, Any]): The main study configuration dictionary.

    Returns:
        pd.DataFrame: The final, complete, analysis-ready dataset.
    """
    # Define the transformation map based on economic variable types.
    # This should ideally be part of the configuration itself.
    transformation_map = {
        'CAN_exchange_rate': 'log',
        'CAN_policy_rate': 'level',
        'CAN_cpi': 'log',
        'CAN_equity_index': 'log',
        'CAN_gdp': 'log'
    }

    # --- Step 1: Apply Core Variable Transformations ---
    transformed_macro = transform_core_macro_variables(
        prepared_macro_df=prepared_macro_df,
        transformation_map=transformation_map
    )

    # --- Step 2: Construct Control Variables ---
    controls = construct_control_variables(
        time_index=transformed_macro.index,
        config=config
    )

    # --- Step 3: Assemble the Final Dataset ---
    analysis_ready_df = assemble_final_dataset(
        transformed_macro_df=transformed_macro,
        monthly_shocks_df=monthly_shocks_df,
        control_variables_df=controls
    )

    return analysis_ready_df


In [None]:
# Task 9: BVAR Model Specification and Prior Setup

# =============================================================================
# TASK 9, STEP 1: ENDOGENOUS & EXOGENOUS MATRIX CONSTRUCTION
# =============================================================================

def create_var_matrices(
    analysis_ready_df: pd.DataFrame,
    endogenous_vars: List[str],
    exogenous_vars: List[str],
    lags: int
) -> Tuple[np.ndarray, np.ndarray, pd.DatetimeIndex]:
    """
    Constructs the Y and X matrices for VAR model estimation.

    Purpose:
        This function transforms a time-series DataFrame into the standard matrix
        format required for regression analysis (Y = X*B + E). It systematically
        creates lagged variables and aligns the endogenous (Y) and exogenous (X)
        matrices.

    Process:
        1.  **Lag Creation**: For each endogenous variable, it creates `p` lagged
            versions, naming them systematically (e.g., 'VAR_L1').
        2.  **Regressor Assembly**: It combines the lagged endogenous variables,
            the specified exogenous variables, and a constant term into a single
            regressor DataFrame.
        3.  **Alignment**: It selects the non-lagged endogenous variables as the
            Y matrix.
        4.  **Handling NaNs**: It drops all rows containing any NaN values, which
            naturally arise from the lagging process. This ensures that Y and X
            are perfectly aligned and have the same number of observations (T).
        5.  **Conversion**: Converts the final DataFrames to NumPy arrays for
            efficient numerical computation.

    Args:
        analysis_ready_df (pd.DataFrame): The final, complete dataset from Task 8.
        endogenous_vars (List[str]): An ordered list of column names for the
                                     endogenous variables.
        exogenous_vars (List[str]): A list of column names for the exogenous variables.
        lags (int): The number of lags (p) to include in the VAR model.

    Returns:
        Tuple[np.ndarray, np.ndarray, pd.DatetimeIndex]:
        - Y: The (T x K) matrix of endogenous variables.
        - X: The (T x M) matrix of regressors (lags, exog, constant).
        - valid_index: The DatetimeIndex corresponding to the valid (T) observations.
    """
    # --- Input Validation ---
    if lags < 1:
        raise ValueError("Number of lags must be a positive integer.")

    # --- Lag Creation ---
    df = analysis_ready_df.copy()
    lagged_vars = []
    for var in endogenous_vars:
        for i in range(1, lags + 1):
            lag_col_name = f"{var}_L{i}"
            df[lag_col_name] = df[var].shift(i)
            lagged_vars.append(lag_col_name)

    # --- Regressor Assembly ---
    # The order of regressors is: K*p lags, then exogenous vars, then a constant.
    regressor_cols = lagged_vars + exogenous_vars
    df_with_lags = add_constant(df, prepend=False) # Adds a 'const' column
    regressor_cols.append('const')

    # --- Alignment and NaN Handling ---
    # Drop rows with NaNs created by the shift() operation.
    df_clean = df_with_lags.dropna()

    # --- Matrix Conversion ---
    # Extract the Y and X matrices as NumPy arrays.
    Y = df_clean[endogenous_vars].values
    X = df_clean[regressor_cols].values

    # Extract the valid time index for plotting and reference.
    valid_index = df_clean.index

    return Y, X, valid_index


# =============================================================================
# TASK 9, STEPS 2 & 3: NORMAL-WISHART PRIOR IMPLEMENTATION
# =============================================================================

def construct_minnesota_prior(
    Y: np.ndarray,
    X: np.ndarray,
    endogenous_vars: List[str],
    lags: int,
    config: Dict[str, Any]
) -> Dict[str, np.ndarray]:
    """
    Constructs the parameters for a Minnesota-style Normal-Wishart prior.

    Purpose:
        This function translates the high-level hyperparameters from the study
        configuration into the precise prior mean vector (`beta_0`) and prior
        covariance matrix (`V_0`) for the VAR coefficients, as well as the
        parameters for the Inverse-Wishart prior on the error covariance.

    Process:
        1.  **Hyperparameter Extraction**: Retrieves tightness parameters
            (`lambda_1`, `lambda_3`) and the AR(1) coefficient prior from the config.
        2.  **Estimate Scaling Variances**: For each endogenous variable, it
            estimates a univariate AR(1) model to obtain the residual variance
            (sigma_i^2), which is used to scale the prior for cross-variable lags.
        3.  **Construct Prior Mean (`beta_0`)**: Creates a vector where the prior
            mean is `ar_coeff_prior` (e.g., 0.8) for the first own lag of each
            variable, and 0 for all other coefficients.
        4.  **Construct Prior Covariance (`V_0`)**: Creates a diagonal matrix.
            The diagonal elements (variances) are calculated using the Minnesota
            formulas, shrinking coefficients on higher-order lags and cross-
            variable lags more aggressively.
        5.  **Construct Wishart Prior**: Sets the degrees of freedom (`nu_0`) and
            scale matrix (`S_0`) for the Inverse-Wishart prior on the error
            covariance matrix, typically to uninformative but proper values.

    Args:
        Y (np.ndarray): The (T x K) matrix of endogenous variables.
        X (np.ndarray): The (T x M) matrix of regressors.
        endogenous_vars (List[str]): Ordered list of endogenous variable names.
        lags (int): The number of lags (p) in the model.
        config (Dict[str, Any]): The main study configuration dictionary.

    Returns:
        Dict[str, np.ndarray]: A dictionary containing all prior components:
        'beta_0', 'V_0', 'nu_0', 'S_0'.
    """
    # --- Hyperparameter Extraction ---
    priors_config = config['bvar_priors']['hyperparameters']
    lambda_1 = priors_config['lambda_1_overall_tightness']
    lambda_3 = priors_config['lambda_3_lag_decay']
    ar_coeff = priors_config['ar_coeff_prior']

    K = Y.shape[1]  # Number of endogenous variables
    M = X.shape[1]  # Total number of regressors
    num_exog_vars = M - (K * lags) - 1 # Exclude lags and constant

    # --- Estimate Scaling Variances (sigma_i^2) ---
    sigma_sq = np.zeros(K)
    for i in range(K):
        y_i = Y[:, i]
        X_ar1 = add_constant(pd.Series(y_i).shift(1).bfill())
        model = OLS(y_i[1:], X_ar1[1:]).fit()
        sigma_sq[i] = model.ssr / model.df_resid

    # --- Construct Prior Mean (beta_0) ---
    # beta_0 is a flattened vector of size M*K
    beta_0 = np.zeros((M, K))
    # Set the prior mean for the first own lag of each variable.
    for i in range(K):
        # The index of the first own lag for variable i is i.
        beta_0[i, i] = ar_coeff
    beta_0 = beta_0.flatten(order='F') # Flatten column-wise

    # --- Construct Prior Covariance (V_0) ---
    # V_0 is a diagonal matrix, so we only need to construct its diagonal.
    V_0_diag = np.zeros(M * K)

    # Iterate through each regressor (column of X)
    for m in range(M):
        # Iterate through each equation (column of Y)
        for k in range(K):
            idx = k * M + m # Index in the flattened V_0_diag

            # Check if the regressor is a lagged endogenous variable
            if m < K * lags:
                lag = (m // K) + 1
                lagged_var_idx = m % K

                # Prior variance for endogenous lags
                variance = (lambda_1 / (lag ** lambda_3))**2
                if lagged_var_idx != k: # Cross-lag
                    variance *= (sigma_sq[k] / sigma_sq[lagged_var_idx])
                V_0_diag[idx] = variance

            # Prior variance for exogenous variables (excluding constant)
            elif m < M - 1:
                V_0_diag[idx] = 100**2 # Diffuse prior

            # Prior variance for the constant
            else:
                V_0_diag[idx] = 1e12 # Very diffuse prior

    V_0 = np.diag(V_0_diag)

    # --- Construct Inverse-Wishart Prior Parameters ---
    # nu_0: Prior degrees of freedom
    nu_0 = K + 2.0
    # S_0: Prior scale matrix
    S_0 = np.diag([s * (nu_0 - K - 1) for s in sigma_sq])

    return {
        'beta_0': beta_0,
        'V_0': V_0,
        'nu_0': np.array([[nu_0]]), # Ensure it's an array
        'S_0': S_0
    }


# =============================================================================
# ORCHESTRATOR FUNCTION FOR TASK 9
# =============================================================================

def run_phase4_task9_bvar_setup(
    analysis_ready_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the full BVAR model setup pipeline for Task 9.

    This master function prepares all the necessary components for the Gibbs
    sampler: it constructs the data matrices (Y and X) and the detailed prior
    matrices (beta_0, V_0, nu_0, S_0) based on the study's configuration.

    Args:
        analysis_ready_df (pd.DataFrame): The final, complete dataset from Task 8.
        config (Dict[str, Any]): The main study configuration dictionary.

    Returns:
        Dict[str, Any]: A dictionary containing all setup components:
                        'Y', 'X', 'valid_index', and 'priors'.
    """
    # --- Extract specifications from config ---
    spec_config = config['bvar_specification']
    endogenous_vars = spec_config['endogenous_variables']
    # Exogenous variables for the VAR are shocks + deterministic terms
    exogenous_vars = [
        var for var in spec_config['exogenous_variables']
        if 'shock' in var or 'trend' in var or 'dummy' in var
    ]
    lags = spec_config['lags']

    # --- Step 1: Create Data Matrices ---
    Y, X, valid_index = create_var_matrices(
        analysis_ready_df=analysis_ready_df,
        endogenous_vars=endogenous_vars,
        exogenous_vars=exogenous_vars,
        lags=lags
    )

    # --- Step 2 & 3: Construct Prior Matrices ---
    priors = construct_minnesota_prior(
        Y=Y,
        X=X,
        endogenous_vars=endogenous_vars,
        lags=lags,
        config=config
    )

    # --- Assemble and return all results ---
    results = {
        'Y': Y,
        'X': X,
        'valid_index': valid_index,
        'priors': priors,
        'meta': {
            'endogenous_vars': endogenous_vars,
            'exogenous_vars': [c for c in analysis_ready_df.columns if c not in endogenous_vars],
            'lags': lags,
            'T': Y.shape[0],
            'K': Y.shape[1],
            'M': X.shape[1]
        }
    }

    return results


In [None]:
# Task 10: Gibbs Sampler Implementation

# =============================================================================
# CUSTOM WARNING FOR THIS TASK
# =============================================================================

class ConvergenceWarning(UserWarning):
    """
    Custom warning for potential MCMC convergence issues.

    Purpose:
        This warning is raised by the Gibbs sampler when a convergence
        diagnostic (e.g., Geweke) indicates that the Markov chain may not have
        reached its stationary distribution. It serves as a non-fatal alert to
        the user that the posterior draws should be inspected carefully before
        being used for inference.
    """
    pass

# =============================================================================
# HELPER FOR CONVERGENCE DIAGNOSTIC
# =============================================================================

def _calculate_geweke_diagnostic(
    chain: np.ndarray,
    first_prop: float = 0.1,
    last_prop: float = 0.5
) -> float:
    """
    Calculates the Geweke (1992) diagnostic z-score for a single MCMC chain.

    Purpose:
        This function implements a standard MCMC convergence diagnostic. It tests
        the null hypothesis that the mean of an early segment of a Markov chain
        is equal to the mean of a later segment. If the chain has converged,
        the means should be similar. A large z-score (e.g., > |2|) suggests
        that the chain has not yet converged.

    Process:
        1.  **Slice Chain**: Splits the input chain of posterior draws into two
            segments: an early part (default: first 10%) and a late part
            (default: last 50%).
        2.  **Calculate Means**: Computes the simple mean of each segment.
        3.  **Estimate HAC Variance**: To account for the serial correlation
            inherent in MCMC draws, it estimates the variance of the segment
            means using a Heteroskedasticity and Autocorrelation Consistent
            (HAC) estimator (Newey-West). This is done by calculating the HAC
            variance of the intercept in a regression of the segment on a constant.
        4.  **Compute Z-Score**: The z-score is calculated as the difference in
            means divided by the square root of the sum of their HAC variances.

    Args:
        chain (np.ndarray): A 1D array of posterior draws for a single parameter.
        first_prop (float): Proportion of the start of the chain to use.
        last_prop (float): Proportion of the end of the chain to use.

    Returns:
        float: The Geweke z-score. Returns np.nan if the chain is too short
               or if the HAC estimation fails.
    """
    # Ensure the chain is long enough for a meaningful comparison.
    if len(chain) < 100:
        return np.nan

    # Define the number of observations in each segment of the chain.
    n_obs: int = len(chain)
    n_first: int = int(n_obs * first_prop)
    n_last: int = int(n_obs * last_prop)

    # Slice the chain into the two segments for comparison.
    first_segment: np.ndarray = chain[:n_first]
    last_segment: np.ndarray = chain[-n_last:]

    # Calculate the mean of each segment.
    mean_first: float = np.mean(first_segment)
    mean_last: float = np.mean(last_segment)

    # To calculate the standard error of the means robustly, we need the spectral
    # density at frequency zero, which is estimated using a HAC variance estimator.
    try:
        # Estimate HAC variance for the first segment's mean.
        var_first: float = sm.stats.sandwich_covariance.cov_hac(
            sm.OLS(first_segment, np.ones_like(first_segment)).fit()
        )[0, 0]

        # Estimate HAC variance for the last segment's mean.
        var_last: float = sm.stats.sandwich_covariance.cov_hac(
            sm.OLS(last_segment, np.ones_like(last_segment)).fit()
        )[0, 0]

        # The Geweke z-score is the difference in means standardized by the HAC standard error.
        z_score: float = (mean_first - mean_last) / np.sqrt(var_first + var_last)
        return z_score
    except Exception:
        # If HAC estimation fails for any reason (e.g., zero variance segment), return NaN.
        return np.nan

# =============================================================================
# IMPLEMENTATION OF `run_bvar_gibbs_sampler` (Task 10)
# =============================================================================

def run_bvar_gibbs_sampler(
    bvar_setup: Dict[str, Any],
    config: Dict[str, Any]
) -> Dict[str, np.ndarray]:
    """
    Estimates a Bayesian VAR model using a Gibbs sampler with convergence monitoring.

    Purpose:
        This function is the computational core of the BVAR estimation. It
        implements a Markov Chain Monte Carlo (MCMC) algorithm to draw samples
        from the joint posterior distribution of the VAR coefficients (B) and
        the error covariance matrix (Sigma). It now includes an intra-run
        convergence check to provide assurance that the generated samples are
        from a stable posterior distribution.

    Process:
        1.  **Initialization**: Sets the random seed for reproducibility, extracts
            data (Y, X), priors, and MCMC settings. Initializes the parameter
            values using OLS estimates to start the chain in a high-probability region.
        2.  **Gibbs Sampling Loop**: Iterates for a specified number of draws,
            alternately sampling from the two conditional posterior distributions:
            a. **Draw Sigma | Y, X, B**: Samples the error covariance matrix
               `Sigma` from its Inverse-Wishart conditional posterior.
            b. **Draw B | Y, X, Sigma**: Samples the VAR coefficients `B` (in
               vectorized form) from their Multivariate Normal conditional posterior.
        3.  **Burn-in and Storage**: Discards the initial burn-in draws and
            stores the subsequent draws.
        4.  **Convergence Monitoring**: Periodically (e.g., every 1000 draws
            after burn-in), it calculates the Geweke diagnostic on a subset of
            key parameters (the main diagonal of the first lag coefficient
            matrix). It issues a `ConvergenceWarning` if the diagnostic scores
            are consistently large, alerting the user to potential issues.

    Args:
        bvar_setup (Dict[str, Any]): The output from `run_phase4_task9_bvar_setup`.
        config (Dict[str, Any]): The main study configuration dictionary.

    Returns:
        Dict[str, np.ndarray]: A dictionary containing the posterior draws:
        - 'B_draws': A (num_retained, M, K) array of coefficient draws.
        - 'Sigma_draws': A (num_retained, K, K) array of covariance matrix draws.
    """
    # --- 1. Initialization and Parameter Setup ---
    # Extract MCMC settings from the configuration dictionary.
    mcmc_config: Dict[str, int] = config['computation_settings']['mcmc_sampler']
    total_draws: int = mcmc_config['total_draws']
    burn_in: int = mcmc_config['burn_in_draws']
    random_seed: int = mcmc_config['random_seed']
    num_retained: int = total_draws - burn_in

    # Set the random seed for perfect reproducibility of the MCMC chain.
    np.random.seed(random_seed)

    # Extract data and prior matrices from the setup dictionary.
    Y: np.ndarray = bvar_setup['Y']
    X: np.ndarray = bvar_setup['X']
    priors: Dict[str, np.ndarray] = bvar_setup['priors']
    beta_0: np.ndarray = priors['beta_0']
    V_0: np.ndarray = priors['V_0']
    nu_0: float = priors['nu_0'].item()
    S_0: np.ndarray = priors['S_0']

    # Get model dimensions from metadata for clarity.
    T, K, M = bvar_setup['meta']['T'], bvar_setup['meta']['K'], bvar_setup['meta']['M']

    # Pre-compute constant matrices for efficiency inside the loop.
    XtX: np.ndarray = X.T @ X
    V_0_inv: np.ndarray = np.linalg.inv(V_0)

    # Initialize the parameters using OLS estimates to start the chain in a reasonable place.
    B_ols: np.ndarray = np.linalg.solve(XtX, X.T @ Y)
    U_ols: np.ndarray = Y - X @ B_ols
    Sigma_ols: np.ndarray = (U_ols.T @ U_ols) / (T - M)

    # Set initial draws for the Gibbs sampler.
    B_draw: np.ndarray = B_ols
    Sigma_draw: np.ndarray = Sigma_ols

    # Use lists for temporary storage during the run for easier appending.
    temp_B_draws: List[np.ndarray] = []
    temp_Sigma_draws: List[np.ndarray] = []

    # --- 2. Iterative Sampling Procedure (The Gibbs Loop) ---
    # Use tqdm for a progress bar to monitor the long-running loop.
    for i in tqdm(range(total_draws), desc="Running BVAR Gibbs Sampler"):

        # === Draw Sigma | Y, X, B ~ IW(nu_post, S_post) ===
        # Calculate residuals from the current coefficient draw.
        U: np.ndarray = Y - X @ B_draw
        # Update the parameters of the Inverse-Wishart distribution.
        nu_post: float = nu_0 + T
        S_post: np.ndarray = S_0 + (U.T @ U)
        # Draw Sigma from IW(nu_post, S_post).
        Sigma_draw = stats.invwishart.rvs(df=nu_post, scale=S_post)

        # === Draw B | Y, X, Sigma ~ N(beta_bar, V_bar) ===
        # Invert the newly drawn Sigma.
        Sigma_inv: np.ndarray = np.linalg.inv(Sigma_draw)
        # Update the parameters of the Multivariate Normal distribution for vec(B).
        V_bar: np.ndarray = np.linalg.inv(np.kron(Sigma_inv, XtX) + V_0_inv)

        beta_bar_term1: np.ndarray = (Sigma_inv @ Y.T @ X).flatten(order='F')
        beta_bar_term2: np.ndarray = V_0_inv @ beta_0
        beta_bar: np.ndarray = V_bar @ (beta_bar_term1 + beta_bar_term2)

        # Draw vec(B) from N(beta_bar, V_bar) using a stable Cholesky method.
        try:
            chol_V_bar: np.ndarray = np.linalg.cholesky(V_bar)
            beta_draw_flat: np.ndarray = beta_bar + chol_V_bar @ np.random.randn(M * K)
        except np.linalg.LinAlgError:
            # Fallback to standard multivariate normal if Cholesky fails.
            beta_draw_flat = stats.multivariate_normal.rvs(mean=beta_bar, cov=V_bar)

        # Reshape the flattened vector back into the (M x K) coefficient matrix.
        B_draw = beta_draw_flat.reshape((M, K), order='F')

        # --- 3. Storage ---
        # Store all draws temporarily. Burn-in will be discarded after the loop.
        temp_B_draws.append(B_draw)
        temp_Sigma_draws.append(Sigma_draw)

        # --- 4. Convergence Monitoring (as per blueprint) ---
        # Run the diagnostic periodically after the burn-in phase has started.
        current_post_burnin_draws = i - burn_in + 1
        if i >= burn_in and current_post_burnin_draws % 1000 == 0 and current_post_burnin_draws > 100:
            # Extract the post-burn-in chain so far.
            current_chain_B: np.ndarray = np.array(temp_B_draws[burn_in:i+1])

            # Check a few key parameters (diagonal of A_1 matrix).
            z_scores: List[float] = []
            for k_var in range(K):
                # The index in the B matrix for the first own lag of variable k_var is (k_var, k_var).
                param_chain: np.ndarray = current_chain_B[:, k_var, k_var]
                z_scores.append(_calculate_geweke_diagnostic(param_chain))

            # Check if any z-scores are large (e.g., > 2), indicating potential non-convergence.
            if any(np.abs(z) > 2.0 for z in z_scores if not np.isnan(z)):
                warnings.warn(
                    f"Geweke diagnostic z-score > 2.0 at iteration {i+1}. "
                    f"Chain may not have converged. Z-scores: {[round(z, 2) for z in z_scores]}",
                    ConvergenceWarning
                )

    # --- Finalize Storage ---
    # Discard the burn-in draws and convert lists to final NumPy arrays.
    B_draws_storage: np.ndarray = np.array(temp_B_draws[burn_in:])
    Sigma_draws_storage: np.ndarray = np.array(temp_Sigma_draws[burn_in:])

    # Return a dictionary containing the final, retained posterior draws.
    return {
        'B_draws': B_draws_storage,
        'Sigma_draws': Sigma_draws_storage
    }


In [None]:
# Task 11: Local Projections Estimation

# =============================================================================
# TASK 11: LOCAL PROJECTIONS ESTIMATION
# =============================================================================

def _prepare_lp_regression_data(
    df: pd.DataFrame,
    dependent_var: str,
    shock_vars: List[str],
    control_vars: List[str],
    horizon: int,
    lags: int
) -> Tuple[pd.Series, pd.DataFrame]:
    """
    Prepares the y vector and X matrix for a single LP regression.

    This helper function constructs the specific dataset required for estimating
    the local projection for one variable at one horizon.

    Args:
        df (pd.DataFrame): The full analysis-ready dataset.
        dependent_var (str): The name of the endogenous variable to project.
        shock_vars (List[str]): Names of the contemporaneous shock variables.
        control_vars (List[str]): Names of all control variables (other endog,
                                  lags, deterministics).
        horizon (int): The projection horizon `h`.
        lags (int): The number of lags to include for all controls.

    Returns:
        Tuple[pd.Series, pd.DataFrame]:
        - y: The dependent variable vector (y_{t+h}).
        - X: The matrix of regressors at time t.
    """
    # Create a new DataFrame for this regression to avoid side effects.
    reg_df = pd.DataFrame(index=df.index)

    # --- Create the dependent variable y_{t+h} ---
    reg_df['y_h'] = df[dependent_var].shift(-horizon)

    # --- Create the regressors at time t ---
    # 1. Contemporaneous shocks
    reg_df[shock_vars] = df[shock_vars]

    # 2. Lagged controls
    for var in control_vars:
        for i in range(1, lags + 1):
            reg_df[f"{var}_L{i}"] = df[var].shift(i)

    # 3. Add a constant for the intercept.
    reg_df = sm.add_constant(reg_df, prepend=True)

    # --- Align data by dropping all rows with NaNs ---
    reg_df.dropna(inplace=True)

    # Separate and return the final y vector and X matrix.
    y = reg_df['y_h']
    X = reg_df.drop(columns=['y_h'])

    return y, X


def estimate_local_projections(
    analysis_ready_df: pd.DataFrame,
    endogenous_vars: List[str],
    shock_vars: List[str],
    horizon: int,
    lags: int,
    confidence_level: float = 0.90
) -> Dict[str, np.ndarray]:
    """
    Estimates impulse response functions using Local Projections (Jordà, 2005).

    Purpose:
        This function provides an alternative, often more robust, method to VARs
        for estimating impulse responses. It runs a separate OLS regression for
        each variable and each forecast horizon.

    Process:
        For each endogenous variable and for each horizon `h` from 0 to `H`:
        1.  **Construct Data**: Creates the dependent variable `y_{t+h}` and the
            matrix of regressors `X_t`. `X_t` includes the contemporaneous
            shocks of interest, plus lags of the dependent variable, lags of
            all other control variables, and deterministic terms.
            Equation: `y_{t+h} = beta_h * shock_t + controls_t + e_{t+h}`
        2.  **Estimate Model**: Fits the regression using Ordinary Least Squares (OLS).
        3.  **Calculate HAC Errors**: Computes Newey-West Heteroskedasticity and
            Autocorrelation Consistent (HAC) standard errors. This is critical
            because the regression residuals are serially correlated by
            construction for horizons `h > 0`. An automatic bandwidth selection
            rule is used.
        4.  **Extract Results**: Stores the estimated coefficient on the shock
            variable (`beta_h`), which is the impulse response at horizon `h`.
        5.  **Compute Confidence Bands**: Calculates the confidence interval
            around the point estimate using the HAC standard errors.

    Args:
        analysis_ready_df (pd.DataFrame): The final, complete dataset.
        endogenous_vars (List[str]): List of endogenous variable names.
        shock_vars (List[str]): List of shock variable names to estimate responses to.
        horizon (int): The maximum forecast horizon `H`.
        lags (int): The number of lags `p` for all control variables.
        confidence_level (float): The confidence level for the error bands (e.g., 0.90 for 90%).

    Returns:
        Dict[str, np.ndarray]: A dictionary containing the results arrays:
        - 'irf_point': Point estimates of the IRFs.
        - 'irf_lower': Lower bounds of the confidence intervals.
        - 'irf_upper': Upper bounds of the confidence intervals.
        The shape of each array is (num_endog_vars, num_shocks, horizon + 1).
    """
    # --- Initialization ---
    num_endog = len(endogenous_vars)
    num_shocks = len(shock_vars)

    # Pre-allocate NumPy arrays to store the results.
    irf_point = np.zeros((num_endog, num_shocks, horizon + 1))
    irf_lower = np.zeros((num_endog, num_shocks, horizon + 1))
    irf_upper = np.zeros((num_endog, num_shocks, horizon + 1))

    # Define the set of all possible control variables.
    # This includes all endogenous variables and the shocks themselves.
    all_controls = endogenous_vars + shock_vars

    # Calculate the critical value from the normal distribution for the confidence interval.
    alpha = 1 - confidence_level
    z_critical = stats.norm.ppf(1 - alpha / 2)

    # --- Main Loop: Iterate over variables, shocks, and horizons ---
    # Use tqdm for progress tracking as this can be computationally intensive.
    with tqdm(total=num_endog * (horizon + 1), desc="Running Local Projections") as pbar:
        for i, dep_var in enumerate(endogenous_vars):
            for h in range(horizon + 1):
                # --- Step 1: Prepare data for this specific regression ---
                y, X = _prepare_lp_regression_data(
                    df=analysis_ready_df,
                    dependent_var=dep_var,
                    shock_vars=shock_vars,
                    control_vars=all_controls,
                    horizon=h,
                    lags=lags
                )

                # --- Step 2: Estimate OLS model ---
                model = sm.OLS(y, X).fit()

                # --- Step 3: Calculate HAC standard errors ---
                # Use statsmodels' robust implementation of Newey-West.
                # `maxlags` is chosen automatically based on sample size if None.
                hac_cov = sm.stats.sandwich_covariance.cov_hac(model, nlags=None)
                hac_se = np.sqrt(np.diag(hac_cov))

                # --- Step 4 & 5: Extract results and compute CIs ---
                for j, shock_var in enumerate(shock_vars):
                    # Get the index of the shock coefficient in the results.
                    shock_idx = X.columns.get_loc(shock_var)

                    # The IRF is the coefficient on the contemporaneous shock.
                    point_estimate = model.params[shock_idx]

                    # The standard error is the HAC-adjusted SE.
                    std_err = hac_se[shock_idx]

                    # Store the point estimate.
                    irf_point[i, j, h] = point_estimate
                    # Store the lower confidence band.
                    irf_lower[i, j, h] = point_estimate - z_critical * std_err
                    # Store the upper confidence band.
                    irf_upper[i, j, h] = point_estimate + z_critical * std_err

                pbar.update(1)

    return {
        'irf_point': irf_point,
        'irf_lower': irf_lower,
        'irf_upper': irf_upper
    }


# =============================================================================
# ORCHESTRATOR FUNCTION FOR TASK 11
# =============================================================================

def run_phase4_task11_local_projections(
    analysis_ready_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, np.ndarray]:
    """
    Orchestrates the full Local Projections estimation pipeline for Task 11.

    This master function extracts the necessary model specifications from the
    configuration and runs the local projection estimator to generate impulse
    response functions and their confidence bands.

    Args:
        analysis_ready_df (pd.DataFrame): The final, complete dataset from Task 8.
        config (Dict[str, Any]): The main study configuration dictionary.

    Returns:
        Dict[str, np.ndarray]: A dictionary containing the IRF point estimates
                               and confidence bands from the LP estimation.
    """
    # --- Extract specifications from config ---
    lp_config = config['local_projection_specification']
    bvar_config = config['bvar_specification']
    comp_config = config['computation_settings']['impulse_response_functions']

    # Identify endogenous variables and shocks to analyze.
    endogenous_vars = bvar_config['endogenous_variables']
    shock_vars = [var for var in bvar_config['exogenous_variables'] if 'shock' in var]

    # Get model parameters.
    horizon = comp_config['horizon_months']
    lags = lp_config['lags_dependent_variable'] # Assuming all lags are the same

    # Get inference parameters.
    confidence_level = comp_config['credible_interval_level'] # Use same level for CIs

    # --- Run the Local Projections estimator ---
    lp_results = estimate_local_projections(
        analysis_ready_df=analysis_ready_df,
        endogenous_vars=endogenous_vars,
        shock_vars=shock_vars,
        horizon=horizon,
        lags=lags,
        confidence_level=confidence_level
    )

    return lp_results


In [None]:
# Task 12: VAR-Based Impulse Response Functions

# =============================================================================
# TASK 12: VAR-BASED IMPULSE RESPONSE FUNCTIONS
# =============================================================================

def _calculate_irfs_for_single_draw(
    B_draw: np.ndarray,
    shock_indices: List[int],
    num_vars: int,
    lags: int,
    horizon: int
) -> np.ndarray:
    """
    Helper to compute IRFs for a single posterior draw of coefficients.

    Args:
        B_draw (np.ndarray): A single (M, K) coefficient matrix draw.
        shock_indices (List[int]): The column indices in X corresponding to the shocks.
        num_vars (int): The number of endogenous variables (K).
        lags (int): The number of lags in the VAR (p).
        horizon (int): The maximum horizon for the IRFs (H).

    Returns:
        np.ndarray: An (H+1, K, num_shocks) array of IRFs for this draw.
    """
    # --- 1. Construct Companion Matrix F ---
    # The companion matrix has dimensions (Kp x Kp).
    F = np.zeros((num_vars * lags, num_vars * lags))
    # The first block-row contains the K x K coefficient matrices A_1, ..., A_p.
    for i in range(lags):
        # Extract A_i from the top of the B_draw matrix.
        A_i = B_draw[i * num_vars : (i + 1) * num_vars, :].T
        F[0:num_vars, i * num_vars : (i + 1) * num_vars] = A_i
    # The lower blocks form an identity matrix to shift the lags.
    if lags > 1:
        I_block = np.eye(num_vars * (lags - 1))
        F[num_vars:, 0 : num_vars * (lags - 1)] = I_block

    # --- 2. Extract Impact Matrix B_0 for shocks ---
    # The impact matrix is the set of coefficients on the shock variables.
    B0 = B_draw[shock_indices, :] # Shape: (num_shocks, K)

    # --- 3. Recursive IRF Calculation ---
    # Pre-allocate storage for the IRFs for this draw.
    irfs = np.zeros((horizon + 1, num_vars, len(shock_indices)))
    # The impact at horizon 0 is simply the B0 matrix (transposed).
    irfs[0, :, :] = B0.T

    # Use the companion form to iterate through horizons.
    # F_power will hold F^h.
    F_power = np.eye(F.shape[0])
    for h in range(1, horizon + 1):
        # Update F_power to F^h.
        F_power = F_power @ F
        # The IRF is the top-left (K x K) block of F^h, post-multiplied by B0.
        # Equation: Psi_h = J * F^h * J' * B0, where J selects the top block.
        response_matrix = F_power[0:num_vars, 0:num_vars]
        irfs[h, :, :] = (response_matrix @ B0.T)

    return irfs


def calculate_var_impulse_responses(
    posterior_draws: Dict[str, np.ndarray],
    bvar_setup: Dict[str, Any],
    config: Dict[str, Any]
) -> Dict[str, np.ndarray]:
    """
    Computes and summarizes Bayesian impulse response functions from posterior draws.

    Purpose:
        This function translates the posterior distribution of VAR coefficients
        into a posterior distribution of impulse responses. It then summarizes
        this distribution to provide point estimates (posterior median) and
        uncertainty bands (credible intervals).

    Process:
        1.  **Iterate Through Draws**: For each saved draw from the Gibbs sampler:
            a. **Construct Companion Matrix**: The VAR(p) model is cast into its
               VAR(1) state-space (companion) form.
            b. **Identify Impact Matrix**: The coefficients corresponding to the
               exogenous shocks are extracted.
            c. **Compute IRFs**: The impulse responses are calculated recursively
               up to the specified horizon using the companion matrix.
        2.  **Store All Draws**: The computed IRFs for all draws are stored in a
            large multi-dimensional array.
        3.  **Summarize Posterior**:
            a. **Point Estimate**: The posterior median is calculated for each
               point of the IRF, providing a robust central tendency estimate.
            b. **Credible Intervals**: The desired percentiles (e.g., 5th and
               95th for a 90% interval) are calculated to represent the range
               of uncertainty.
            c. **Significance**: A boolean mask is created to indicate where the
               credible interval does not contain zero.

    Args:
        posterior_draws (Dict[str, np.ndarray]): The output from the Gibbs
            sampler, containing 'B_draws' and 'Sigma_draws'.
        bvar_setup (Dict[str, Any]): The setup dictionary from Task 9, containing
            model metadata like variable names and lag order.
        config (Dict[str, Any]): The main study configuration dictionary.

    Returns:
        Dict[str, np.ndarray]: A dictionary of summary arrays:
        - 'irf_median': The posterior median of the IRFs.
        - 'irf_lower': The lower bound of the credible interval.
        - 'irf_upper': The upper bound of the credible interval.
        - 'irf_significant': Boolean mask for statistical significance.
        Shape of each array is (num_endog_vars, num_shocks, horizon + 1).
    """
    # --- Initialization ---
    B_draws = posterior_draws['B_draws']
    num_draws, M, K = B_draws.shape

    meta = bvar_setup['meta']
    lags = meta['lags']

    irf_config = config['computation_settings']['impulse_response_functions']
    horizon = irf_config['horizon_months']
    ci_level = config['computation_settings']['credible_interval_level']

    # Identify which columns of X correspond to the shocks.
    shock_vars = [var for var in meta['exogenous_vars'] if 'shock' in var]
    # The regressor order is [lags..., exog..., const]
    shock_indices = [K * lags + meta['exogenous_vars'].index(var) for var in shock_vars]
    num_shocks = len(shock_vars)

    # --- Step 1 & 2: Calculate IRFs for all draws ---
    # Pre-allocate a 4D array to store all IRF draws.
    # Dimensions: (draw, horizon, responding_variable, shock)
    all_irf_draws = np.zeros((num_draws, horizon + 1, K, num_shocks))

    for i in tqdm(range(num_draws), desc="Calculating VAR Impulse Responses"):
        all_irf_draws[i, :, :, :] = _calculate_irfs_for_single_draw(
            B_draw=B_draws[i, :, :],
            shock_indices=shock_indices,
            num_vars=K,
            lags=lags,
            horizon=horizon
        )

    # --- Step 3: Summarize the Posterior Distribution of IRFs ---
    # Calculate the lower and upper percentile bounds for the credible interval.
    lower_p = (1 - ci_level) / 2 * 100
    upper_p = (1 - (1 - ci_level) / 2) * 100

    # Calculate median and percentiles across the 'draws' axis (axis=0).
    irf_median = np.median(all_irf_draws, axis=0)
    irf_lower = np.percentile(all_irf_draws, lower_p, axis=0)
    irf_upper = np.percentile(all_irf_draws, upper_p, axis=0)

    # Assess significance: the interval does not contain zero.
    irf_significant = (irf_lower > 0) | (irf_upper < 0)

    # Reshape results to the desired output format: (K, S, H+1)
    # Current shape is (H+1, K, S). Transpose axes.
    final_shape_order = (1, 2, 0) # K, S, H+1

    return {
        'irf_median': irf_median.transpose(final_shape_order),
        'irf_lower': irf_lower.transpose(final_shape_order),
        'irf_upper': irf_upper.transpose(final_shape_order),
        'irf_significant': irf_significant.transpose(final_shape_order),
        'all_irf_draws': all_irf_draws # Optional: return all draws for further analysis
    }


In [None]:
# Task 13: Transmission Channel Analysis Implementation

# =============================================================================
# TASK 13: TRANSMISSION CHANNEL ANALYSIS IMPLEMENTATION
# =============================================================================

def run_transmission_channel_analysis(
    full_analysis_df: pd.DataFrame,
    config: Dict[str, Any],
    channel_specifications: List[Dict[str, Any]]
) -> Dict[str, Dict[str, Any]]:
    """
    Performs BVAR analysis on augmented models to study transmission channels.

    Purpose:
        This function is a high-level orchestrator that systematically investigates
        different economic transmission channels. It manages a "campaign" of
        BVAR model estimations, where each model is an augmentation of a baseline
        specification. It is designed to be flexible, supporting both the
        addition of a group of variables (e.g., a "financial channel") and an
        iterative analysis over a list of variables (e.g., a sectoral "export
        disaggregation").

    Process:
        1.  **Input Validation**: It first validates the structure of the
            `channel_specifications` list to ensure each entry is well-formed.
        2.  **Load Baseline**: Retrieves the list of core endogenous variables
            from the main configuration.
        3.  **Iterate Through Specifications**: For each specification dictionary
            in the input list:
            a. **Augment Specification**: If the type is 'augment', it creates a
               single new list of endogenous variables by adding a group of new
               variables to the baseline.
            b. **Iterate Specification**: If the type is 'iterate', it enters a
               nested loop. In each iteration, it creates a new list of
               endogenous variables by adding just *one* of the specified
               variables to the baseline.
            c. **Execute Pipeline**: For each generated model specification, it
               creates a deep copy of the configuration, updates the list of
               endogenous variables, and executes the full, trusted BVAR
               estimation and IRF calculation pipeline (Tasks 9, 10, and 12).
            d. **Store Results**: The complete results of each model run are
               stored in a master dictionary, keyed by a unique and descriptive
               channel name.

    Args:
        full_analysis_df (pd.DataFrame): An analysis-ready DataFrame that
            contains the baseline variables PLUS all potential channel variables.
        config (Dict[str, Any]): The main study configuration dictionary.
        channel_specifications (List[Dict[str, Any]]): A list of dictionaries,
            each defining an analysis to run. Required keys per dict:
            - 'channel_name' (str): Base name for the analysis.
            - 'type' (str): Method of analysis, either 'augment' or 'iterate'.
            - 'variables' (List[str]): The list of variables for the analysis.

    Returns:
        Dict[str, Dict[str, Any]]: A nested dictionary where top-level keys are
        the unique channel names. Each inner dictionary contains the full IRF
        results and metadata for that specific augmented model run.

    Raises:
        ValueError: If the `channel_specifications` structure is invalid or if
                    a specified variable is not found in the `full_analysis_df`.
    """
    # --- Initialization ---
    # Retrieve the list of core endogenous variables from the baseline specification.
    baseline_endog_vars: List[str] = config['bvar_specification']['endogenous_variables']

    # This dictionary will store the final results for all channel analyses.
    all_channel_results: Dict[str, Dict[str, Any]] = {}

    # --- Main Loop: Iterate through each specified channel analysis ---
    for spec in tqdm(channel_specifications, desc="Analyzing Transmission Channels"):
        # --- 1. Validate the specification entry ---
        if not all(k in spec for k in ['channel_name', 'type', 'variables']):
            raise ValueError(f"Invalid channel specification found: {spec}. Must contain 'channel_name', 'type', and 'variables'.")
        if spec['type'] not in ['augment', 'iterate']:
            raise ValueError(f"Invalid spec type '{spec['type']}'. Must be 'augment' or 'iterate'.")

        # Check that all specified additional variables exist in the main DataFrame.
        for var in spec['variables']:
            if var not in full_analysis_df.columns:
                raise ValueError(f"Variable '{var}' from channel spec '{spec['channel_name']}' not found in the analysis DataFrame.")

        # --- 2. Generate Model Runs based on Specification Type ---
        runs_to_execute: List[Tuple[str, List[str]]] = []

        if spec['type'] == 'augment':
            # For 'augment', create one run that adds all specified variables to the baseline.
            channel_name = spec['channel_name']
            # Use a set for efficient union and to prevent duplicates.
            augmented_vars = list(dict.fromkeys(baseline_endog_vars + spec['variables']))
            runs_to_execute.append((channel_name, augmented_vars))

        elif spec['type'] == 'iterate':
            # For 'iterate', create a separate run for each variable in the list.
            for var in spec['variables']:
                # Create a unique, descriptive name for this specific run.
                channel_name = f"{spec['channel_name']}_{var}"
                # The model includes the baseline plus this single additional variable.
                augmented_vars = baseline_endog_vars + [var]
                runs_to_execute.append((channel_name, augmented_vars))

        # --- 3. Execute the BVAR pipeline for each generated run ---
        for run_name, endog_vars_for_run in runs_to_execute:

            # Create a deep copy of the configuration to prevent side effects.
            # This is critical for ensuring each run is independent.
            run_config = copy.deepcopy(config)
            run_config['bvar_specification']['endogenous_variables'] = endog_vars_for_run

            # Announce which model is being run.
            tqdm.write(f"--- Running analysis for: {run_name} ---")

            # Execute the full, trusted BVAR estimation and IRF pipeline.
            # Task 9: Set up the BVAR with the augmented variable set.
            bvar_setup = run_phase4_task9_bvar_setup(
                analysis_ready_df=full_analysis_df,
                config=run_config
            )

            # Task 10: Run the Gibbs sampler for the augmented model.
            posterior_draws = run_bvar_gibbs_sampler(
                bvar_setup=bvar_setup,
                config=run_config
            )

            # Task 12: Calculate and summarize impulse responses for the augmented model.
            irf_results = calculate_var_impulse_responses(
                posterior_draws=posterior_draws,
                bvar_setup=bvar_setup,
                config=run_config
            )

            # --- 4. Store the results for this run ---
            all_channel_results[run_name] = {
                'irf_results': irf_results,
                'model_metadata': {
                    'channel_name': run_name,
                    'endogenous_variables': endog_vars_for_run,
                    'num_variables': len(endog_vars_for_run),
                    'lags': bvar_setup['meta']['lags']
                }
            }

    # Return the aggregated dictionary containing results from all runs.
    return all_channel_results



In [None]:
# Task 14: Cross-Validation and Model Comparison

# =============================================================================
# TASK 14, STEP 1: IN-SAMPLE FIT ASSESSMENT HELPERS
# =============================================================================

def _calculate_bvar_log_marginal_likelihood(
    Y: np.ndarray,
    X: np.ndarray,
    posterior_draws: Dict[str, np.ndarray]
) -> float:
    """
    Calculates the Log Marginal Likelihood via the Harmonic Mean Estimator.

    Purpose:
        This function provides an estimate of the model evidence, P(Y|M), which
        is a key metric for Bayesian model comparison. It uses the Harmonic
        Mean Estimator (HME), which is simple to compute from posterior draws
        but can be numerically unstable.

    Process:
        1.  **Iterate Draws**: For each posterior draw of the parameters (B, Sigma),
            it calculates the log-likelihood of the data given those parameters.
            The log-likelihood is based on the multivariate normal distribution:
            `ln p(Y|B, Sigma) = sum_{t=1 to T} ln N(U_t | 0, Sigma)`
            where `U = Y - XB`.
        2.  **Handle Instability**: It includes a `try-except` block to handle
            rare cases where a drawn `Sigma` matrix is not positive definite,
            assigning an infinitely poor likelihood to such draws.
        3.  **Stable Mean Calculation**: It computes the harmonic mean in log-space
            using the log-sum-exp trick to prevent numerical underflow/overflow,
            which is a major issue with the naive HME.
            `log(HME) = max(logL) - log(mean(exp(max(logL) - logL)))`

    Args:
        Y (np.ndarray): The (T x K) matrix of endogenous variables.
        X (np.ndarray): The (T x M) matrix of regressors.
        posterior_draws (Dict[str, np.ndarray]): A dictionary containing the
            'B_draws' and 'Sigma_draws' from the Gibbs sampler.

    Returns:
        float: The estimated log marginal likelihood of the BVAR model.
    """
    # Extract the posterior draws for coefficients (B) and covariance (Sigma).
    B_draws: np.ndarray = posterior_draws['B_draws']
    Sigma_draws: np.ndarray = posterior_draws['Sigma_draws']

    # Get dimensions from the draws array.
    num_draws, _, K = B_draws.shape

    # Pre-allocate an array to store the log-likelihood for each posterior draw.
    log_likelihoods: np.ndarray = np.zeros(num_draws)

    # Iterate through each of the posterior draws.
    for i in range(num_draws):
        # Calculate the residuals for the i-th draw: U_i = Y - X * B_i
        U_i: np.ndarray = Y - X @ B_draws[i, :, :]

        try:
            # Calculate the sum of log-PDFs of the multivariate normal distribution
            # for the residuals, given the i-th draw of the covariance matrix.
            log_likelihoods[i] = np.sum(
                multivariate_normal.logpdf(U_i, mean=np.zeros(K), cov=Sigma_draws[i, :, :], allow_singular=False)
            )
        except np.linalg.LinAlgError:
            # If a Sigma draw is not positive definite, its likelihood is zero (log-likelihood is -inf).
            log_likelihoods[i] = -np.inf

    # To compute the harmonic mean stably, use the log-sum-exp trick.
    # This prevents numerical underflow when exponentiating large negative log-likelihoods.
    # Find the maximum log-likelihood to serve as a scaling factor.
    max_log_lik: float = np.max(log_likelihoods)

    # Calculate the log of the mean of the inverse likelihoods.
    log_mean_inv_lik: float = np.log(np.mean(np.exp(-log_likelihoods + max_log_lik)))

    # Combine the terms to get the final log marginal likelihood.
    log_marginal_lik: float = max_log_lik - log_mean_inv_lik

    # Return the final scalar value.
    return float(log_marginal_lik)


def _perform_diebold_mariano_test(
    errors1: np.ndarray,
    errors2: np.ndarray
) -> Tuple[float, float]:
    """
    Performs a Diebold-Mariano test for equal forecast accuracy.

    Purpose:
        This test formally compares the predictive accuracy of two forecast models.
        The null hypothesis is that both models have the same forecast accuracy.
        A low p-value suggests a statistically significant difference in performance.

    Process:
        1.  **Define Loss**: Uses squared error as the loss function.
        2.  **Calculate Loss Differential**: Computes the series `d_t = L(e1_t) - L(e2_t)`,
            where `L(e) = e^2`.
        3.  **Test Mean**: The test statistic is a t-test for whether the mean of `d_t`
            is zero.
        4.  **HAC Standard Errors**: Critically, the standard error of `mean(d_t)` is
            calculated using a Newey-West Heteroskedasticity and Autocorrelation
            Consistent (HAC) estimator, which is necessary because forecast errors
            can be serially correlated.

    Args:
        errors1 (np.ndarray): A 1D array of forecast errors from Model 1.
        errors2 (np.ndarray): A 1D array of forecast errors from Model 2, aligned
                              with `errors1`.

    Returns:
        Tuple[float, float]:
        - The Diebold-Mariano t-statistic.
        - The corresponding p-value.
    """
    # Calculate the loss differential series using squared error loss.
    loss_diff: np.ndarray = errors1**2 - errors2**2

    # Calculate the mean of the loss differential.
    mean_loss_diff: float = np.mean(loss_diff)

    # To get the HAC standard error of the mean, we can regress the loss
    # differential on a constant and compute the HAC variance of the intercept.
    X_const: np.ndarray = np.ones_like(loss_diff)

    # Fit a simple OLS model: loss_diff = const + error
    model: sm.regression.linear_model.RegressionResultsWrapper = sm.OLS(loss_diff, X_const).fit()

    # Compute the HAC covariance matrix for the estimated parameters (just the constant).
    hac_cov: np.ndarray = sm.stats.sandwich_covariance.cov_hac(model)

    # The HAC standard error of the mean is the square root of the variance of the constant.
    hac_se: float = np.sqrt(hac_cov[0, 0])

    # The Diebold-Mariano statistic is the t-statistic for the mean.
    dm_stat: float = mean_loss_diff / hac_se

    # Calculate the two-sided p-value using the t-distribution.
    p_value: float = 2 * (1 - t.cdf(np.abs(dm_stat), df=len(loss_diff) - 1))

    # Return the statistic and its p-value.
    return float(dm_stat), float(p_value)


def assess_in_sample_fit(
    analysis_ready_df: pd.DataFrame,
    bvar_setup: Dict[str, Any],
    posterior_draws: Dict[str, np.ndarray],
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Performs a comprehensive in-sample fit and comparison of BVAR and LP models.

    Purpose:
        This function computes several key metrics to evaluate how well the
        estimated BVAR and Local Projection models fit the data they were
        trained on.

    Process:
        1.  **BVAR Metrics**:
            - Calculates in-sample fitted values using the posterior mean of
              the BVAR coefficients.
            - Computes the Root Mean Squared Error (RMSE) for each endogenous variable.
            - Estimates the Log Marginal Likelihood using the Harmonic Mean Estimator.
        2.  **LP Metrics**:
            - For each endogenous variable, it re-constructs the `h=0` (in-sample)
              Local Projection regression.
            - It fits this regression and calculates the in-sample RMSE.
        3.  **Model Comparison**:
            - It performs a Diebold-Mariano test to formally compare the
              in-sample forecast accuracy (based on squared errors) of the BVAR
              and LP models for each variable.
        4.  **Reporting**: Compiles all metrics into a single, well-organized
            DataFrame for easy comparison.

    Args:
        analysis_ready_df (pd.DataFrame): The full dataset used for estimation.
        bvar_setup (Dict[str, Any]): The setup dictionary from Task 9.
        posterior_draws (Dict[str, np.ndarray]): The posterior draws from Task 10.
        config (Dict[str, Any]): The main study configuration dictionary.

    Returns:
        pd.DataFrame: A report summarizing in-sample fit metrics, indexed by
                      the endogenous variable names.
    """
    # --- BVAR In-Sample Fit ---
    # Extract data matrices and posterior mean of coefficients.
    Y, X = bvar_setup['Y'], bvar_setup['X']
    B_mean = np.mean(posterior_draws['B_draws'], axis=0)

    # Calculate fitted values and residuals: Y_hat = X * B_mean
    Y_fit_bvar = X @ B_mean
    errors_bvar = Y - Y_fit_bvar

    # Calculate RMSE for each endogenous variable.
    rmse_bvar = np.sqrt(np.mean(errors_bvar**2, axis=0))

    # Calculate the log marginal likelihood for the BVAR model.
    lml_bvar = _calculate_bvar_log_marginal_likelihood(Y, X, posterior_draws)

    # --- LP In-Sample Fit (h=0) ---
    # Pre-allocate array for LP errors.
    errors_lp = np.full_like(Y, np.nan)
    endog_vars = bvar_setup['meta']['endogenous_vars']
    shock_vars = [var for var in config['bvar_specification']['exogenous_variables'] if 'shock' in var]

    # Loop through each endogenous variable to run its h=0 LP regression.
    for i, var in enumerate(endog_vars):
        # Prepare the specific y and X for this regression.
        y_lp, X_lp = _prepare_lp_regression_data(
            df=analysis_ready_df,
            dependent_var=var,
            shock_vars=shock_vars,
            control_vars=endog_vars + shock_vars,
            horizon=0,
            lags=config['local_projection_specification']['lags_dependent_variable']
        )
        # Ensure alignment between the BVAR and LP samples.
        common_index = y_lp.index.intersection(bvar_setup['valid_index'])
        y_lp, X_lp = y_lp.loc[common_index], X_lp.loc[common_index]

        # Fit the h=0 LP model via OLS.
        model_lp = sm.OLS(y_lp, X_lp).fit()

        # Calculate fitted values and errors.
        y_fit_lp = model_lp.predict(X_lp)
        errors_lp[np.isin(bvar_setup['valid_index'], common_index), i] = y_lp - y_fit_lp

    # Calculate RMSE for the LP model, ignoring NaNs from any misalignment.
    rmse_lp = np.sqrt(np.nanmean(errors_lp**2, axis=0))

    # --- Diebold-Mariano Test for Model Comparison ---
    dm_results = {}
    for i, var in enumerate(endog_vars):
        # Create a mask to select only the common, valid observations for both error series.
        valid_mask = ~np.isnan(errors_lp[:, i])
        # Perform the test on the aligned error vectors.
        dm_stat, p_val = _perform_diebold_mariano_test(errors_bvar[valid_mask, i], errors_lp[valid_mask, i])
        dm_results[var] = p_val

    # --- Assemble Final Report ---
    # Create a DataFrame to hold the comparative metrics.
    report = pd.DataFrame({
        'BVAR_RMSE': rmse_bvar,
        'LP_RMSE': rmse_lp,
        'DM_Test_p_value': pd.Series(dm_results)
    }, index=endog_vars)

    # Add the scalar log marginal likelihood to the report.
    report['BVAR_LogMarginalLikelihood'] = lml_bvar

    return report


# =============================================================================
# TASK 14, STEP 2: OUT-OF-SAMPLE VALIDATION HELPERS
# =============================================================================

def _generate_bvar_h_step_forecast(
    train_df: pd.DataFrame,
    B_mean: np.ndarray,
    bvar_meta: Dict[str, Any],
    config: Dict[str, Any],
    h: int
) -> np.ndarray:
    """
    Generates a rigorous h-step ahead forecast from an estimated BVAR model.

    Purpose:
        This function performs a true multi-step forecast by iterating the
        VAR equations forward in time. It is a critical component for any
        out-of-sample evaluation, simulating the generation of a forecast for
        a future period `t+h` using only information available at time `t`.

    Process:
        1.  **Initialize History**: Extracts the last `p` observations of the
            endogenous variables from the training data, which are needed to
            start the forecast.
        2.  **Project Exogenous Path**: Critically, it projects the future path
            of all deterministic exogenous variables.
            - **Shocks**: Future shocks are assumed to be their expected value, zero.
            - **Trend**: The time trend is projected linearly.
            - **Dummies**: The value of dummy variables (e.g., COVID dummy) is
              determined for each future date based on the model configuration.
        3.  **Iterate Forward**: Loops `h` times to generate the forecast path.
            - In each step `s` from 1 to `h`, it constructs the full regressor
              vector `X_{t+s}` for the next forecast.
            - The lagged endogenous values in `X_{t+s}` are taken from the
              history matrix, which contains actual data and previous forecasts.
            - The exogenous values are taken from the pre-computed projected path.
            - It calculates the one-step-ahead forecast `y_hat_{t+s}`.
            - It updates the history matrix by dropping the oldest observation
              and appending the new forecast, for use in the next iteration.
        4.  **Return Final Forecast**: After `h` iterations, it returns the final
            forecast for horizon `h`, which is `y_hat_{t+h}`.

    Args:
        train_df (pd.DataFrame): The training data up to time `t`.
        B_mean (np.ndarray): The (M x K) posterior mean coefficient matrix.
        bvar_meta (Dict[str, Any]): Metadata about the BVAR model, including
                                   variable names, dimensions, and lag order.
        config (Dict[str, Any]): The main study configuration dictionary, needed
                                 for dummy variable definitions.
        h (int): The forecast horizon.

    Returns:
        np.ndarray: A 1D array of length K containing the h-step-ahead forecast.
    """
    # --- 1. Initialization and Parameter Extraction ---
    # Extract model dimensions and variable names from metadata.
    p: int = bvar_meta['lags']
    K: int = bvar_meta['K']
    M: int = bvar_meta['M']
    endog_vars: List[str] = bvar_meta['endogenous_vars']

    # The full list of regressors in the correct order.
    # The last element is 'const'.
    full_regressor_list: List[str] = bvar_meta['regressor_cols']

    # Extract the last `p` observations of endogenous variables needed to start the forecast.
    Y_hist: np.ndarray = train_df[endog_vars].values[-p:]

    # --- 2. Project the Future Path of Exogenous Variables ---
    # Pre-allocate a matrix to hold the projected path.
    future_exog: np.ndarray = np.zeros((h, M))

    # Get the last timestamp from the training data to project dates forward.
    last_date: pd.Timestamp = train_df.index[-1]

    # Get the last known value of the trend.
    last_trend: float = train_df['trend'].iloc[-1] if 'trend' in train_df.columns else 0

    # Get the COVID period definition from the configuration.
    covid_period = config['bvar_specification']['covid_period']
    covid_start = pd.to_datetime(covid_period['start_date'])
    covid_end = pd.to_datetime(covid_period['end_date'])

    # Populate the future exogenous matrix step-by-step.
    for s in range(1, h + 1):
        # The index for the current forecast step (s-1 for 0-based indexing).
        step_idx = s - 1

        # Project each exogenous variable.
        for i, var_name in enumerate(full_regressor_list):
            if 'shock' in var_name:
                # Future shocks are their expectation: zero.
                future_exog[step_idx, i] = 0.0
            elif 'trend' == var_name:
                # The trend increases by 1 each period.
                future_exog[step_idx, i] = last_trend + s
            elif 'dummy' in var_name:
                # Project the dummy based on the future date.
                future_date = last_date + pd.DateOffset(months=s)
                is_in_covid_period = (future_date >= covid_start) and (future_date <= covid_end)
                future_exog[step_idx, i] = 1.0 if is_in_covid_period else 0.0
            elif 'const' == var_name:
                # The constant is always 1.
                future_exog[step_idx, i] = 1.0

    # --- 3. Iterate Forward to Generate Forecast Path ---
    # Pre-allocate array for the sequence of forecasts.
    forecasts: np.ndarray = np.zeros((h, K))

    # This is the core iterative loop.
    for i in range(h):
        # Construct the full regressor vector X_{t+i+1} for the next forecast.
        X_t_plus_i: np.ndarray = np.zeros(M)

        # a. Fill in the lagged endogenous variables from the history matrix.
        # The history matrix is ordered [y_{t-p+1}, ..., y_t]. We flatten it.
        # The order 'F' (column-major) is crucial to match the BVAR setup.
        lagged_endog_values = Y_hist.flatten(order='F')

        # Find the indices corresponding to the lagged variables.
        lag_indices = [idx for idx, name in enumerate(full_regressor_list) if '_L' in name]
        X_t_plus_i[lag_indices] = lagged_endog_values

        # b. Fill in the exogenous and deterministic variables from the projected path.
        exog_indices = [idx for idx, name in enumerate(full_regressor_list) if '_L' not in name]
        X_t_plus_i[exog_indices] = future_exog[i, exog_indices]

        # Generate the one-step-ahead forecast: y_hat = X * B
        y_hat: np.ndarray = X_t_plus_i @ B_mean

        # Store the forecast for this step.
        forecasts[i, :] = y_hat

        # Update the history matrix for the next iteration: drop the oldest
        # observation and append the newest forecast.
        Y_hist = np.vstack([Y_hist[1:], y_hat])

    # --- 4. Return the Final h-step-ahead Forecast ---
    # The final forecast is the last one generated in the loop.
    return forecasts[h-1, :]


# =============================================================================
# IMPLEMENTATION OF `_run_single_forecast_iteration` (Task 14)
# =============================================================================

def _run_single_forecast_iteration(
    t: int,
    analysis_ready_df: pd.DataFrame,
    config: Dict[str, Any],
    forecast_horizons: List[int]
) -> List[Dict[str, Any]]:
    """
    Performs one complete step of the recursive forecasting exercise.

    Purpose:
        This function is the core worker for the parallelized out-of-sample
        validation. For a single forecast origin `t`, it re-estimates both the
        BVAR and LP models on all data available up to `t`, generates forecasts
        for all specified horizons, and returns the calculated forecast errors.

    Process:
        1.  **Slice Data**: Creates the training dataset using all data up to time `t`.
        2.  **Re-estimate BVAR**: Runs the entire BVAR pipeline (setup and Gibbs
            sampling) on the training data to get a new posterior mean `B_mean_t`.
        3.  **Re-estimate LP**: Runs the entire LP estimation pipeline on the
            training data to get new horizon-specific coefficients.
        4.  **Forecast**: For each endogenous variable and each horizon `h`:
            - Generates the h-step-ahead BVAR forecast using the iterative method.
            - Generates the h-step-ahead LP forecast using the direct method.
            - Retrieves the actual outcome `y_{t+h}`.
            - Calculates and stores the forecast errors for both models in a
              "tidy" format (one row per observation).

    Args:
        t (int): The forecast origin time index (integer location).
        analysis_ready_df (pd.DataFrame): The full dataset.
        config (Dict[str, Any]): The main study configuration.
        forecast_horizons (List[int]): The horizons to forecast.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries, each representing a single
                              forecast error observation.
    """
    # --- 1. Slice Data ---
    # Define the training data for this iteration.
    train_df: pd.DataFrame = analysis_ready_df.iloc[:t]

    # --- 2. Re-estimate BVAR ---
    # Run the full BVAR pipeline on the current training data slice.
    bvar_setup_t: Dict[str, Any] = run_phase4_task9_bvar_setup(train_df, config)
    posterior_draws_t: Dict[str, np.ndarray] = run_bvar_gibbs_sampler(bvar_setup_t, config)
    B_mean_t: np.ndarray = np.mean(posterior_draws_t['B_draws'], axis=0)

    # --- 3. Re-estimate LP ---
    # Run the full LP pipeline on the current training data slice.
    lp_results_t: Dict[str, np.ndarray] = run_phase4_task11_local_projections(train_df, config)

    # --- 4. Generate Forecasts and Errors ---
    # A list to store the tidy error records calculated in this iteration.
    iteration_errors: List[Dict[str, Any]] = []
    endog_vars: List[str] = config['bvar_specification']['endogenous_variables']

    # Loop through each required forecast horizon.
    for h in forecast_horizons:
        # Ensure the actual outcome is within the dataset bounds.
        if t + h > len(analysis_ready_df):
            continue

        # Get the true values that we are trying to forecast.
        actual_values: np.ndarray = analysis_ready_df[endog_vars].iloc[t + h - 1].values

        # --- BVAR Forecast ---
        # Generate the h-step-ahead BVAR forecast using the corrected helper.
        y_hat_bvar: np.ndarray = _generate_bvar_h_step_forecast(train_df, B_mean_t, bvar_setup_t['meta'], config, h)
        error_bvar: np.ndarray = actual_values - y_hat_bvar

        # --- LP Forecast ---
        # The LP forecast is direct, using the coefficients for horizon `h`.
        # First, construct the X_t vector from the last row of the training data.
        _, X_lp_t = _prepare_lp_regression_data(
            df=train_df,
            dependent_var=endog_vars[0], # Dependent var doesn't matter for X construction
            shock_vars=[var for var in config['bvar_specification']['exogenous_variables'] if 'shock' in var],
            control_vars=endog_vars + [var for var in config['bvar_specification']['exogenous_variables'] if 'shock' in var],
            horizon=h-1, # Use h-1 to get regressors at time t for forecast at t+h
            lags=config['local_projection_specification']['lags_dependent_variable']
        )
        # Extract the last row of the constructed X matrix, which corresponds to time t.
        X_t_vector = X_lp_t.iloc[-1].values

        # Pre-allocate forecast and error vectors for the LP model.
        y_hat_lp = np.zeros(len(endog_vars))

        # For each variable, get its specific h-step-ahead coefficients and predict.
        for i, var in enumerate(endog_vars):
            # The LP IRF is the coefficient on the shock. The forecast needs all coefficients.
            # This requires re-fitting the model to get all coefficients.
            y_lp_h, X_lp_h = _prepare_lp_regression_data(train_df, var, [], [], h-1, config['local_projection_specification']['lags_dependent_variable'])
            lp_model_h = sm.OLS(y_lp_h, X_lp_h).fit()
            y_hat_lp[i] = lp_model_h.predict(X_lp_h.iloc[-1])

        error_lp: np.ndarray = actual_values - y_hat_lp

        # --- Store errors in tidy format ---
        for i, var_name in enumerate(endog_vars):
            iteration_errors.append({
                't': t, 'h': h, 'variable_name': var_name, 'model': 'BVAR', 'error': error_bvar[i]
            })
            iteration_errors.append({
                't': t, 'h': h, 'variable_name': var_name, 'model': 'LP', 'error': error_lp[i]
            })

    return iteration_errors


# =============================================================================
# IMPLEMENTATION OF `evaluate_forecasting_performance_full` (Task 14)
# =============================================================================

def evaluate_forecasting_performance_full(
    analysis_ready_df: pd.DataFrame,
    config: Dict[str, Any],
    validation_split_ratio: float = 0.7,
    forecast_horizons: List[int] = [1, 6, 12],
    n_jobs: int = -1
) -> pd.DataFrame:
    """
    Evaluates out-of-sample forecasting performance using a full recursive scheme.

    Purpose:
        This function provides a rigorous out-of-sample test of the models'
        forecasting capabilities. It simulates how the models would have
        performed in real-time by re-estimating them at each point in the
        evaluation period. This process is computationally intensive and is
        parallelized to run efficiently on multi-core systems.

    Process:
        1.  **Split Data**: Divides the data into an initial training set and an
            evaluation set.
        2.  **Parallel Loop**: It iterates through each time step in the
            evaluation period. This loop is parallelized using `joblib`, where
            each time step `t` is a separate job.
        3.  **Worker Function**: Each parallel job calls the
            `_run_single_forecast_iteration` function, which performs the
            computationally heavy task of re-estimating both BVAR and LP models
            and computing forecast errors for that single time step.
        4.  **Aggregate Results**: After all parallel jobs are complete, it
            collects the forecast errors from all time steps.
        5.  **Calculate Metrics**: It converts the list of error records into a
            DataFrame and calculates the overall Root Mean Squared Error (RMSE)
            and Mean Absolute Error (MAE) for each model, variable, and forecast
            horizon.

    Args:
        analysis_ready_df (pd.DataFrame): The full analysis-ready dataset.
        config (Dict[str, Any]): The main study configuration dictionary.
        validation_split_ratio (float): The proportion of data for the initial
                                        training window.
        forecast_horizons (List[int]): Horizons to evaluate.
        n_jobs (int): The number of CPU cores to use for parallelization.
                      -1 means use all available cores.

    Returns:
        pd.DataFrame: A DataFrame summarizing forecast accuracy (RMSE, MAE),
                      multi-indexed by variable, horizon, and model.
    """
    # --- 1. Split Data ---
    # Determine the number of observations and the split point for the evaluation.
    n_obs: int = len(analysis_ready_df)
    n_initial: int = int(n_obs * validation_split_ratio)

    # --- 2. Parallel Loop ---
    # Use joblib.Parallel to run the forecast iterations in parallel.
    # This significantly speeds up the computationally expensive recursive scheme.
    # `delayed` is used to create a lightweight "promise" of the function call.
    results_list: List[List[Dict[str, Any]]] = Parallel(n_jobs=n_jobs)(
        delayed(_run_single_forecast_iteration)(
            t, analysis_ready_df, config, forecast_horizons
        ) for t in tqdm(range(n_initial, n_obs - max(forecast_horizons)), desc="Recursive OOS Forecasting")
    )

    # --- 3. Aggregate Results ---
    # The result is a list of lists; flatten it into a single list of dictionaries.
    flat_results: List[Dict[str, Any]] = [item for sublist in results_list for item in sublist]

    # If no results were generated, return an empty DataFrame.
    if not flat_results:
        return pd.DataFrame(columns=['RMSE', 'MAE'])

    # Convert the list of tidy error records into a DataFrame.
    error_df = pd.DataFrame.from_records(flat_results)

    # --- 4. Calculate Metrics ---
    # Define the aggregation functions for RMSE and MAE.
    agg_funcs = {
        'RMSE': ('error', lambda x: np.sqrt(np.mean(x**2))),
        'MAE': ('error', lambda x: np.mean(np.abs(x)))
    }

    # Group by the dimensions of interest and apply the aggregation functions.
    summary_report = error_df.groupby(['variable_name', 'h', 'model']).agg(**agg_funcs)

    # Return the final, clean summary report.
    return summary_report


# =============================================================================
# TASK 14, ORCHESTRATOR
# =============================================================================

def run_full_model_validation_suite(
    analysis_ready_df: pd.DataFrame,
    config: Dict[str, Any],
    lag_specs_to_test: List[int] = [1, 2, 3, 4, 5, 6]
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the full suite of validation, comparison, and diagnostics.

    Purpose:
        This is the master function for Task 14. It executes all three major
        validation components: in-sample fit assessment, a full-scale out-of-
        sample forecasting horse race, and model diagnostics including lag
        selection.

    Process:
        1.  **Full-Sample Estimation**: It first runs the BVAR and LP estimations
            on the entire dataset. These full-sample results are used for the
            in-sample fit and residual diagnostic checks.
        2.  **In-Sample Assessment**: Calls `assess_in_sample_fit` to compute
            RMSE, LML, and Diebold-Mariano test statistics.
        3.  **Out-of-Sample Assessment**: Calls `evaluate_forecasting_performance_full`
            to run the computationally intensive recursive forecasting exercise.
        4.  **Lag Selection**: It iterates through a list of possible lag lengths,
            re-estimating the BVAR for each, and calculates AIC and BIC to help
            determine the optimal lag order.
        5.  **Residual Diagnostics**: It runs a standard battery of tests
            (Ljung-Box, ARCH-LM, Jarque-Bera) on the residuals of the full-sample
            baseline BVAR model.
        6.  **Compile Reports**: Gathers all results into a final, comprehensive
            dictionary of report DataFrames.

    Args:
        analysis_ready_df (pd.DataFrame): The full analysis-ready dataset.
        config (Dict[str, Any]): The main study configuration dictionary.
        lag_specs_to_test (List[int]): A list of lag lengths (p) to evaluate
                                       for the BVAR model.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary of comprehensive validation reports.
    """
    # --- Full-Sample Estimation (Baseline for In-Sample and Diagnostics) ---
    bvar_setup: Dict[str, Any] = run_phase4_task9_bvar_setup(analysis_ready_df, config)
    posterior_draws: Dict[str, np.ndarray] = run_bvar_gibbs_sampler(bvar_setup, config)

    # --- Step 1: In-Sample Fit Assessment ---
    in_sample_report: pd.DataFrame = assess_in_sample_fit(
        analysis_ready_df, bvar_setup, posterior_draws, config
    )

    # --- Step 2: Out-of-Sample Validation ---
    out_of_sample_report: pd.DataFrame = evaluate_forecasting_performance_full(
        analysis_ready_df, config
    )

    # --- Step 3: Lag Selection and Residual Diagnostics ---
    info_criteria: List[Dict[str, Any]] = []
    for p in tqdm(lag_specs_to_test, desc="Testing Lag Specifications"):
        # Create a temporary config with the new lag length.
        temp_config = config.copy()
        temp_config['bvar_specification'] = config['bvar_specification'].copy()
        temp_config['bvar_specification']['lags'] = p
        try:
            # Re-run the BVAR pipeline for this lag length.
            bvar_setup_p = run_phase4_task9_bvar_setup(analysis_ready_df, temp_config)
            posterior_draws_p = run_bvar_gibbs_sampler(bvar_setup_p, temp_config)

            # Calculate AIC and BIC using the posterior mean estimates.
            Y_p, X_p = bvar_setup_p['Y'], bvar_setup_p['X']
            B_mean_p = np.mean(posterior_draws_p['B_draws'], axis=0)
            Sigma_mean_p = np.mean(posterior_draws_p['Sigma_draws'], axis=0)
            residuals_p = Y_p - X_p @ B_mean_p

            log_lik_p = np.sum(multivariate_normal.logpdf(residuals_p, np.zeros(Y_p.shape[1]), Sigma_mean_p))
            n_params_p = B_mean_p.size
            T_p = Y_p.shape[0]

            aic = -2 * log_lik_p + 2 * n_params_p
            bic = -2 * log_lik_p + np.log(T_p) * n_params_p

            info_criteria.append({'lags': p, 'AIC': aic, 'BIC': bic})
        except Exception:
            # If estimation fails for a given lag, record as NaN.
            info_criteria.append({'lags': p, 'AIC': np.nan, 'BIC': np.nan})

    lag_selection_report: pd.DataFrame = pd.DataFrame(info_criteria).set_index('lags')

    # Run residual diagnostics on the main, full-sample model.
    residual_report: pd.DataFrame = run_model_diagnostics(bvar_setup, posterior_draws)['residual_diagnostics']

    # --- Compile Final Reports ---
    return {
        "in_sample_fit_report": in_sample_report,
        "out_of_sample_forecast_report": out_of_sample_report,
        "lag_selection_report": lag_selection_report,
        "residual_diagnostics_report": residual_report
    }


In [None]:
# Task 15: End-to-End Research Pipeline Orchestrator

# =============================================================================
# TASK 15: END-TO-END RESEARCH PIPELINE ORCHESTRATOR
# =============================================================================

def run_transatlantic_spillovers_analysis(
    equity_tick_df: pd.DataFrame,
    rate_tick_df: pd.DataFrame,
    macro_df: pd.DataFrame,
    announcement_df: pd.DataFrame,
    target_market: str,
    study_config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Executes the complete end-to-end research pipeline for the study.

    Purpose:
        This master orchestrator function serves as the single entry point to
        run the entire research project. It ensures a reproducible, robust, and
        logically sound execution of all analytical phases, from raw data
        ingestion to final model validation, encapsulating the full workflow.

    Process:
        The pipeline proceeds through a series of well-defined phases with a
        strict separation of concerns:
        1.  **Setup**: Initializes logging and a master results dictionary to
            store all artifacts from the run.
        2.  **Phase I (Data Ingestion & Cleaning)**:
            - Validates all inputs, configurations, and schemas.
            - Performs a deep data quality assessment to detect anomalies.
            - Cleanses and prepares all raw data into consistently structured
              DataFrames, with macro data prepared in its original levels.
        3.  **Phase II (Shock Identification)**:
            - Operates on the clean high-frequency data to identify structural shocks
              using the rotational decomposition method.
        4.  **Phase III (Model Preparation)**:
            - Aggregates the high-frequency shocks to a monthly frequency.
            - Applies model-specific numerical transformations (e.g., logs) to the
              clean macro data.
            - Constructs deterministic control variables (trend, dummies).
            - Assembles all components into the final, analysis-ready dataset.
        5.  **Phase IV (Econometric Estimation)**:
            - Estimates the primary BVAR model via Gibbs sampling.
            - Estimates the robust Local Projections model as a cross-check.
        6.  **Phase V (Results & Validation)**:
            - Calculates impulse response functions from the BVAR posterior.
            - Performs a full suite of in-sample and out-of-sample validation tests.
        7.  **Finalization**: Compiles all intermediate data, final results, and
            validation reports into a single, comprehensive output dictionary,
            and handles any exceptions gracefully.

    Args:
        equity_tick_df (pd.DataFrame): Raw high-frequency equity tick data.
        rate_tick_df (pd.DataFrame): Raw high-frequency interest rate tick data.
        macro_df (pd.DataFrame): Raw long-format macroeconomic time series data.
        announcement_df (pd.DataFrame): Raw central bank announcement metadata.
        target_market (str): The market to study (e.g., 'CAN').
        study_config (Dict[str, Any]): The main study configuration dictionary.

    Returns:
        Dict[str, Any]: A comprehensive, nested dictionary containing all inputs,
                        intermediate data products, final results, and validation
                        reports generated during the pipeline run.
    """
    # --- 1. Setup: Logging and Master Results Dictionary ---
    # Configure basic logging to provide a detailed execution trace for the user.
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    # Initialize the master dictionary to store all artifacts of the run.
    # This structure ensures a clean, organized, and auditable output.
    master_results: Dict[str, Any] = {
        "metadata": {"run_start_utc": pd.Timestamp.now(tz='UTC'), "target_market": target_market},
        "inputs": {"config": study_config}, "phase_1_cleaning": {}, "phase_2_identification": {},
        "phase_3_model_prep": {}, "phase_4_estimation": {}, "phase_5_results": {}
    }

    try:
        # Record the start time for performance monitoring.
        start_time = time.time()
        logging.info("=== STARTING END-TO-END RESEARCH PIPELINE ===")

        # --- 2. Phase I: Data Ingestion & Cleaning ---
        logging.info("--- Phase 1: Validating, Assessing, and Cleaning Data ---")

        # Task 1: Validate all inputs, schemas, and configurations. This is the first quality gate.
        validation_reports = run_phase1_task1_validation(
            equity_tick_df, rate_tick_df, macro_df, announcement_df, target_market, study_config
        )
        master_results['phase_1_cleaning']['validation_reports'] = validation_reports
        logging.info("Step 1.1: All inputs and configurations validated successfully.")

        # Task 2: Perform deep data quality assessment to detect anomalies.
        quality_reports = run_phase1_task2_quality_assessment(
            equity_tick_df, rate_tick_df, announcement_df, macro_df
        )
        master_results['phase_1_cleaning']['quality_assessment_reports'] = quality_reports
        logging.info("Step 1.2: Data quality assessment complete.")

        # Task 3: Cleanse and preprocess all data sources based on the validation and QA.
        preprocessed_data = run_phase1_task3_preprocessing(
            raw_equity_df=equity_tick_df, raw_rate_df=rate_tick_df, raw_announcement_df=announcement_df,
            raw_macro_df=macro_df, equity_anomaly_flags=quality_reports['equity_anomaly_flags'],
            rate_anomaly_flags=quality_reports['rate_anomaly_flags'], target_market=target_market
        )
        master_results['phase_1_cleaning']['cleaned_data'] = preprocessed_data
        logging.info("Step 1.3: Data cleansing and initial preparation complete.")

        # --- 3. Phase II: High-Frequency Shock Identification ---
        logging.info("--- Phase 2: Identifying Structural Shocks ---")

        # Task 4: Define event windows and extract prices from the clean tick data.
        price_extraction = run_phase2_task4_data_extraction(
            preprocessed_data['clean_announcement_df'],
            preprocessed_data['clean_equity_df'],
            preprocessed_data['clean_rate_df'],
            study_config
        )
        master_results['phase_2_identification']['price_extraction'] = price_extraction
        logging.info("Step 2.1: Event windows defined and prices extracted.")

        # Task 5: Calculate raw surprises from the extracted prices.
        surprises = run_phase2_task5_surprise_calculation(
            price_extraction['equity_price_extraction_results'],
            price_extraction['rate_price_extraction_results'],
            preprocessed_data['clean_announcement_df']
        )
        master_results['phase_2_identification']['raw_surprises'] = surprises
        logging.info("Step 2.2: Raw surprises calculated.")

        # Task 6: Perform rotational decomposition to get structural shocks.
        structural_shocks = run_phase2_task6_shock_identification(
            surprises['surprise_vectors'], study_config
        )
        master_results['phase_2_identification']['structural_shocks'] = structural_shocks
        logging.info("Step 2.3: Structural shocks identified successfully.")

        # --- 4. Phase III: Model Preparation (Refactored Logic) ---
        logging.info("--- Phase 3: Transforming and Assembling Data for Monthly Models ---")

        # Task 7: Aggregate the event-level shocks to a monthly frequency.
        monthly_data = run_phase3_task7_temporal_aggregation(
            structural_shocks, preprocessed_data['clean_announcement_df']
        )
        master_results['phase_3_model_prep']['monthly_shocks'] = monthly_data
        logging.info("Step 3.1: Shocks aggregated to monthly frequency.")

        # Task 8.1: Apply model-specific transformations to the clean macro data.
        transformation_map = {var.replace(f"{target_market}_", ""): ('log' if 'log' in var else 'level') for var in study_config['bvar_specification']['endogenous_variables']}
        transformed_macro_df = transform_core_macro_variables(
            prepared_macro_df=preprocessed_data['prepared_macro_df_levels'],
            transformation_map=transformation_map,
            target_market=target_market
        )
        master_results['phase_3_model_prep']['transformed_macro_df'] = transformed_macro_df
        logging.info("Step 3.2: Macroeconomic variables transformed.")

        # Task 8.2: Construct deterministic control variables.
        control_variables_df = construct_control_variables(
            time_index=transformed_macro_df.index,
            config=study_config
        )
        master_results['phase_3_model_prep']['control_variables_df'] = control_variables_df
        logging.info("Step 3.3: Control variables constructed.")

        # Task 8.3: Assemble the final, analysis-ready dataset.
        analysis_ready_df = assemble_final_dataset(
            transformed_macro_df=transformed_macro_df,
            monthly_shocks_df=monthly_data['monthly_shocks_df'],
            control_variables_df=control_variables_df
        )
        master_results['phase_3_model_prep']['analysis_ready_df'] = analysis_ready_df
        logging.info("Step 3.4: Final analysis-ready dataset assembled.")

        # --- 5. Phase IV: Econometric Estimation ---
        logging.info("--- Phase 4: Estimating Econometric Models ---")

        # Task 9: Set up the BVAR model matrices and priors.
        bvar_setup = run_phase4_task9_bvar_setup(analysis_ready_df, study_config)
        master_results['phase_4_estimation']['bvar_setup'] = bvar_setup
        logging.info("Step 4.1: BVAR model specified.")

        # Task 10: Run the Gibbs sampler to estimate the BVAR.
        bvar_posterior_draws = run_bvar_gibbs_sampler(bvar_setup, study_config)
        master_results['phase_4_estimation']['bvar_posterior_draws'] = bvar_posterior_draws
        logging.info("Step 4.2: BVAR estimated.")

        # Task 11: Estimate the Local Projections model for robustness.
        lp_results = run_phase4_task11_local_projections(analysis_ready_df, study_config)
        master_results['phase_4_estimation']['lp_results'] = lp_results
        logging.info("Step 4.3: Local Projections estimated.")

        # --- 6. Phase V: Results Generation and Validation ---
        logging.info("--- Phase 5: Generating Results and Validating Models ---")

        # Task 12: Calculate BVAR-based impulse responses from the posterior draws.
        bvar_irfs = calculate_var_impulse_responses(bvar_posterior_draws, bvar_setup, study_config)
        master_results['phase_5_results']['bvar_irfs'] = bvar_irfs
        logging.info("Step 5.1: BVAR impulse responses calculated.")

        # Task 14: Run the full suite of model validation checks.
        model_validation_suite = run_full_model_validation_suite(analysis_ready_df, study_config)
        master_results['phase_5_results']['model_validation_reports'] = model_validation_suite
        logging.info("Step 5.2: Model validation suite complete.")

        # --- 7. Finalization ---
        # Record the end time and total runtime.
        end_time = time.time()
        master_results['metadata']['run_end_utc'] = pd.Timestamp.now(tz='UTC')
        master_results['metadata']['total_runtime_seconds'] = round(end_time - start_time, 2)
        master_results['metadata']['final_status'] = "SUCCESS"
        logging.info(f"=== PIPELINE COMPLETED SUCCESSFULLY in {master_results['metadata']['total_runtime_seconds']} seconds. ===")

    except (Exception) as e:
        # A top-level exception handler to ensure the pipeline fails gracefully
        # and provides a clear error message in the final results object.
        end_time = time.time()
        logging.error(f"!!! PIPELINE FAILED: {type(e).__name__} - {e}", exc_info=True)
        master_results['metadata']['run_end_utc'] = pd.Timestamp.now(tz='UTC')
        master_results['metadata']['total_runtime_seconds'] = round(end_time - start_time, 2)
        master_results['metadata']['final_status'] = "FAIL"
        master_results['metadata']['error_message'] = str(e)

    # Return the final, comprehensive results object.
    return master_results


In [None]:
# Task 16: Identification Robustness Analysis

# =============================================================================
# TASK 16, STEP 1: POOR MAN'S SIGN RESTRICTION IMPLEMENTATION
# =============================================================================

def identify_shocks_pmsr(
    raw_surprises_df: pd.DataFrame
) -> Dict[str, pd.DataFrame]:
    """
    Identifies structural shocks using the Poor Man's Sign Restriction (PMSR).

    Purpose:
        This function provides an alternative, simpler identification scheme for
        robustness checks. It classifies each announcement as either a "monetary
        policy" or "information" event based on the sign of the correlation
        between the raw interest rate and equity surprises.

    Process:
        1.  **Group by Event**: The function operates on each announcement event individually.
        2.  **Check Correlation Sign**: For each event, it checks if the product
            of the rate and equity surprise is negative (`s_rate * s_equity < 0`).
            This is a proxy for the required negative co-movement of a pure
            monetary policy shock.
        3.  **Assign Shocks**:
            - If the co-movement is negative, the event is classified as a
              monetary policy shock. The `shock_MP` is set to the raw rate
              surprise, and `shock_INFO` is set to zero.
            - If the co-movement is non-negative, the event is classified as an
              information shock. The `shock_MP` is set to zero, and `shock_INFO`
              is set to the raw rate surprise.
        4.  **Normalize**: The resulting shock series for each central bank are
            grouped and normalized to have a unit standard deviation, ensuring
            comparability with the benchmark rotational decomposition shocks.

    Args:
        raw_surprises_df (pd.DataFrame): A DataFrame containing the raw surprises,
            indexed by 'event_id', with columns 'rate_surprise_bps',
            'equity_surprise_pct', and 'central_bank'.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary where keys are central bank names
        and values are DataFrames containing the identified and normalized
        structural shocks, indexed by 'event_id'.
    """
    # --- Input Validation ---
    if not all(col in raw_surprises_df.columns for col in ['rate_surprise_bps', 'equity_surprise_pct', 'central_bank']):
        raise ValueError("Input DataFrame is missing required columns.")

    # --- 1. & 2. Classify Events based on Correlation Sign ---
    # A negative product of surprises indicates negative co-movement.
    is_mp_shock = (raw_surprises_df['rate_surprise_bps'] * raw_surprises_df['equity_surprise_pct']) < 0

    # --- 3. Assign Shocks ---
    # Create columns for the raw structural shocks.
    shocks_df = pd.DataFrame(index=raw_surprises_df.index)

    # Assign the rate surprise to the MP shock column if it's an MP event, else 0.
    shocks_df['shock_MP'] = raw_surprises_df['rate_surprise_bps'].where(is_mp_shock, 0.0)

    # Assign the rate surprise to the INFO shock column if it's an INFO event, else 0.
    shocks_df['shock_INFO'] = raw_surprises_df['rate_surprise_bps'].where(~is_mp_shock, 0.0)

    # Add the central bank column for grouping.
    shocks_df['central_bank'] = raw_surprises_df['central_bank']

    # --- 4. Normalize Shocks per Central Bank ---
    # Calculate the standard deviation for each shock type within each central bank group.
    # Use .transform() to broadcast the std dev back to the original shape for division.
    std_devs = shocks_df.groupby('central_bank')[['shock_MP', 'shock_INFO']].transform('std')

    # Avoid division by zero if a shock series has zero variance.
    std_devs[std_devs < 1e-9] = 1.0

    # Normalize the shock series.
    shocks_df['shock_MP'] /= std_devs['shock_MP']
    shocks_df['shock_INFO'] /= std_devs['shock_INFO']

    # --- 5. Structure Output ---
    # Split the final DataFrame into a dictionary keyed by central bank.
    pmsr_results = {}
    for bank, group in shocks_df.groupby('central_bank'):
        # The output format must match the benchmark identification function's output.
        pmsr_results[bank] = group[['shock_MP', 'shock_INFO']].rename(
            columns={'shock_MP': f'shock_{bank}_MP', 'shock_INFO': f'shock_{bank}_INFO'}
        )

    return pmsr_results


# =============================================================================
# REFACTORED CORE PIPELINE LOGIC
# =============================================================================

def _run_core_pipeline(
    structural_shocks: Dict[str, pd.DataFrame],
    preprocessed_data: Dict[str, Any],
    analysis_ready_df_no_shocks: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Internal function that runs the main analysis pipeline from shock aggregation onwards.
    This function is designed to be called by different orchestrators.
    """
    # This dictionary will store the results of this specific pipeline run.
    results = {}

    # --- Phase III (cont.): Model Preparation ---
    logging.info("--- (Core Pipeline) Phase 3: Aggregating Data for Monthly Models ---")
    monthly_data = run_phase3_task7_temporal_aggregation(
        structural_shocks, preprocessed_data['clean_announcement_df']
    )
    results['monthly_shocks'] = monthly_data

    # The original `analysis_ready_df` is passed in without shocks, so we join them here.
    analysis_ready_df = analysis_ready_df_no_shocks.join(monthly_data['monthly_shocks_df'], how='inner')
    results['analysis_ready_df'] = analysis_ready_df
    logging.info("--- (Core Pipeline) Final analysis-ready dataset assembled. ---")

    # --- Phase IV: Econometric Estimation ---
    logging.info("--- (Core Pipeline) Phase 4: Estimating Econometric Models ---")
    bvar_setup = run_phase4_task9_bvar_setup(analysis_ready_df, config)
    results['bvar_setup'] = bvar_setup
    bvar_posterior_draws = run_bvar_gibbs_sampler(bvar_setup, config)
    results['bvar_posterior_draws'] = bvar_posterior_draws

    lp_results = run_phase4_task11_local_projections(analysis_ready_df, config)
    results['lp_results'] = lp_results
    logging.info("--- (Core Pipeline) Models estimated. ---")

    # --- Phase V: Results Generation and Validation ---
    logging.info("--- (Core Pipeline) Phase 5: Generating Results and Validating Models ---")
    bvar_irfs = calculate_var_impulse_responses(bvar_posterior_draws, bvar_setup, config)
    results['bvar_irfs'] = bvar_irfs

    model_validation_suite = run_full_model_validation_suite(analysis_ready_df, config)
    results['model_validation_reports'] = model_validation_suite
    logging.info("--- (Core Pipeline) Results generated and validated. ---")

    return results


# =============================================================================
# ORCHESTRATOR FOR THE ROBUSTNESS CHECK
# =============================================================================

def run_pmsr_robustness_analysis(
    master_results_benchmark: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Runs the full analysis pipeline using the PMSR identification method.

    Purpose:
        This function serves as an orchestrator for the "Poor Man's Sign
        Restriction" robustness check. It takes the results of a benchmark
        run, extracts the necessary early-stage data, re-identifies shocks
        using the PMSR method, and then runs the entire downstream analysis
        pipeline on these new shocks for comparison.

    Process:
        1.  **Extract Data**: It retrieves the raw surprises and other essential
            preprocessed data from a completed benchmark run's results object.
        2.  **PMSR Identification**: It calls the `identify_shocks_pmsr` function
            to generate the alternative set of structural shocks.
        3.  **Execute Core Pipeline**: It calls the internal `_run_core_pipeline`
            function, passing in the new PMSR shocks. This ensures that the
            exact same aggregation, estimation, and validation logic is applied
            as in the benchmark run.
        4.  **Return Results**: It returns a new, complete results dictionary
            for the PMSR robustness check.

    Args:
        master_results_benchmark (Dict[str, Any]): The full results dictionary
            from a completed run of the main `run_transatlantic_spillovers_analysis`
            pipeline.

    Returns:
        Dict[str, Any]: A comprehensive results dictionary for the PMSR analysis.
    """
    logging.info("=== STARTING PMSR ROBUSTNESS ANALYSIS PIPELINE ===")

    # --- 1. Extract necessary data from the benchmark run ---
    # This avoids re-running all the initial data cleaning steps.
    try:
        # Combine raw surprises into the required DataFrame format.
        raw_surprises_df = pd.concat([
            master_results_benchmark['phase_2_identification']['raw_surprises']['intermediate_results']['rate_surprises_raw'],
            master_results_benchmark['phase_2_identification']['raw_surprises']['intermediate_results']['equity_surprises_raw']
        ], axis=1).dropna()

        # Add the central bank column for grouping.
        ann_df = master_results_benchmark['phase_1_cleaning']['cleaned_data']['clean_announcement_df']
        raw_surprises_df = raw_surprises_df.join(ann_df.set_index('event_id')['central_bank'])

        preprocessed_data = master_results_benchmark['phase_1_cleaning']['cleaned_data']

        # We need the analysis-ready data *before* the shocks were added.
        analysis_ready_df_no_shocks = master_results_benchmark['phase_3_model_prep']['analysis_ready_df'].drop(
            columns=[c for c in master_results_benchmark['phase_3_model_prep']['analysis_ready_df'].columns if 'shock' in c]
        )

        config = master_results_benchmark['inputs']['config']
    except KeyError as e:
        raise ValueError(f"The benchmark results object is missing a required key: {e}")

    # --- 2. PMSR Identification ---
    # Generate the alternative structural shocks using the PMSR method.
    logging.info("--- (PMSR) Identifying shocks using Poor Man's Sign Restriction... ---")
    pmsr_shocks = identify_shocks_pmsr(raw_surprises_df)

    # --- 3. Execute Core Pipeline with PMSR shocks ---
    # Call the refactored core logic with the new shocks.
    pmsr_pipeline_results = _run_core_pipeline(
        structural_shocks=pmsr_shocks,
        preprocessed_data=preprocessed_data,
        analysis_ready_df_no_shocks=analysis_ready_df_no_shocks,
        config=config
    )

    # --- 4. Compile and Return Final Results ---
    # Add the identification-specific results to the main dictionary.
    pmsr_pipeline_results['pmsr_identified_shocks'] = pmsr_shocks
    logging.info("=== PMSR ROBUSTNESS ANALYSIS COMPLETED SUCCESSFULLY ===")

    return pmsr_pipeline_results


# =============================================================================
# TASK 16, STEP 2: ROTATIONAL UNCERTAINTY ANALYSIS
# =============================================================================

def _run_pipeline_for_single_angle(
    angle_draw: float,
    bank_name: str,
    benchmark_results: Dict[str, Any]
) -> np.ndarray:
    """
    Worker function to run the analysis pipeline for one sampled rotation angle.

    This function is designed to be called in parallel. It takes a single angle,
    generates the corresponding structural shocks, and runs the entire downstream
    BVAR estimation and IRF calculation, returning all posterior draws of the IRFs.

    Args:
        angle_draw (float): A single rotation angle sampled from the admissible set.
        bank_name (str): The name of the central bank ('FED' or 'ECB').
        benchmark_results (Dict[str, Any]): The full results object from the
            initial benchmark run, used to access necessary data and configs.

    Returns:
        np.ndarray: A (num_mcmc_draws, H+1, K, S) array of all IRF posterior
                    draws corresponding to this single identification angle.
    """
    # --- 1. Extract necessary data from the benchmark run ---
    # This avoids passing large DataFrames to each parallel worker.
    config = benchmark_results['inputs']['config']
    raw_surprises_df = pd.concat([
        benchmark_results['phase_2_identification']['raw_surprises']['intermediate_results']['rate_surprises_raw'],
        benchmark_results['phase_2_identification']['raw_surprises']['intermediate_results']['equity_surprises_raw']
    ], axis=1).dropna()
    preprocessed_data = benchmark_results['phase_1_cleaning']['cleaned_data']
    analysis_ready_df_no_shocks = benchmark_results['phase_3_model_prep']['analysis_ready_df'].drop(
        columns=[c for c in benchmark_results['phase_3_model_prep']['analysis_ready_df'].columns if 'shock' in c]
    )

    # --- 2. Generate Shocks for this Specific Angle ---
    # We need to manually apply the rotation, normalization, and sign convention.
    q_matrix = np.array([
        [np.cos(angle_draw), -np.sin(angle_draw)],
        [np.sin(angle_draw), np.cos(angle_draw)]
    ])

    surprises_array = raw_surprises_df.loc[raw_surprises_df['central_bank'] == bank_name, ['rate_surprise_bps', 'equity_surprise_pct']].values
    raw_shocks = (q_matrix @ surprises_array.T).T

    # Enforce sign convention
    if np.corrcoef(raw_shocks[:, 0], surprises_array[:, 0])[0, 1] < 0:
        raw_shocks *= -1

    # Normalize
    std_devs = raw_shocks.std(axis=0)
    safe_std_devs = np.where(std_devs > 1e-9, std_devs, 1.0)
    normalized_shocks = raw_shocks / safe_std_devs

    # Create the shock DataFrame in the required format (indexed by event_id).
    event_ids = raw_surprises_df[raw_surprises_df['central_bank'] == bank_name].index
    shock_df = pd.DataFrame(
        normalized_shocks,
        index=event_ids,
        columns=[f'shock_{bank_name}_MP', f'shock_{bank_name}_INFO']
    )

    # The input to the core pipeline is a dictionary.
    # We assume the other bank's shocks are taken from the benchmark median run for simplicity.
    other_bank = 'ECB' if bank_name == 'FED' else 'FED'
    structural_shocks_for_run = {
        bank_name: shock_df,
        other_bank: benchmark_results['phase_2_identification']['structural_shocks'][other_bank]
    }

    # --- 3. Run the Core Pipeline ---
    # This re-estimates the BVAR and calculates IRFs for this specific shock series.
    pipeline_results = _run_core_pipeline(
        structural_shocks=structural_shocks_for_run,
        preprocessed_data=preprocessed_data,
        analysis_ready_df_no_shocks=analysis_ready_df_no_shocks,
        config=config
    )

    # --- 4. Return All IRF Draws ---
    # We need the full posterior distribution of IRFs for this angle.
    return pipeline_results['bvar_irfs']['all_irf_draws']


def run_rotational_uncertainty_analysis(
    benchmark_results: Dict[str, Any],
    num_angle_draws: int = 999,
    n_jobs: int = -1
) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Performs a robustness check on identification uncertainty.

    Purpose:
        This function quantifies the uncertainty in the final impulse responses
        that arises from the choice of a specific rotation angle for identification.
        It integrates over the set of all plausible identification schemes to
        produce credible intervals that account for both identification and
        parameter estimation uncertainty.

    Process:
        1.  **Extract Admissible Angles**: It retrieves the set of all admissible
            rotation angles that were identified in the benchmark model run.
        2.  **Sample Angles**: It draws a large number of angles (e.g., 999)
            uniformly with replacement from this admissible set.
        3.  **Parallel Re-estimation**: For each sampled angle, it runs the
            entire downstream analysis pipeline (shock generation, aggregation,
            BVAR estimation, IRF calculation) in parallel using `joblib`. Each
            parallel job returns the full set of posterior draws of the IRFs
            for its specific angle.
        4.  **Aggregate All Draws**: It collects the IRF draws from all parallel
            runs into a single, massive array. This array now represents the
            joint distribution of IRFs over both parameter uncertainty and
            identification uncertainty.
        5.  **Summarize Total Uncertainty**: It calculates the median and
            percentiles (e.g., 5th and 95th) of this combined set of draws to
            produce the final point estimates and credible intervals that
            incorporate total uncertainty.

    Args:
        benchmark_results (Dict[str, Any]): The full results dictionary from a
            completed run of the main pipeline.
        num_angle_draws (int): The number of rotation angles to sample for the
                               uncertainty analysis.
        n_jobs (int): The number of CPU/GPU cores to use for parallelization.
                      -1 means use all available cores.

    Returns:
        Dict[str, Dict[str, np.ndarray]]: A dictionary where keys are central
        bank names. Each inner dictionary contains the summary of the total
        uncertainty IRFs ('irf_median', 'irf_lower', 'irf_upper').
    """
    logging.info("=== STARTING ROTATIONAL UNCERTAINTY ANALYSIS PIPELINE ===")

    # --- 1. & 2. Extract and Sample Admissible Angles ---
    final_results = {}
    config = benchmark_results['inputs']['config']
    ci_level = config['computation_settings']['impulse_response_functions']['credible_interval_level']

    for bank in ['FED', 'ECB']:
        logging.info(f"--- Processing uncertainty for {bank} ---")
        admissible_angles = benchmark_results['phase_2_identification']['structural_shocks'][bank]['admissible_angles']

        if len(admissible_angles) == 0:
            logging.warning(f"No admissible angles found for {bank}. Skipping uncertainty analysis.")
            continue

        # Set seed for reproducible sampling of angles.
        np.random.seed(config['computation_settings']['mcmc_sampler']['random_seed'])
        angle_samples = np.random.choice(admissible_angles, size=num_angle_draws, replace=True)

        # --- 3. Parallel Re-estimation ---
        # Use joblib to run the pipeline for each angle in parallel.
        all_irf_draws_list = Parallel(n_jobs=n_jobs)(
            delayed(_run_pipeline_for_single_angle)(
                angle, bank, benchmark_results
            ) for angle in tqdm(angle_samples, desc=f"Simulating IRFs for {bank}")
        )

        # --- 4. Aggregate All Draws ---
        # Concatenate the list of (num_mcmc, H, K, S) arrays into one large array.
        # Shape: (num_angle_draws * num_mcmc_draws, H, K, S)
        total_irf_draws = np.concatenate(all_irf_draws_list, axis=0)

        # --- 5. Summarize Total Uncertainty ---
        lower_p = (1 - ci_level) / 2 * 100
        upper_p = (1 - (1 - ci_level) / 2) * 100

        # Calculate median and percentiles across the combined 'draws' axis (axis=0).
        irf_median = np.median(total_irf_draws, axis=0)
        irf_lower = np.percentile(total_irf_draws, lower_p, axis=0)
        irf_upper = np.percentile(total_irf_draws, upper_p, axis=0)

        # Reshape for standard output format (K, S, H+1)
        final_shape_order = (1, 2, 0)
        final_results[bank] = {
            'irf_median': irf_median.transpose(final_shape_order),
            'irf_lower': irf_lower.transpose(final_shape_order),
            'irf_upper': irf_upper.transpose(final_shape_order)
        }
        logging.info(f"--- Uncertainty analysis for {bank} complete. ---")

    logging.info("=== ROTATIONAL UNCERTAINTY ANALYSIS COMPLETED SUCCESSFULLY ===")
    return final_results



# =============================================================================
# TASK 16, STEP 3: ALTERNATIVE SAMPLE PERIOD ANALYSIS
# =============================================================================

def run_alternative_sample_analysis(
    raw_equity_df: pd.DataFrame,
    raw_rate_df: pd.DataFrame,
    raw_macro_df: pd.DataFrame,
    raw_announcement_df: pd.DataFrame,
    target_market: str,
    config: Dict[str, Any],
    sample_definitions: Dict[str, Dict[str, str]]
) -> Dict[str, Dict[str, Any]]:
    """
    Performs a robustness check by re-running the entire analysis on alternative sample periods.

    Purpose:
        This function assesses the stability of the model's findings over time
        by systematically re-estimating the entire pipeline on different sub-samples
        of the data. This is a critical test for parameter constancy and helps
        determine if the results are driven by specific economic regimes (e.g.,
        the post-2008 crisis period or the COVID-19 era).

    Process:
        1.  **Iterate Through Samples**: The function loops through a dictionary
            of sample period definitions. Each definition provides a name (e.g.,
            'pre_covid') and an end date.
        2.  **Truncate All Data**: For each defined sample period, it creates
            a complete, new set of input data by filtering all four raw DataFrames
            (tick data, announcements, macro data) to include only observations
            up to the specified end date. This is a critical step to ensure the
            entire analysis, including the shock identification, is performed
            consistently on the truncated sample.
        3.  **Execute Full Pipeline**: It calls the main master orchestrator,
            `run_transatlantic_spillovers_analysis`, passing in the truncated
            set of data. This re-runs the entire end-to-end analysis from
            scratch for the sub-sample.
        4.  **Store Results**: The complete results object returned by the master
            orchestrator for each sub-sample run is stored in a dictionary,
            keyed by the sample period's name.

    Args:
        raw_equity_df (pd.DataFrame): The full raw high-frequency equity data.
        raw_rate_df (pd.DataFrame): The full raw high-frequency rate data.
        raw_macro_df (pd.DataFrame): The full raw macroeconomic time series data.
        raw_announcement_df (pd.DataFrame): The full raw announcement metadata.
        target_market (str): The market to study (e.g., 'CAN').
        config (Dict[str, Any]): The main study configuration dictionary.
        sample_definitions (Dict[str, Dict[str, str]]): A dictionary defining
            the sample periods to analyze. Example:
            {'pre_covid': {'end_date': '2020-02-29'},
             'pre_gfc': {'end_date': '2007-08-31'}}

    Returns:
        Dict[str, Dict[str, Any]]: A dictionary where keys are the names of the
        sample periods. Each value is the complete master results object from
        running the full pipeline on that data sub-sample.
    """
    # --- Initialization ---
    # This dictionary will store the full results object for each sample run.
    all_sample_results: Dict[str, Dict[str, Any]] = {}

    # --- Main Loop: Iterate through each defined sample period ---
    for sample_name, definition in sample_definitions.items():
        logging.info(f"=== STARTING ANALYSIS FOR SAMPLE PERIOD: '{sample_name}' (ends {definition['end_date']}) ===")

        # --- 1. Truncate All Raw Data Sources ---
        # Convert the end date string to a timestamp for comparison.
        end_date = pd.to_datetime(definition['end_date'])

        # Filter the high-frequency equity data.
        equity_df_sample = raw_equity_df[raw_equity_df['timestamp_micros_utc'].dt.date <= end_date.date()].copy()

        # Filter the high-frequency rate data.
        rate_df_sample = raw_rate_df[raw_rate_df['timestamp_micros_utc'].dt.date <= end_date.date()].copy()

        # Filter the announcement metadata.
        announcement_df_sample = raw_announcement_df[raw_announcement_df['announcement_date_local'] <= end_date].copy()

        # Filter the macroeconomic data.
        macro_df_sample = raw_macro_df[raw_macro_df['date'] <= end_date].copy()

        # --- 2. Execute the Full End-to-End Pipeline on the Truncated Data ---
        # A deep copy of the config is used to prevent any potential side effects
        # if the pipeline were to modify it in place (which it shouldn't).
        run_config = copy.deepcopy(config)

        # Call the main master orchestrator with the filtered datasets.
        sample_result_object = run_transatlantic_spillovers_analysis(
            equity_tick_df=equity_df_sample,
            rate_tick_df=rate_df_sample,
            macro_df=macro_df_sample,
            announcement_df=announcement_df_sample,
            target_market=target_market,
            study_config=run_config
        )

        # --- 3. Store the Results ---
        # Store the entire results object for this sample period.
        all_sample_results[sample_name] = sample_result_object

        logging.info(f"=== COMPLETED ANALYSIS FOR SAMPLE PERIOD: '{sample_name}' ===")

    return all_sample_results


# =============================================================================
# TASK 16: IDENTIFICATION ROBUSTNESS ANALYSIS ORCHESTRATOR
# =============================================================================

def run_identification_robustness_suite(
    benchmark_results: Dict[str, Any],
    raw_data: Dict[str, pd.DataFrame]
) -> Dict[str, Any]:
    """
    Orchestrates a full suite of robustness checks for the shock identification.

    Purpose:
        This master orchestrator for Task 16 executes a series of analyses to
        test the robustness of the main findings to alternative identification
        schemes and sample periods. It serves as a comprehensive validation of
        the core econometric identification strategy.

    Process:
        The function sequentially executes the three main robustness checks,
        using the results and data from the initial benchmark run as a starting point.
        1.  **Poor Man's Sign Restriction (PMSR) Analysis**:
            - It calls `run_pmsr_robustness_analysis`, which re-identifies shocks
              using the simpler PMSR rule and re-runs the entire downstream
              pipeline for comparison.
        2.  **Rotational Uncertainty Analysis**:
            - It calls `run_rotational_uncertainty_analysis`, which samples from
              the set of all admissible rotation angles identified in the
              benchmark run.
            - For each sampled angle, it re-runs the pipeline in parallel to
              compute credible intervals that incorporate both estimation and
              identification uncertainty.
        3.  **Alternative Sample Period Analysis**:
            - It calls `run_alternative_sample_analysis`, which systematically
              truncates the original raw datasets to different time periods
              (e.g., pre-GFC, pre-COVID) and re-runs the entire end-to-end
              pipeline for each sub-sample.

    Args:
        benchmark_results (Dict[str, Any]): The complete, comprehensive results
            object from a successful run of the main pipeline orchestrator
            (`run_transatlantic_spillovers_analysis`). This object contains all
            necessary intermediate data products (like raw surprises and
            admissible angles).
        raw_data (Dict[str, pd.DataFrame]): A dictionary containing the original,
            unfiltered raw DataFrames ('equity_tick_df', 'rate_tick_df',
            'macro_df', 'announcement_df'). This is required for the sample
            period analysis.

    Returns:
        Dict[str, Any]: A dictionary containing the detailed results from each
                        of the three robustness analyses.
    """
    # --- 0. Initialization ---
    # Initialize a dictionary to store the results of all robustness checks.
    robustness_suite_results: Dict[str, Any] = {}

    # Extract the main configuration from the benchmark results for reuse.
    config = benchmark_results['inputs']['config']
    target_market = benchmark_results['metadata']['target_market']

    logging.info("====== STARTING IDENTIFICATION ROBUSTNESS SUITE ======")

    # --- 1. Step 1: Poor Man's Sign Restriction (PMSR) Analysis ---
    # This check tests sensitivity to a simpler, alternative identification rule.
    logging.info("--- Task 16.1: Running Poor Man's Sign Restriction (PMSR) Analysis ---")
    try:
        # Execute the PMSR-specific orchestrator.
        pmsr_results = run_pmsr_robustness_analysis(
            master_results_benchmark=benchmark_results
        )
        # Store the complete results object from this run.
        robustness_suite_results['pmsr_analysis'] = pmsr_results
        logging.info("--- PMSR Analysis completed successfully. ---")
    except Exception as e:
        # Log any failure in this specific check but allow the suite to continue.
        logging.error(f"!!! PMSR Analysis failed: {e}", exc_info=True)
        robustness_suite_results['pmsr_analysis'] = {"status": "FAIL", "error": str(e)}

    # --- 2. Step 2: Rotational Uncertainty Analysis ---
    # This check quantifies the uncertainty arising from the choice of rotation angle.
    logging.info("--- Task 16.2: Running Rotational Uncertainty Analysis ---")
    try:
        # Execute the rotational uncertainty orchestrator.
        rotational_uncertainty_results = run_rotational_uncertainty_analysis(
            benchmark_results=benchmark_results,
            # These parameters could be moved to the config file for more flexibility.
            num_angle_draws=config['computation_settings']['robustness_checks']['identification_uncertainty_draws'],
            n_jobs=-1 # Use all available cores.
        )
        # Store the results (the new, wider credible bands).
        robustness_suite_results['rotational_uncertainty_analysis'] = rotational_uncertainty_results
        logging.info("--- Rotational Uncertainty Analysis completed successfully. ---")
    except Exception as e:
        logging.error(f"!!! Rotational Uncertainty Analysis failed: {e}", exc_info=True)
        robustness_suite_results['rotational_uncertainty_analysis'] = {"status": "FAIL", "error": str(e)}

    # --- 3. Step 3: Alternative Sample Period Analysis ---
    # This check tests the stability of the results over different time periods.
    logging.info("--- Task 16.3: Running Alternative Sample Period Analysis ---")
    try:
        # Define the sample periods for the analysis.
        sample_definitions = {
            'pre_covid': {'end_date': config['computation_settings']['robustness_checks']['truncated_sample_end_date']},
            'pre_gfc': {'end_date': '2007-08-31'} # A standard pre-GFC cutoff
        }

        # Execute the sample period analysis orchestrator.
        sample_period_results = run_alternative_sample_analysis(
            raw_equity_df=raw_data['equity_tick_df'],
            raw_rate_df=raw_data['rate_tick_df'],
            raw_macro_df=raw_data['macro_df'],
            raw_announcement_df=raw_data['announcement_df'],
            target_market=target_market,
            config=config,
            sample_definitions=sample_definitions
        )
        # Store the results, which will be a dictionary of full result objects.
        robustness_suite_results['sample_period_analysis'] = sample_period_results
        logging.info("--- Alternative Sample Period Analysis completed successfully. ---")
    except Exception as e:
        logging.error(f"!!! Alternative Sample Period Analysis failed: {e}", exc_info=True)
        robustness_suite_results['sample_period_analysis'] = {"status": "FAIL", "error": str(e)}

    logging.info("====== IDENTIFICATION ROBUSTNESS SUITE COMPLETE ======")

    # Return the dictionary containing all robustness check results.
    return robustness_suite_results


In [None]:
# Task 17: Estimation Method Robustness

# =============================================================================
# TASK 17, STEP 1: PRIOR SENSITIVITY ANALYSIS
# =============================================================================

def run_prior_sensitivity_analysis(
    analysis_ready_df: pd.DataFrame,
    config: Dict[str, Any],
    prior_scenarios: Dict[str, Dict[str, float]]
) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Performs a sensitivity analysis on the BVAR model's prior hyperparameters.

    Purpose:
        This function systematically assesses the robustness of the main impulse
        response findings to changes in the BVAR prior specification. It re-
        estimates the BVAR model under various alternative hyperparameter
        settings and stores the resulting IRFs for comparison.

    Process:
        1.  **Iterate Through Scenarios**: The function loops through a dictionary
            of pre-defined prior scenarios. Each scenario has a descriptive name
            (e.g., 'tighter_prior') and a dictionary of hyperparameter values
            to override in the main configuration.
        2.  **Modify Configuration**: For each scenario, it creates a deep copy
            of the main study configuration to ensure that runs are independent
            and do not have side effects. It then updates the relevant prior
            hyperparameters in this temporary configuration.
        3.  **Execute BVAR Pipeline**: It executes the core BVAR estimation and
            impulse response calculation pipeline (Tasks 9, 10, and 12) using
            the modified configuration for the current scenario.
        4.  **Store Results**: The complete impulse response function object
            (containing the median, lower, and upper credible bands) for each
            scenario is stored in a master dictionary, keyed by the scenario name.

    Args:
        analysis_ready_df (pd.DataFrame): The final, complete dataset ready for
                                          modeling.
        config (Dict[str, Any]): The main study configuration dictionary, which
                                 serves as the baseline.
        prior_scenarios (Dict[str, Dict[str, float]]): A dictionary defining
            the sensitivity analyses to run. Keys are descriptive scenario names.
            Values are dictionaries of hyperparameter names and their alternative
            values to be tested. Example:
            {'tighter_prior': {'lambda_1_overall_tightness': 0.05}}

    Returns:
        Dict[str, Dict[str, np.ndarray]]: A dictionary where keys are the
        scenario names. Each value is the full impulse response results
        dictionary ('irf_median', 'irf_lower', etc.) from the BVAR run under
        that prior specification.
    """
    # --- 0. Initialization ---
    # This dictionary will store the final IRF results for all scenarios.
    all_sensitivity_results: Dict[str, Dict[str, np.ndarray]] = {}

    logging.info("====== STARTING BVAR PRIOR SENSITIVITY ANALYSIS ======")

    # --- 1. Iterate Through Scenarios ---
    for scenario_name, overrides in prior_scenarios.items():
        logging.info(f"--- Running sensitivity analysis for scenario: '{scenario_name}' ---")

        # --- 2. Modify Configuration Safely ---
        # Create a deep copy of the original configuration to avoid modifying it in place.
        # This is critical for ensuring each run is independent.
        scenario_config = copy.deepcopy(config)

        # Update the hyperparameters in the copied configuration.
        for param_name, value in overrides.items():
            # This assumes hyperparameters are in a specific nested location.
            if param_name in scenario_config['bvar_priors']['hyperparameters']:
                scenario_config['bvar_priors']['hyperparameters'][param_name] = value
                logging.info(f"    Overriding '{param_name}' with value: {value}")
            else:
                # Raise an error if the parameter to override is not found.
                raise KeyError(f"Hyperparameter '{param_name}' not found in configuration.")

        # --- 3. Execute BVAR Pipeline for the Scenario ---
        try:
            # Task 9: Set up the BVAR with the modified prior configuration.
            bvar_setup = run_phase4_task9_bvar_setup(
                analysis_ready_df=analysis_ready_df,
                config=scenario_config
            )

            # Task 10: Run the Gibbs sampler.
            posterior_draws = run_bvar_gibbs_sampler(
                bvar_setup=bvar_setup,
                config=scenario_config
            )

            # Task 12: Calculate and summarize impulse responses.
            irf_results = calculate_var_impulse_responses(
                posterior_draws=posterior_draws,
                bvar_setup=bvar_setup,
                config=scenario_config
            )

            # --- 4. Store Results ---
            # Store the final IRF object for this scenario.
            all_sensitivity_results[scenario_name] = irf_results
            logging.info(f"--- Scenario '{scenario_name}' completed successfully. ---")

        except Exception as e:
            # If any step of the pipeline fails for a given scenario, log the
            # error and continue to the next scenario.
            logging.error(f"!!! Scenario '{scenario_name}' failed: {e}", exc_info=True)
            all_sensitivity_results[scenario_name] = {"status": "FAIL", "error": str(e)}

    logging.info("====== BVAR PRIOR SENSITIVITY ANALYSIS COMPLETE ======")

    return all_sensitivity_results


# =============================================================================
# TASK 17, STEP 2: LAG LENGTH ROBUSTNESS
# =============================================================================

def run_lag_length_robustness_analysis(
    analysis_ready_df: pd.DataFrame,
    config: Dict[str, Any],
    lags_to_test: List[int]
) -> Dict[str, Any]:
    """
    Performs a robustness check on the BVAR model's lag length specification.

    Purpose:
        This function systematically evaluates the BVAR model's performance and
        resulting impulse responses across a range of different lag lengths.
        It provides formal information criteria to guide the selection of an
        optimal lag length and allows for a qualitative comparison of how the
        model's dynamic implications change with the specification.

    Process:
        1.  **Iterate Through Lag Lengths**: The function loops through a list
            of specified lag lengths to be tested (e.g., [2, 4, 5]).
        2.  **Modify Configuration**: For each lag length `p`, it creates a deep
            copy of the main study configuration and updates the `lags` parameter
            within the `bvar_specification`.
        3.  **Execute BVAR Pipeline**: It executes the core BVAR estimation and
            impulse response calculation pipeline (Tasks 9, 10, and 12) using
            the modified configuration. This is critical, as changing the lag
            length alters the regressor matrix, the prior structure, and the
            effective sample size.
        4.  **Calculate Information Criteria**: After each estimation, it
            calculates the Akaike (AIC), Bayesian (BIC), and Hannan-Quinn (HQ)
            information criteria using the posterior mean of the parameters.
            These metrics provide a formal basis for comparing model fit versus complexity.
        5.  **Store Results**: It stores both the calculated information criteria
            and the full impulse response function object for each tested lag
            length in a master results dictionary.

    Args:
        analysis_ready_df (pd.DataFrame): The final, complete dataset ready for
                                          modeling.
        config (Dict[str, Any]): The main study configuration dictionary, which
                                 serves as the baseline.
        lags_to_test (List[int]): A list of integer lag lengths to test.

    Returns:
        Dict[str, Any]: A dictionary containing two main items:
        - 'information_criteria_report': A DataFrame summarizing AIC, BIC, and
          HQ for each tested lag length.
        - 'irf_results_by_lag': A dictionary where keys are lag lengths (e.g.,
          'lag_2') and values are the full IRF results for that model.
    """
    # --- 0. Initialization ---
    # Dictionaries to store the final results.
    info_criteria_results: List[Dict[str, Any]] = []
    irf_results_by_lag: Dict[str, Dict[str, np.ndarray]] = {}

    logging.info("====== STARTING BVAR LAG LENGTH ROBUSTNESS ANALYSIS ======")

    # --- 1. Iterate Through Lag Lengths ---
    for p in lags_to_test:
        logging.info(f"--- Running analysis for lag length p = {p} ---")

        # --- 2. Modify Configuration Safely ---
        # Create a deep copy of the configuration for this specific run.
        scenario_config = copy.deepcopy(config)
        scenario_config['bvar_specification']['lags'] = p

        try:
            # --- 3. Execute BVAR Pipeline for the Scenario ---
            # Task 9: Set up the BVAR with the new lag length.
            bvar_setup = run_phase4_task9_bvar_setup(
                analysis_ready_df=analysis_ready_df,
                config=scenario_config
            )

            # Task 10: Run the Gibbs sampler.
            posterior_draws = run_bvar_gibbs_sampler(
                bvar_setup=bvar_setup,
                config=scenario_config
            )

            # Task 12: Calculate and summarize impulse responses.
            irf_results = calculate_var_impulse_responses(
                posterior_draws=posterior_draws,
                bvar_setup=bvar_setup,
                config=scenario_config
            )

            # Store the IRF results for this lag length.
            irf_results_by_lag[f'lag_{p}'] = irf_results

            # --- 4. Calculate Information Criteria ---
            # Use the posterior mean of the parameters for the calculation.
            Y, X = bvar_setup['Y'], bvar_setup['X']
            B_mean = np.mean(posterior_draws['B_draws'], axis=0)
            Sigma_mean = np.mean(posterior_draws['Sigma_draws'], axis=0)

            # Calculate residuals at the posterior mean.
            residuals = Y - X @ B_mean

            # Calculate the log-likelihood at the posterior mean.
            log_lik = np.sum(multivariate_normal.logpdf(residuals, mean=np.zeros(Y.shape[1]), cov=Sigma_mean))

            # Get the number of parameters (k) and observations (T) for this specific model.
            k = B_mean.size
            T = Y.shape[0]

            # AIC = -2*ln(L) + 2*k
            aic = -2 * log_lik + 2 * k
            # BIC = -2*ln(L) + k*ln(T)
            bic = -2 * log_lik + k * np.log(T)
            # HQ = -2*ln(L) + 2*k*ln(ln(T))
            hq = -2 * log_lik + 2 * k * np.log(np.log(T))

            # Store the criteria for this lag length.
            info_criteria_results.append({'lags': p, 'AIC': aic, 'BIC': bic, 'HQ': hq, 'n_obs': T, 'n_params': k})
            logging.info(f"--- Lag p={p} completed successfully. BIC: {bic:.2f} ---")

        except Exception as e:
            # If any step fails, log the error and continue.
            logging.error(f"!!! Analysis for lag p={p} failed: {e}", exc_info=True)
            info_criteria_results.append({'lags': p, 'AIC': np.nan, 'BIC': np.nan, 'HQ': np.nan, 'n_obs': np.nan, 'n_params': np.nan})

    logging.info("====== BVAR LAG LENGTH ROBUSTNESS ANALYSIS COMPLETE ======")

    # --- 5. Assemble and Return Final Reports ---
    # Create the final report DataFrame for the information criteria.
    info_criteria_report = pd.DataFrame(info_criteria_results).set_index('lags')

    return {
        'information_criteria_report': info_criteria_report,
        'irf_results_by_lag': irf_results_by_lag
    }


# =============================================================================
# TASK 17, STEP 3: SPECIFICATION ROBUSTNESS TESTS
# =============================================================================

def run_specification_robustness_analysis(
    analysis_ready_df: pd.DataFrame,
    config: Dict[str, Any],
    specification_scenarios: List[Dict[str, Any]]
) -> Dict[str, Dict[str, Any]]:
    """
    Performs robustness checks against alternative model and data specifications.

    Purpose:
        This function systematically tests the sensitivity of the main BVAR
        results to various changes in the model specification. This includes
        using different variable transformations (e.g., first differences),
        substituting alternative data series (e.g., industrial production for
        GDP), and modifying the set of deterministic regressors (e.g., removing
        the time trend).

    Process:
        1.  **Iterate Through Scenarios**: The function loops through a list of
            pre-defined specification scenarios. Each scenario dictionary
            contains a descriptive name and instructions on how to modify the
            data or configuration.
        2.  **Modify Data/Configuration**: For each scenario, it creates a
            modified version of the `analysis_ready_df` or the `config` object.
            - For 'first_differences', it differences the endogenous variables.
            - For 'variable_swap', it replaces one column with another.
            - For 'config_change', it modifies the list of exogenous variables.
        3.  **Execute BVAR Pipeline**: It executes the core BVAR estimation and
            impulse response calculation pipeline (Tasks 9, 10, and 12) using
            the modified data and/or configuration for the current scenario.
        4.  **Store Results**: The complete impulse response function object for
            each scenario is stored in a master dictionary, keyed by the
            scenario name, for later comparison.

    Args:
        analysis_ready_df (pd.DataFrame): The final, complete dataset from the
                                          baseline preparation.
        config (Dict[str, Any]): The main study configuration dictionary.
        specification_scenarios (List[Dict[str, Any]]): A list of dictionaries,
            each defining a robustness check. Example:
            [{'name': 'first_differences', 'type': 'data_transform',
              'transform_func': lambda df: df.diff().dropna()},
             {'name': 'no_trend', 'type': 'config_change',
              'config_path': ['bvar_specification', 'exogenous_variables'],
              'value': ['shock_FED', 'shock_ECB', 'covid_dummy']}]

    Returns:
        Dict[str, Dict[str, Any]]: A dictionary where keys are the scenario
        names. Each value is either the full IRF results dictionary or an
        error message if the run failed.
    """
    # --- 0. Initialization ---
    all_spec_results: Dict[str, Dict[str, Any]] = {}
    logging.info("====== STARTING BVAR SPECIFICATION ROBUSTNESS ANALYSIS ======")

    # --- 1. Iterate Through Scenarios ---
    for scenario in specification_scenarios:
        scenario_name = scenario['name']
        scenario_type = scenario['type']
        logging.info(f"--- Running specification scenario: '{scenario_name}' ---")

        # --- 2. Modify Data and/or Configuration ---
        # Start with copies of the baseline data and config.
        scenario_df = analysis_ready_df.copy()
        scenario_config = copy.deepcopy(config)

        try:
            if scenario_type == 'data_transform':
                # Apply a function to transform the data.
                transform_func = scenario['transform_func']
                scenario_df = transform_func(scenario_df)

            elif scenario_type == 'variable_swap':
                # Replace one variable with another.
                original_var = scenario['original_var']
                new_var = scenario['new_var']
                if new_var not in scenario_df.columns:
                    raise ValueError(f"Swap variable '{new_var}' not found in DataFrame.")
                # Perform the swap.
                scenario_df[original_var] = scenario_df[new_var]
                # Update the list of endogenous variables in the config.
                scenario_config['bvar_specification']['endogenous_variables'] = [
                    v if v != original_var else new_var for v in config['bvar_specification']['endogenous_variables']
                ]

            elif scenario_type == 'config_change':
                # Modify a nested value in the configuration dictionary.
                path = scenario['config_path']
                value = scenario['value']
                temp_dict = scenario_config
                for key in path[:-1]:
                    temp_dict = temp_dict[key]
                temp_dict[path[-1]] = value

            else:
                raise ValueError(f"Unknown scenario type: {scenario_type}")

            # --- 3. Execute BVAR Pipeline for the Scenario ---
            # Task 9: Set up the BVAR with the modified data/config.
            bvar_setup = run_phase4_task9_bvar_setup(
                analysis_ready_df=scenario_df,
                config=scenario_config
            )

            # Task 10: Run the Gibbs sampler.
            posterior_draws = run_bvar_gibbs_sampler(
                bvar_setup=bvar_setup,
                config=scenario_config
            )

            # Task 12: Calculate and summarize impulse responses.
            irf_results = calculate_var_impulse_responses(
                posterior_draws=posterior_draws,
                bvar_setup=bvar_setup,
                config=scenario_config
            )

            # --- 4. Store Results ---
            all_spec_results[scenario_name] = {
                'irf_results': irf_results,
                'status': 'SUCCESS'
            }
            logging.info(f"--- Scenario '{scenario_name}' completed successfully. ---")

        except Exception as e:
            # If any step fails, log the error and continue.
            logging.error(f"!!! Scenario '{scenario_name}' failed: {e}", exc_info=True)
            all_spec_results[scenario_name] = {"status": "FAIL", "error": str(e)}

    logging.info("====== BVAR SPECIFICATION ROBUSTNESS ANALYSIS COMPLETE ======")

    return all_spec_results



# =============================================================================
# TASK 17: ESTIMATION METHOD ROBUSTNESS ORCHESTRATOR
# =============================================================================

def run_estimation_robustness_suite(
    analysis_ready_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates a full suite of robustness checks for the BVAR estimation method.

    Purpose:
        This master orchestrator for Task 17 executes a series of analyses to
        test the robustness of the main findings to the specific choices made
        during the BVAR model estimation. It systematically explores the impact
        of alternative prior beliefs, dynamic specifications (lag length), and
        data definitions.

    Process:
        The function sequentially executes the three main estimation robustness checks:
        1.  **Prior Sensitivity Analysis**:
            - Defines a set of alternative prior hyperparameter settings (e.g.,
              tighter/looser priors).
            - Calls `run_prior_sensitivity_analysis` to re-estimate the BVAR
              and compute IRFs for each alternative prior.
        2.  **Lag Length Robustness**:
            - Defines a list of alternative lag lengths to test.
            - Calls `run_lag_length_robustness_analysis` to re-estimate the BVAR
              for each lag length, calculating information criteria and IRFs.
        3.  **Specification Robustness**:
            - Defines a list of alternative model specifications (e.g., using
              first-differenced data, removing the time trend).
            - Calls `run_specification_robustness_analysis` to re-estimate the
              BVAR for each alternative specification.

    Args:
        analysis_ready_df (pd.DataFrame): The final, complete dataset from the
                                          baseline preparation, potentially
                                          including extra columns for robustness
                                          checks (e.g., Industrial Production).
        config (Dict[str, Any]): The main study configuration dictionary.

    Returns:
        Dict[str, Any]: A dictionary containing the detailed results from each
                        of the three robustness analyses.
    """
    # --- 0. Initialization ---
    # This dictionary will store the results of all robustness checks.
    robustness_suite_results: Dict[str, Any] = {}

    logging.info("====== STARTING ESTIMATION METHOD ROBUSTNESS SUITE ======")

    # --- 1. Step 1: Prior Sensitivity Analysis ---
    logging.info("--- Task 17.1: Running Prior Sensitivity Analysis ---")
    try:
        # Define the scenarios for prior sensitivity. These are based on the
        # instructions and represent meaningful deviations from the baseline.
        prior_scenarios = {
            'tighter_prior': {'lambda_1_overall_tightness': 0.05},
            'looser_prior': {'lambda_1_overall_tightness': 0.2},
            'faster_lag_decay': {'lambda_3_lag_decay': 0.5}
        }

        # Execute the prior sensitivity orchestrator.
        prior_results = run_prior_sensitivity_analysis(
            analysis_ready_df=analysis_ready_df,
            config=config,
            prior_scenarios=prior_scenarios
        )
        # Store the results.
        robustness_suite_results['prior_sensitivity_analysis'] = prior_results
        logging.info("--- Prior Sensitivity Analysis completed successfully. ---")
    except Exception as e:
        # Log any failure but allow the suite to continue.
        logging.error(f"!!! Prior Sensitivity Analysis failed: {e}", exc_info=True)
        robustness_suite_results['prior_sensitivity_analysis'] = {"status": "FAIL", "error": str(e)}

    # --- 2. Step 2: Lag Length Robustness ---
    logging.info("--- Task 17.2: Running Lag Length Robustness Analysis ---")
    try:
        # Define the alternative lag lengths to test, excluding the baseline.
        baseline_lags = config['bvar_specification']['lags']
        lags_to_test = [p for p in [2, 4, 5] if p != baseline_lags]

        # Execute the lag length robustness orchestrator.
        lag_length_results = run_lag_length_robustness_analysis(
            analysis_ready_df=analysis_ready_df,
            config=config,
            lags_to_test=lags_to_test
        )
        # Store the results.
        robustness_suite_results['lag_length_robustness_analysis'] = lag_length_results
        logging.info("--- Lag Length Robustness Analysis completed successfully. ---")
    except Exception as e:
        logging.error(f"!!! Lag Length Robustness Analysis failed: {e}", exc_info=True)
        robustness_suite_results['lag_length_robustness_analysis'] = {"status": "FAIL", "error": str(e)}

    # --- 3. Step 3: Specification Robustness ---
    logging.info("--- Task 17.3: Running Specification Robustness Analysis ---")
    try:
        # Define the list of alternative specification scenarios.
        # This requires careful definition of data transformations and config changes.
        endog_vars = config['bvar_specification']['endogenous_variables']
        exog_vars_baseline = config['bvar_specification']['exogenous_variables']

        specification_scenarios = [
            {
                'name': 'first_differences',
                'type': 'data_transform',
                'transform_func': lambda df: df[endog_vars].diff().dropna()
            },
            {
                'name': 'no_trend',
                'type': 'config_change',
                'config_path': ['bvar_specification', 'exogenous_variables'],
                'value': [v for v in exog_vars_baseline if v != 'trend']
            }
            # Note: Adding a 'variable_swap' test for IP vs GDP would require
            # ensuring the IP data is present in `analysis_ready_df` and then
            # adding a scenario dict here.
        ]

        # Execute the specification robustness orchestrator.
        spec_results = run_specification_robustness_analysis(
            analysis_ready_df=analysis_ready_df,
            config=config,
            specification_scenarios=specification_scenarios
        )
        # Store the results.
        robustness_suite_results['specification_robustness_analysis'] = spec_results
        logging.info("--- Specification Robustness Analysis completed successfully. ---")
    except Exception as e:
        logging.error(f"!!! Specification Robustness Analysis failed: {e}", exc_info=True)
        robustness_suite_results['specification_robustness_analysis'] = {"status": "FAIL", "error": str(e)}

    logging.info("====== ESTIMATION METHOD ROBUSTNESS SUITE COMPLETE ======")

    # Return the dictionary containing all robustness check results.
    return robustness_suite_results



In [None]:
# Top-Level Orchestrator

# =============================================================================
# FINAL TOP-LEVEL ORCHESTRATOR
# =============================================================================

def execute_full_study_pipeline(
    equity_tick_df: pd.DataFrame,
    rate_tick_df: pd.DataFrame,
    macro_df: pd.DataFrame,
    announcement_df: pd.DataFrame,
    target_market: str,
    study_config: Dict[str, Any],
    run_identification_robustness: bool = True,
    run_estimation_robustness: bool = True
) -> Dict[str, Any]:
    """
    Executes the entire research study, including the main analysis and all robustness checks.

    Purpose:
        This top-level orchestrator is the single entry point for the entire
        analytical project. It provides a complete, end-to-end, reproducible
        workflow, from raw data ingestion to the final, fully validated results.
        It is designed to be the master controller that a researcher would call
        to replicate the entire study.

    Process:
        1.  **Execute Benchmark Pipeline**: It first calls the main orchestrator,
            `run_transatlantic_spillovers_analysis` (Task 15), to perform the
            complete baseline analysis. This includes data cleaning, shock
            identification, model estimation, and results generation.
        2.  **Handle Benchmark Failure**: If the benchmark pipeline fails for any
            reason, the entire process halts, as the robustness checks are
            contingent on a successful benchmark run.
        3.  **Execute Identification Robustness Suite**: If enabled, it calls the
            `run_identification_robustness_suite` orchestrator (Task 16). This
            function uses the benchmark results and the original raw data to
            perform a series of checks on the stability of the shock
            identification method (PMSR, rotational uncertainty, sub-samples).
        4.  **Execute Estimation Robustness Suite**: If enabled, it calls the
            `run_estimation_robustness_suite` orchestrator (Task 17). This
            function uses the final analysis-ready dataset from the benchmark
            run to test the sensitivity of the results to alternative BVAR
            specifications (priors, lag length, variable definitions).
        5.  **Compile Final Results**: It assembles the outputs from the benchmark
            run and all robustness suites into a final, comprehensive master
            results dictionary.

    Args:
        equity_tick_df (pd.DataFrame): Raw high-frequency equity tick data.
        rate_tick_df (pd.DataFrame): Raw high-frequency interest rate tick data.
        macro_df (pd.DataFrame): Raw long-format macroeconomic time series data.
        announcement_df (pd.DataFrame): Raw central bank announcement metadata.
        target_market (str): The market to study (e.g., 'CAN').
        study_config (Dict[str, Any]): The main study configuration dictionary.
        run_identification_robustness (bool): If True, runs the full suite of
            identification robustness checks (Task 16).
        run_estimation_robustness (bool): If True, runs the full suite of
            estimation method robustness checks (Task 17).

    Returns:
        Dict[str, Any]: A master dictionary containing the complete results of
                        the benchmark analysis and all executed robustness checks.
    """
    # --- 0. Initialization ---
    # Configure logging for the entire run.
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    # This dictionary will hold all outputs of the entire study.
    study_results: Dict[str, Any] = {}

    # --- 1. Execute Benchmark Pipeline (Task 15) ---
    logging.info("="*80)
    logging.info("STEP 1: EXECUTING BENCHMARK ANALYSIS PIPELINE")
    logging.info("="*80)

    # The benchmark run is mandatory. If it fails, the entire study fails.
    benchmark_run_results = run_transatlantic_spillovers_analysis(
        equity_tick_df=equity_tick_df,
        rate_tick_df=rate_tick_df,
        macro_df=macro_df,
        announcement_df=announcement_df,
        target_market=target_market,
        study_config=study_config
    )

    # Store the benchmark results.
    study_results['benchmark_run'] = benchmark_run_results

    # Halt execution if the benchmark pipeline failed.
    if benchmark_run_results.get('metadata', {}).get('final_status') == "FAIL":
        logging.error("Benchmark pipeline failed. Halting execution of robustness suites.")
        return study_results

    # --- 2. Execute Identification Robustness Suite (Task 16) ---
    if run_identification_robustness:
        logging.info("="*80)
        logging.info("STEP 2: EXECUTING IDENTIFICATION ROBUSTNESS SUITE")
        logging.info("="*80)

        # This suite requires the original raw data for the sub-sample analysis.
        raw_data_dict = {
            'equity_tick_df': equity_tick_df,
            'rate_tick_df': rate_tick_df,
            'macro_df': macro_df,
            'announcement_df': announcement_df
        }

        # Call the Task 16 orchestrator.
        identification_robustness_results = run_identification_robustness_suite(
            benchmark_results=benchmark_run_results,
            raw_data=raw_data_dict
        )
        # Store the results.
        study_results['identification_robustness_suite'] = identification_robustness_results
    else:
        logging.info("Skipping Identification Robustness Suite as per configuration.")

    # --- 3. Execute Estimation Method Robustness Suite (Task 17) ---
    if run_estimation_robustness:
        logging.info("="*80)
        logging.info("STEP 3: EXECUTING ESTIMATION METHOD ROBUSTNESS SUITE")
        logging.info("="*80)

        # This suite requires the final analysis-ready DataFrame from the benchmark run.
        analysis_ready_df = benchmark_run_results['phase_3_model_prep']['analysis_ready_df']

        # Call the Task 17 orchestrator.
        estimation_robustness_results = run_estimation_robustness_suite(
            analysis_ready_df=analysis_ready_df,
            config=study_config
        )
        # Store the results.
        study_results['estimation_robustness_suite'] = estimation_robustness_results
    else:
        logging.info("Skipping Estimation Method Robustness Suite as per configuration.")

    logging.info("="*80)
    logging.info("ENTIRE STUDY PIPELINE COMPLETED.")
    logging.info("="*80)

    # Return the final, comprehensive results object.
    return study_results
