### **`README.md`**

# A Quantitative Framework for Modeling Political Distance and Trade

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/downloads/)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Imports: isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336)](https://pycqa.github.io/isort/)
[![Type Checking: mypy](https://img.shields.io/badge/type_checking-mypy-blue)](http://mypy-lang.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2509.17303-b31b1b.svg)](https://arxiv.org/abs/2509.17303)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/trade_political_distance_wto)
[![Discipline](https://img.shields.io/badge/Discipline-International%20Political%20Economy-blue)](https://github.com/chirindaopensource/trade_political_distance_wto)
[![Methodology](https://img.shields.io/badge/Methodology-Structural%20Gravity%20%7C%20PPML%20%7C%20Particle%20Filter-orange)](https://github.com/chirindaopensource/trade_political_distance_wto)
[![Data Source](https://img.shields.io/badge/Data-GDELT%20%7C%20UNGA%20Votes%20%7C%20IMF%20DOTS-lightgrey)](https://github.com/chirindaopensource/trade_political_distance_wto)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%23025596?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![Statsmodels](https://img.shields.io/badge/statsmodels-1A568C.svg?style=flat)](https://www.statsmodels.org/stable/index.html)
[![PyFixest](https://img.shields.io/badge/pyfixest-0B559E.svg?style=flat)](https://github.com/s3alfisc/pyfixest)
[![Pydantic](https://img.shields.io/badge/pydantic-E92063.svg?style=flat)](https://pydantic-docs.helpmanual.io/)
[![PyYAML](https://img.shields.io/badge/PyYAML-4B5F6E.svg?style=flat)](https://pyyaml.org/)
[![Joblib](https://img.shields.io/badge/joblib-2F72A4.svg?style=flat)](https://joblib.readthedocs.io/en/latest/)
[![Modelsummary](https://img.shields.io/badge/modelsummary-D4352C.svg?style=flat)](https://modelsummary.com/)
[![Analysis](https://img.shields.io/badge/Analysis-Geopolitical%20Risk-brightgreen)](https://github.com/chirindaopensource/trade_political_distance_wto)
[![Framework](https://img.shields.io/badge/Framework-Bayesian%20Filtering-blueviolet)](https://github.com/chirindaopensource/trade_political_distance_wto)
[![Model](https://img.shields.io/badge/Model-Non--Linear%20Panel-red)](https://github.com/chirindaopensource/trade_political_distance_wto)
[![Validation](https://img.shields.io/badge/Validation-Temporal%20Analysis-yellow)](https://github.com/chirindaopensource/trade_political_distance_wto)
[![Robustness](https://img.shields.io/badge/Robustness-Sensitivity%20Analysis-lightgrey)](https://github.com/chirindaopensource/trade_political_distance_wto)

--

**Repository:** `https://github.com/chirindaopensource/trade_political_distance_wto`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Trade, Political Distance and the World Trade Organization"** by:

*   Samuel Hardwick

The project provides a complete, end-to-end computational framework for quantifying the economic penalty of political friction on international trade and assessing the mitigating role of the World Trade Organization. It delivers a modular, auditable, and extensible pipeline that replicates the paper's entire workflow: from rigorous data validation and the sophisticated filtering of high-frequency event data, through the estimation of complex non-linear panel models with high-dimensional fixed effects, to the final synthesis and presentation of results.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: execute_full_research_pipeline](#key-callable-execute_full_research_pipeline)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the methodologies presented in the 2025 paper "Trade, Political Distance and the World Trade Organization." The core of this repository is the iPython Notebook `trade_political_distance_wto_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings, from initial data validation to the final generation of tables, figures, and a synthesis report.

The paper addresses a key question in international political economy: Do multilateral institutions like the WTO shield commerce from geopolitical tensions? This codebase operationalizes the paper's advanced approach, allowing users to:
-   Rigorously validate and assess the quality of a complex, multi-source panel dataset.
-   Extract a smoothed signal of political relations from noisy, high-frequency news event data using a state-space model (Particle Filter).
-   Estimate a state-of-the-art structural gravity model using Poisson Pseudo-Maximum Likelihood (PPML) with high-dimensional fixed effects.
-   Analyze the extensive margin of trade using a Logit model with advanced, bias-corrected estimation and inference (Split-Panel Jackknife with Bootstrap).
-   Quantify the economic and statistical significance of the WTO's mitigating effect on political distance.
-   Systematically test the stability of the findings across a wide array of robustness checks.

## Theoretical Background

The implemented methods are grounded in modern econometrics, computational statistics, and international political economy.

**1. Structural Gravity Model:**
The workhorse model is the structural gravity equation of trade, which has strong theoretical microfoundations. The model explains bilateral trade flows as a function of economic size and various trade frictions or promoters. In this paper, the key friction is political distance. The model is specified in its multiplicative form and estimated with PPML to handle heteroskedasticity and zero trade flows correctly. The inclusion of high-dimensional fixed effects (exporter-year, importer-year, and country-pair) is critical to control for all time-varying country-specific factors (Multilateral Resistance Terms) and all time-invariant dyadic factors, isolating the effect of interest.
$$ X_{ijt} = \exp\left(\left[\beta_0 PD_{ijt} + \sum_z \beta_z (PD_{ijt} \times z_{ijt})\right] \mathbf{1}_{(i \neq j)} + \delta_{it} + \delta_{jt} + \delta_{ij} + \text{Border}_{ijt}\right) \times \varepsilon_{ijt} $$

**2. State-Space Model and Particle Filter:**
To measure short-term political relations, the paper treats the "true" but unobserved political stance between two countries as a latent state ($s_{ijt}$) that evolves over time. The noisy, high-frequency GDELT event data ($y_{ijt}$) is treated as a measurement of this state.
-   **State Equation (Random Walk):** $s_{ijt} = s_{ij,t-1} + v_t$, where $v_t \sim \mathcal{N}(0, Q_{ij})$
-   **Observation Equation:** $y_{ijt} = s_{ijt} + \eta_t$, where $\eta_t \sim \mathcal{N}(0, R_{ijt})$
Because the distributions may not be strictly Gaussian, this system is solved using a **Particle Filter**, a Sequential Monte Carlo method that approximates the posterior distribution of the state at each time step. This is a sophisticated technique to filter signal from noise.

**3. Inference in Non-Linear Panel Models:**
-   **PPML:** Standard errors are clustered at the country-pair level to account for serial correlation in the error term for a given dyad.
-   **Logit:** Estimating non-linear models (like Logit) with many fixed effects suffers from the incidental parameters problem, which biases the coefficients. The paper uses the **Split-Panel Jackknife** (Hinz, Stammann, and Wanner, 2021) to correct this bias. Standard errors are then computed via a **cluster bootstrap of the entire jackknife procedure**, a computationally intensive but highly robust inference method.

## Features

The provided iPython Notebook (`trade_political_distance_wto_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Phase Architecture:** The entire pipeline is broken down into 21 distinct, modular tasks, each with its own orchestrator function.
-   **Configuration-Driven Design:** All methodological and computational parameters are managed in an external `config.yaml` file, allowing for easy customization without code changes.
-   **Advanced GDELT Data Processing:** A complete Particle Filter implementation to generate a smoothed, high-quality measure of political relations from raw event data.
-   **State-of-the-Art Estimation:** High-performance estimation of PPML and Logit models with multiple high-dimensional fixed effects using the `pyfixest` library.
-   **Rigorous Inference:**
    -   Complete implementation of the Split-Panel Jackknife for bias correction.
    -   A parallelized cluster bootstrap of the jackknife procedure for valid standard errors.
    -   A full implementation of the multivariate Delta Method for calculating the standard errors of non-linear marginal effects.
-   **Comprehensive Analysis Suite:** Functions to quantify institutional effects, test for temporal stability, and perform formal hypothesis tests (Wald, F-tests).
-   **Parallelized Robustness Framework:** A powerful, extensible orchestrator for running dozens of robustness checks in parallel using `joblib`.
-   **Automated Reporting:** Programmatic generation of publication-quality tables (`modelsummary`), figures (`seaborn`), and a final JSON synthesis report.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Validation & Preprocessing (Tasks 1-6):** Ingests and rigorously validates all raw data and the `config.yaml` file, performs a deep data quality audit, runs the Particle Filter to create the GDELT measure, and constructs all other analytical variables.
2.  **Model & Sample Preparation (Tasks 7-9):** Adds fixed effects identifiers and interaction terms, creates the specific analytical samples (including domestic trade), and prepares generators for bootstrap and jackknife inference.
3.  **Estimation (Tasks 10-11):** Estimates the baseline PPML and Logit models using the advanced methods described above.
4.  **Analysis & Interpretation (Tasks 12-15):** Runs model diagnostics, calculates economically meaningful marginal effects, quantifies the key institutional effects, and analyzes their stability over time.
5.  **Robustness & Reporting (Tasks 16-21):** Orchestrates the entire suite of robustness checks and generates all final tables, figures, and a synthesis report.

## Core Components (Notebook Structure)

The `trade_political_distance_wto_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: execute_full_research_pipeline

The central function in this project is `execute_full_research_pipeline`. It orchestrates the entire analytical workflow, providing a single entry point for running the baseline study and all associated robustness checks.

```python
def execute_full_research_pipeline(
    master_df: pd.DataFrame,
    config: Dict[str, Any],
    output_dir: str,
    run_intensive_logit: bool = True,
    run_robustness_checks: bool = True
) -> None:
    """
    Executes the complete end-to-end research pipeline for the study.
    """
    # ... (implementation is in the notebook)
```

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `scipy`, `statsmodels`, `pyyaml`, `tqdm`, `joblib`, `matplotlib`, `seaborn`, `pydantic`, `pyfixest`, `modelsummary`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/trade_political_distance_wto.git
    cd trade_political_distance_wto
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install -r requirements.txt
    ```

## Input Data Structure

The pipeline requires a single `pandas.DataFrame` and a `config.yaml` file. A mock data generation function is provided in the main notebook to create a valid example for testing. The `master_df` must conform to the 26-column schema defined in the `define_full_schema()` function, which includes dyadic trade data, country-level controls for both exporter and importer, and UNGA ideal points.

## Usage

The `trade_political_distance_wto_draft.ipynb` notebook provides a complete, step-by-step guide. The core workflow is:

1.  **Prepare Inputs:** Load your master `pandas.DataFrame`. Ensure the `config.yaml` file is present in the same directory.
2.  **Execute Pipeline:** Call the grand orchestrator function.

    ```python
    # This single call runs the entire project.
    execute_full_research_pipeline(
        master_df=my_master_data,
        config=my_config_dict,
        output_dir="./research_outputs",
        run_intensive_logit=False,       # Set to True for the full analysis
        run_robustness_checks=False      # Set to True for the full analysis
    )
    ```
3.  **Inspect Outputs:** Check the specified output directory (`./research_outputs`) for all generated tables, figures, and the final JSON report.

## Output Structure

The pipeline does not return an object. Its side effect is the creation of a structured output directory:

```
research_outputs/
│
├── table_1_main_results.csv              # Main PPML and Logit regression results
├── table_2_robustness_summary.csv        # Summary of key coefficients from robustness checks
├── figure_1_ppml_marginal_effects.png    # Plot of main marginal effects
├── figure_2_temporal_heterogeneity.png   # Plot showing evolution of effects over time
├── figure_3_robustness_stability.png     # Coefficient stability plot
└── final_synthesis_report.json           # Programmatically generated summary of all findings
```

## Project Structure

```
trade_political_distance_wto/
│
├── trade_political_distance_wto_draft.ipynb   # Main implementation notebook
├── config.yaml                                # Master configuration file
├── requirements.txt                           # Python package dependencies
├── LICENSE                                    # MIT license file
└── README.md                                  # This documentation file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can easily modify all methodological parameters, such as particle filter settings, estimation choices, and robustness check definitions, without altering the core Python code.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Alternative Estimators:** Adding support for other non-linear estimators like Negative Binomial for the intensive margin.
-   **Dynamic Panel Models:** Incorporating lagged dependent variables to model persistence in trade flows.
-   **Machine Learning Benchmarks:** Comparing the structural model's performance against predictive models like Gradient Boosting or Random Forests for forecasting trade flows under geopolitical stress.
-   **Interactive Visualization:** Building a dashboard (e.g., with Dash or Streamlit) to allow users to interactively explore the results and run custom scenarios.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{hardwick2025trade,
  title={{Trade, Political Distance and the World Trade Organization}},
  author={Hardwick, Samuel},
  journal={arXiv preprint arXiv:2509.17303},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A Quantitative Framework for Modeling Political Distance and Trade.
GitHub repository: https://github.com/chirindaopensource/trade_political_distance_wto
```

## Acknowledgments

-   Credit to **Samuel Hardwick** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, SciPy, Statsmodels, Pydantic, Joblib, Matplotlib, Seaborn, PyFixest, and Modelsummary**, whose work makes complex computational analysis accessible and robust.

--

*This README was generated based on the structure and content of `trade_political_distance_wto_draft.ipynb` and follows best practices for research software documentation.*

# Paper

Title: "*Trade, Political Distance and the World Trade Organization*"

Authors: Samuel Hardwick

E-Journal Submission Date: 22 September 2025

Link: https://arxiv.org/abs/2509.17303

Abstract:

Trade agreements are often understood as shielding commerce from fluctuations in political relations. This paper provides evidence that World Trade Organization membership reduces the penalty of political distance on trade at the extensive margin. Using a structural gravity framework covering 1948 to 2023 and two measures of political distance, based on high-frequency events data and UN General Assembly votes, GATT/WTO status is consistently associated with a wider range of products traded between politically distant partners. The association is strongest in the early WTO years (1995 to 2008). Events-based estimates also suggest attenuation at the intensive margin, while UN vote-based estimates do not. Across all specifications, GATT/WTO membership increases aggregate trade volumes. The results indicate that a function of the multilateral trading system has been to foster new trade links across political divides, while raising trade volumes among both close and distant partners.

# Summary

### **Summary: Hardwick (2025)**

This is a well-executed study that tackles a classic and newly urgent question: Does the multilateral trading system, embodied by the GATT/WTO, insulate international commerce from political friction? The author provides a nuanced, data-driven answer, suggesting that its primary function has been to foster the *creation* of new trade links across political divides, particularly during a "golden era" of multilateralism.

Here is a breakdown of the paper's methodology, findings, and contributions.

--

#### **The Core Research Question and Motivation**

The paper investigates whether membership in the GATT/WTO moderates the negative relationship between "political distance" (i.e., geopolitical disagreement or antagonism) and bilateral trade.

*   **Motivation:** The study is framed against the backdrop of rising geopolitical tensions and fears of "geoeconomic fragmentation," where trade patterns might revert to align with political blocs (e.g., US-centric vs. China-centric). Understanding the historical role of the WTO in mitigating such tendencies is therefore highly relevant for policy.
*   **Hypothesis:** The implicit hypothesis is that the rules, norms, and dispute settlement mechanisms of the GATT/WTO provide a degree of policy certainty and reduce transaction costs, thereby lowering the "penalty" that political distance otherwise imposes on trade.

--

#### **Data and Measurement of Political Distance (A Methodological Contribution)**

A key strength of this paper is its use of two distinct, complementary measures of political distance over a long time horizon (1948–2023).

1.  **UN General Assembly (UNGA) Ideal Point Distance:** This is a standard measure in political science. It uses countries' voting records in the UNGA to estimate their latent foreign policy positions on a one-dimensional scale. The absolute difference between two countries' "ideal points" serves as a proxy for their long-term ideological and geopolitical alignment. It is an annual, slow-moving indicator of structural alignment.

2.  **GDELT Event-Based Index:** This is a high-frequency measure designed to capture short-run diplomatic shocks and interactions.
    *   **Source:** It is constructed from the Global Database of Events, Language and Tone (GDELT), which machine-codes news reports for interactions between countries. Each event is assigned a "Goldstein score" from -10 (severe conflict) to +10 (major cooperation).
    *   **Novelty (The CS/Econometrics Angle):** Raw event data is notoriously noisy and sparse for many country pairs. Hardwick makes a notable methodological contribution by applying a **particle filter** to this data. This is a sophisticated Bayesian state-space modeling technique. The intuition is as follows:
        *   There exists a "true," unobserved (latent) state of political relations between two countries.
        *   The observed monthly GDELT score is a noisy signal of this true state.
        *   The particle filter uses a simulation-based approach to estimate the evolution of the true latent state over time, giving more weight to periods with denser news coverage (and thus a stronger signal). This smooths the series and reduces measurement error, a significant improvement over using raw or simply averaged scores.

--

#### **The Econometric Framework (Adherence to Best Practices)**

The author employs a state-of-the-art **structural gravity model**, which is the theoretical and empirical workhorse of modern international trade analysis.

*   **Estimator:** The model is estimated using the **Poisson Pseudo-Maximum Likelihood (PPML)** estimator. This is now the standard for gravity models because it is robust to heteroskedasticity (a common issue in trade data) and naturally includes observations with zero trade, which are crucial for analyzing the formation of new trade links.
*   **Fixed Effects:** The specification includes a comprehensive set of fixed effects to control for unobserved factors:
    *   **Importer-Year (`d_jt`) and Exporter-Year (`d_it`):** These absorb all time-varying, country-specific factors, such as GDP, domestic policies, and, critically, "multilateral resistance" terms from gravity theory.
    *   **Country-Pair (`d_ij`):** These absorb all time-invariant bilateral factors like geographic distance, common language, and colonial history.
*   **Core Specification:** The central analysis relies on interacting the political distance (`PD`) variable with institutional variables, chiefly `GATTWTO` membership. The key coefficient of interest is on the `PD x GATTWTO` term. A positive coefficient would indicate that the negative effect of political distance on trade is *weaker* for pairs where both are WTO members.
*   **Decomposition of Trade Margins:** The analysis goes beyond aggregate trade flows by decomposing the effect into:
    *   **Intensive Margin:** The average value of trade per product or sector that is already being traded.
    *   **Extensive Margin:** The number of different products (at the HS-6 digit level) or sectors being traded. This margin captures the creation of *new* trade relationships.

--

#### **The Principal Findings (The "What, How, and When")**

The results are presented clearly and build a compelling narrative.

1.  **Baseline:** Without accounting for institutions, political distance is, on average, negatively correlated with trade. A greater political divide means less trade.

2.  **The Attenuating Effect of GATT/WTO:** The central finding is that joint membership in the GATT/WTO significantly **attenuates** the negative impact of political distance. The interaction term `PD x GATTWTO` is consistently positive and significant in many specifications.

3.  **The Primacy of the Extensive Margin:** This is the most important nuance. The attenuating effect of the WTO is strongest and most robust at the **extensive margin**. This means the WTO's primary role in overcoming political friction has been to help countries establish *new* trade links (i.e., start trading in new products/sectors) rather than simply increasing the volume of existing trade. For politically distant partners, the WTO appears to lower the entry barriers for commerce.

4.  **Temporal Heterogeneity:** The effect was not constant over time. The analysis by sub-period reveals that the WTO's ability to shield trade from politics was **strongest in the early WTO years (1995–2008)**. In the post-2008 period, the evidence suggests that politics and trade have become more closely linked again, even for WTO members, a finding consistent with the broader narrative of slowing globalization and rising geopolitical competition.

--

#### **Robustness and Validation**

The author performs a commendable series of robustness checks, which strengthens confidence in the core results. The main finding—that WTO membership attenuates the political distance penalty at the extensive margin—holds up when:
*   Excluding China from the sample.
*   Restricting the sample to only democratic country pairs.
*   Removing pairs involved in active military conflict.
*   Using alternative specifications of the GDELT index and different time frequencies (quarterly data).

--

#### **Conclusion and Contribution**

In sum, this paper provides strong, robust evidence that a key function of the multilateral trading system has been to foster the creation of new trade ties across political divides. It did not necessarily make politically distant partners trade more intensely than close partners, but it created a framework where commerce between them could begin and be sustained.

The finding that this effect was concentrated in the 1995-2008 period is particularly sobering. It suggests that the benefits of the system are not automatic and may depend on a broader context of active multilateral cooperation, which has waned in recent years. The paper thus provides a valuable historical and quantitative perspective on the very mechanisms of global economic integration that are currently under strain. It is a solid piece of empirical work that skillfully blends methods from economics, political science, and data science.

# Import Essential Modules

In [None]:
#!/usr/bin/env python3
# ==============================================================================
#
#  A Quantitative Framework for Modeling the Impact of Political Distance on
#  International Trade and the Mitigating Role of the World Trade Organization
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "Trade, Political Distance and the World
#  Trade Organization" by Samuel Hardwick (2025). It delivers a robust,
#  end-to-end system for quantifying the economic penalty of political friction
#  on trade flows and assessing the value of international institutions like the
#  WTO in mitigating this geopolitical risk.
#
#  Core Methodological Components:
#  • Structural Gravity Model of Trade with high-dimensional fixed effects.
#  • Poisson Pseudo-Maximum Likelihood (PPML) for intensive margin estimation.
#  • Logit models with Split-Panel Jackknife for extensive margin analysis.
#  • Two distinct measures of Political Distance:
#    1. UN General Assembly ideal point distances for long-term alignment.
#    2. High-frequency GDELT event data for short-term diplomatic relations.
#  • State-Space Modeling with a Particle Filter (Sequential Monte Carlo) to
#    extract a smoothed signal of political relations from noisy event data.
#  • Cluster-Robust Standard Errors at the dyad level for valid inference.
#  • Bootstrap of the Jackknife procedure for robust Logit model inference.
#  • Delta Method for calculating standard errors of non-linear marginal effects.
#
#  Technical Implementation Features:
#  • Modular, multi-phase pipeline with task-specific orchestrators.
#  • Configuration-driven design for transparent and reproducible analysis.
#  • Parallelized execution for computationally intensive estimation and
#    robustness checks using `joblib`.
#  • State-of-the-art econometric estimation via the `pyfixest` library.
#  • Programmatic generation of publication-quality tables and figures using
#    `modelsummary`, `matplotlib`, and `seaborn`.
#  • Comprehensive data validation, diagnostics, and synthesis reporting.
#
#  Paper Reference:
#  Hardwick, S. (2025). Trade, Political Distance and the World Trade Organization.
#  arXiv preprint arXiv:2509.17303.
#  https://arxiv.org/abs/2509.17303
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================


# ==============================================================================
# Standard Library Imports
# ==============================================================================
import ast
import json
import math
import re
import traceback
import warnings
from copy import deepcopy
from dataclasses import dataclass, field, FrozenInstanceError
from pathlib import Path
from typing import (Any, Callable, Dict, Final, Iterator, List, Tuple, Type)

# ==============================================================================
# Third-Party Library Imports
# ==============================================================================
import matplotlib.pyplot as plt
import modelsummary
import numpy as np
import pandas as pd
import seaborn as sns
from joblib import Parallel, delayed
from pydantic import (BaseModel, ConfigDict, Field, ValidationError,
                      field_validator)
from pyfixest.estimation import Fixest, feglm, feols
from pyfixest.model_matrix import model_matrix_fixest
from scipy.special import logsumexp
from scipy.stats import chi2, logistic, norm
import statsmodels.api as sm
from statsmodels.tools.sm_exceptions import ConvergenceWarning
from statsmodels.tsa.ar_model import AutoReg
from tqdm import tqdm


# Implementation

## Draft 1

### **Functional Decomposition of the End-to-End Research Pipeline**

#### **Phase 1: Configuration and Data Validation**

**Task 1: `run_configuration_and_validation`**
*   **Inputs:** A raw Python dictionary (`config`) containing the nested configuration for the entire study.
*   **Processes:**
    1.  **Structural Validation:** Validates the entire nested structure, keys, data types, and exact values of the input dictionary against a rigorous Pydantic schema.
    2.  **Functional Parsing:** Securely parses string-based parameters (e.g., the lambda function for observation variance) into executable Python objects using Abstract Syntax Trees (AST) for security.
    3.  **Instantiation:** Consolidates all validated parameters into a single, immutable `ProjectConstants` dataclass object.
*   **Outputs:** An immutable, validated `ProjectConstants` object.
*   **Transformation:** Transforms a raw, potentially error-prone dictionary into a structured, validated, and type-safe parameter object.
*   **Role in Research Pipeline:** This function serves as the **Gatekeeper and Initializer**. It ensures that the analysis is run with the exact parameters specified in the paper (e.g., `M=1000` particles), guaranteeing reproducibility. It implements the methodological constants cited in **Appendix A.1**.

**Task 2: `run_dataframe_validation_and_assessment`**
*   **Inputs:** A `pandas.DataFrame` (`master_df`) and a schema dictionary (`full_schema`).
*   **Processes:**
    1.  **Schema Validation:** Verifies the DataFrame's structure: exact column count, names, and dtypes.
    2.  **Range Validation:** Checks that numerical variables fall within their theoretical bounds (e.g., `PolityScore` in [-10, 10]).
    3.  **Consistency Validation:** Performs logical cross-checks on derived variables (e.g., `DomesticTrade` is consistent with `GDP` and `Exports`).
*   **Outputs:** A boolean `True` if all checks pass; otherwise, it raises a `ValueError`.
*   **Transformation:** This is a pure validation function; it asserts the state of the data without transforming it.
*   **Role in Research Pipeline:** This is the **Data Integrity Checkpoint**. It ensures the raw data is clean, correctly formatted, and internally consistent before any computations, as required by the general empirical strategy in **Section 3**.

**Task 3: `run_data_quality_assessment`**
*   **Inputs:** The validated `pandas.DataFrame`.
*   **Processes:**
    1.  **Missingness Audit:** Calculates per-column missing value percentages and the correlation of missingness.
    2.  **Temporal Assessment:** Calculates time series lengths per dyad, checks for temporal monotonicity, and flags significant data gaps.
*   **Outputs:** A nested dictionary containing detailed diagnostic reports.
*   **Transformation:** Transforms the DataFrame into a structured set of diagnostic statistics.
*   **Role in Research Pipeline:** This is the **Exploratory Data Analysis (EDA)** stage, providing a comprehensive overview of the panel's structure and quality.

#### **Phase 2: Data Preprocessing and Transformation**

**Task 4: `run_gdelt_preprocessing_pipeline`**
*   **Inputs:** The validated `pandas.DataFrame` and the `ProjectConstants` object.
*   **Processes:**
    1.  **Density Filtering:** Filters out dyadic pairs that do not meet the `baseline_min_obs_threshold` (270 months) of GDELT event data.
    2.  **Data Structuring:** Splits the single DataFrame into a dictionary of smaller DataFrames, one for each dyad's time series.
    3.  **Process Variance Estimation:** For each dyad, it fits an AR(1) model to the observed Goldstein scores and extracts the residual variance to estimate the process variance, $Q_{ij}$.
*   **Outputs:** A tuple containing: (1) The dictionary of dyadic time series, and (2) A `pandas.Series` of the estimated process variances ($Q_{ij}$).
*   **Transformation:** Transforms a single DataFrame into two structured outputs tailored for the particle filter.
*   **Role in Research Pipeline:** Implements the **Data Preparation for the Particle Filter** as described in **Appendix A.1**.

**Task 5: `run_particle_filter_pipeline`**
*   **Inputs:** The dictionary of dyadic time series and the Series of process variances ($Q_{ij}$) from Task 4.
*   **Processes:** Executes the full particle filter algorithm for each dyad, involving a sequential loop of:
    *   **Propagation:** $s_{ijt}^{(m)} = s_{ij,t-1}^{(m)} + \epsilon_t^{(m)}$, where $\epsilon_t^{(m)} \sim \mathcal{N}(0, Q_{ij})$.
    *   **Weighting:** $w_{ijt}^{(m)} \propto p(y_{ijt} | s_{ijt}^{(m)})$, correctly handling missing observations.
    *   **Resampling:** Performs systematic resampling based on the Effective Sample Size.
*   **Outputs:** A `pandas.DataFrame` containing the final, smoothed political distance series (`PD_GDELT_Filtered`).
*   **Transformation:** Transforms a dictionary of noisy time series into a single, clean DataFrame of estimated latent states.
*   **Role in Research Pipeline:** This is the **methodological core of the GDELT index construction** (see **Appendix A.1**), creating a key explanatory variable.

**Task 6: `run_variable_construction_pipeline`**
*   **Inputs:** The main DataFrame, the filtered GDELT series, a DataFrame of UNGA ideal points, and a list of columns for IHS transformation.
*   **Processes:**
    1.  Merges the `PD_GDELT_Filtered` series into the main DataFrame.
    2.  Computes the UNGA political distance: $PD_{ijt}^{\text{UNGA}} = |\text{IdealPoint}_{it} - \text{IdealPoint}_{jt}|$.
    3.  Applies the inverse hyperbolic sine transformation ($\sinh^{-1}(x)$) to specified variables.
    4.  Constructs the intensive (`X_ijt / S_ijt`) and extensive (`1(S_ijt > 0)`) trade margins.
*   **Outputs:** A single, fully processed `pandas.DataFrame` with all variables required for estimation.
*   **Transformation:** Enriches the DataFrame with the final set of constructed variables.
*   **Role in Research Pipeline:** Finalizes the **creation of all analytical variables** described in **Section 2** and **Section 3**.

#### **Phase 3: Model Preparation**

**Task 7: `run_panel_data_preparation`**
*   **Inputs:** The fully processed DataFrame and a specification for interaction terms.
*   **Processes:**
    1.  Creates the categorical identifier columns (`FE_ExporterYear`, etc.) for absorbing high-dimensional fixed effects.
    2.  Programmatically creates all interaction terms (e.g., `PD x GATTWTO`).
*   **Outputs:** A `pandas.DataFrame` ready for estimation.
*   **Transformation:** Adds the final identifier and interaction columns required to specify the full econometric model.
*   **Role in Research Pipeline:** Prepares the data for the specific structure of the **structural gravity model** from **Section 3**, specified by:
    $X_{ijt} = \exp\left(\left[\beta_0 PD_{ijt} + \sum_z \beta_z (PD_{ijt} \times z_{ijt})\right] \mathbf{1}_{(i \neq j)} + \delta_{it} + \delta_{jt} + \delta_{ij} + \text{Border}_{ijt}\right) \times \varepsilon_{ijt}$

**Task 8: `run_sample_preparation_pipeline`**
*   **Inputs:** The main DataFrame, a DataFrame of country-level data, and a configuration dictionary.
*   **Processes:**
    1.  **Create PPML Sample:** Augments the panel with domestic trade observations (`GDP - Exports`).
    2.  **Create Logit Sample:** Filters out dyads where the binary trade outcome never changes.
    3.  **Create Temporal Subsamples:** Slices the main sample into distinct time periods.
*   **Outputs:** A dictionary containing the `ppml_sample`, `logit_sample`, and `temporal_subsamples` DataFrames.
*   **Transformation:** Transforms the master DataFrame into multiple, specialized analytical samples.
*   **Role in Research Pipeline:** Creates the **final analytical datasets** for every model estimated in the paper.

**Task 9: `run_inference_preparation`**
*   **Inputs:** An analytical DataFrame and parameters for bootstrap/jackknife.
*   **Processes:**
    1.  Validates the cluster structure (`DyadicPair`).
    2.  Creates a generator for producing cluster-bootstrap resamples.
    3.  Creates a generator for producing split-panel jackknife samples.
*   **Outputs:** A dictionary containing the validated DataFrame and the two generators.
*   **Transformation:** Creates iterable objects (generators) that produce transformed versions of the data on-the-fly.
*   **Role in Research Pipeline:** Prepares the necessary components for the **advanced inference techniques** cited in the paper, such as "pair-clustered standard errors" and the "split-panel jackknife method with bootstrap standard errors."

#### **Phase 4: Primary Econometric Estimation**

**Task 10: `run_ppml_estimation_pipeline`**
*   **Inputs:** The PPML sample DataFrame and a model specification dictionary.
*   **Processes:**
    1.  Constructs a `patsy`-style formula string representing the full structural gravity model.
    2.  Estimates the model using `pyfixest.feols` with `family='poisson'` and computes cluster-robust standard errors.
*   **Outputs:** A `Fixest` results object containing all estimation outputs.
*   **Transformation:** Transforms a dataset and specification into a fitted model object.
*   **Role in Research Pipeline:** This is the **central estimation function** for the main results (e.g., Table 4).

**Task 11: `run_logit_estimation_pipeline`**
*   **Inputs:** The Logit sample, a model specification, and the `ProjectConstants` object.
*   **Processes:**
    1.  **Point Estimation:** Estimates coefficients using the split-panel jackknife procedure for bias correction: $\hat{\beta}_{BC} = \hat{\beta}_{Full} - \frac{G-1}{G} \sum_{g=1}^G (\hat{\beta}_{-g} - \hat{\beta}_{Full})$.
    2.  **Inference:** Computes standard errors by bootstrapping the *entire* jackknife procedure in parallel.
*   **Outputs:** A dictionary containing the final bias-corrected coefficients and their bootstrapped standard errors.
*   **Transformation:** Transforms a dataset and specification into a final set of coefficient estimates and standard errors.
*   **Role in Research Pipeline:** Implements the **estimation of the extensive margin models** (e.g., the "Trade dummy" column in Table 3).

#### **Phase 5: Analysis and Interpretation**

**Task 12: `run_diagnostics_pipeline`**
*   **Inputs:** The fitted PPML and Logit model objects.
*   **Processes:**
    1.  **PPML Diagnostics:** Checks for convergence and tests for overdispersion.
    2.  **Logit Diagnostics:** Checks for convergence (as a proxy for separation) and calculates McFadden's Pseudo R-squared.
    3.  **Cross-Model Consistency:** Compares the sign and significance of key coefficients across models.
*   **Outputs:** A nested dictionary containing all diagnostic reports.
*   **Transformation:** Transforms fitted model objects into a structured set of diagnostic statistics.
*   **Role in Research Pipeline:** This is the **Model Validation** stage, ensuring the estimated models are statistically sound.

**Task 13: `compute_ppml_marginal_effects` & `compute_logit_marginal_effects`**
*   **Role in Research Pipeline:** These are the **core calculation engines for effect sizes**. They translate raw coefficients into economically meaningful quantities (percentage changes or changes in probability). `compute_ppml_marginal_effects` uses the **Delta Method** for standard errors: $SE(g(\beta)) \approx \sqrt{(\nabla g)' V(\beta) (\nabla g)}$. `compute_logit_marginal_effects` uses the more robust **bootstrap distribution**. They produce the numbers for figures like Figure 3.

**Task 14: `run_institutional_quantification_pipeline`**
*   **Inputs:** A fitted model object and a specification of key interaction terms.
*   **Processes:**
    1.  Quantifies the WTO attenuation effect and its standard error using the full Delta Method.
    2.  Performs a formal **Wald test** of the equality of WTO and RTA moderation effects.
    3.  Performs an **F-test** of the joint significance of governance interaction terms.
*   **Outputs:** A dictionary containing the results of these calculations and hypothesis tests.
*   **Transformation:** Transforms model coefficients and VCV matrices into targeted quantities of interest and formal test results.
*   **Role in Research Pipeline:** Directly answers the paper's central research questions by **quantifying and testing the significance of the institutional moderation effects**.

**Task 15: `run_temporal_analysis_pipeline`**
*   **Inputs:** The dictionary of temporal subsamples and the full DataFrame.
*   **Processes:**
    1.  Re-runs the estimation and marginal effect calculation on each temporal subsample.
    2.  Runs a single, pooled regression with a triple-interaction term to formally test for a structural break.
*   **Outputs:** A dictionary containing a DataFrame of period-specific effects and the results of the structural break test.
*   **Transformation:** Transforms a static model into a dynamic analysis of the stability of findings over time.
*   **Role in Research Pipeline:** Provides the quantitative evidence for the **temporal heterogeneity** of the results, as shown in Figure 4.

#### **Phase 6: Robustness Analysis**

**Task 16-18: `run_all_robustness_analyses`**
*   **Inputs:** The baseline DataFrame, baseline model specification, and the `ProjectConstants` object.
*   **Processes:** This meta-task orchestrates the entire suite of robustness checks. It programmatically generates configurations for dozens of alternative models (e.g., excluding China, using alternative variables), and then executes the estimation for each in parallel.
*   **Outputs:** A dictionary where keys are the names of the robustness checks and values are the resulting fitted model objects.
*   **Transformation:** Transforms a single baseline analysis into a comprehensive sensitivity analysis.
*   **Role in Research Pipeline:** This is the **Sensitivity Analysis** phase, ensuring that the main findings are not artifacts of a specific sample or model specification.

#### **Phase 7: Results Presentation**

**Task 19: `run_results_presentation_pipeline`**
*   **Inputs:** All fitted model objects from the main and robustness analyses.
*   **Processes:**
    1.  Uses the `modelsummary` library to combine multiple model results into publication-quality tables.
    2.  Creates a custom summary table comparing key coefficients across all robustness checks.
*   **Outputs:** A dictionary of `pandas.DataFrame` objects, each representing a formatted table.
*   **Transformation:** Transforms complex model objects into human-readable presentation tables.
*   **Role in Research Pipeline:** This is the **Results Reporting** stage, programmatically generating the final tables (like Table 4).

**Task 20: `generate_and_save_all_figures`**
*   **Role in Research Pipeline:** This is the **Visualization** stage. It takes the calculated effects and model results and transforms them into the key figures of the paper (like Figures 3 and 4), providing an intuitive visual summary of the findings.

**Task 21: `generate_synthesis_report`**
*   **Inputs:** All major results objects from the entire pipeline.
*   **Processes:** Programmatically extracts the key quantitative findings, assesses their statistical and economic significance, and assembles them into a structured text-based report.
*   **Outputs:** A nested dictionary containing a full, reproducible summary of the project's conclusions.
*   **Transformation:** Transforms a collection of quantitative results into a coherent, interpretive narrative.
*   **Role in Research Pipeline:** This is the **Final Synthesis and Conclusion** stage, automating the process of writing the abstract and conclusion.

<br><br>

###**Usage Example**

*   **Ideal Data Structure:** The example will be a Python script. It will demonstrate the instantiation of the required inputs: the `master_df` DataFrame and the `config` dictionary.
*   **Accurate Implementation:**
    1.  **Imports:** The script will begin by importing necessary libraries (`pandas`, `numpy`, `yaml`) and, crucially, the `execute_full_research_pipeline` function itself, along with any other required callables from the pipeline module.
    2.  **Configuration Loading:** It will demonstrate the correct way to load the `config.yaml` file created in the previous step. This involves using the `PyYAML` library (`yaml.safe_load()`) to parse the YAML file into a Python dictionary. This is a critical step in a configuration-driven workflow.
    3.  **Data Generation (Mock-up):** Since we do not have the actual raw data, the script will programmatically generate a small, but structurally correct, mock `master_df`. This mock DataFrame will:
        *   Contain all 26 columns specified in the `define_full_schema` function with the correct dtypes.
        *   Have a `pandas.DatetimeIndex`.
        *   Contain data for a few dyads (e.g., USA-CHN, USA-CAN, DEU-FRA) over a few years.
        *   The data will be generated using `numpy.random` but will be constrained to plausible ranges (e.g., Polity scores between -10 and 10, trade values non-negative). This makes the example fully self-contained and runnable.
    4.  **Pipeline Execution:** The script will then make the call to `execute_full_research_pipeline`, passing the loaded `config`, the mock `master_df`, and a specified output directory path.
    5.  **Clarity and Annotation:** Every step will be preceded by comments explaining what is being done and why, making the script a clear tutorial for a new user of the pipeline.
*   **Anticipated Challenges & Resolutions:**
    *   **Challenge:** The full pipeline is computationally expensive and would take a long time to run.
    *   **Resolution:** The example will demonstrate how to use the `run_intensive_logit=False` and `run_robustness_checks=False` flags in the orchestrator. This allows a user to run a "smoke test" of the entire data preparation and baseline estimation pipeline quickly, before committing to the hours-long full run.
*   **Module Selection:** `pandas`, `numpy`, `pyyaml`, `pathlib`.
*   **Completeness & Best Practices:** This example demonstrates the complete, intended workflow. It shows how to separate configuration from code (loading the YAML file), how to structure the input data, and how to launch the entire analysis with a single function call. Providing a runnable example with mock data is a crucial part of delivering a professional-grade analytical tool.

--

#### Example: Using the End-to-End Pipeline

This script serves as a complete, executable example of how to use the `execute_full_research_pipeline` function.

```python
# ==============================================================================
# main_analysis.py
#
# Example script to execute the full end-to-end research pipeline for the
# replication of Hardwick (2025), "Trade, Political Distance and the World
# Trade Organization".
#
# This script demonstrates:
#   1. Loading the master configuration from a YAML file.
#   2. Creating a structurally correct (mock) input DataFrame.
#   3. Calling the top-level orchestrator to run the entire analysis.
# ==============================================================================

# --- 1. Import necessary libraries and the main pipeline function ---

# Standard library imports
from pathlib import Path
import yaml

# Third-party imports
import pandas as pd
import numpy as np

# Import the main orchestrator function from the project's module.
# It is assumed all other required functions are available within that module.
# from geopolitical_trade_pipeline.main import execute_full_research_pipeline
# from geopolitical_trade_pipeline.data_validation import define_full_schema

# For this self-contained example, we assume the functions are defined in scope.

def create_mock_master_df(
    years: range = range(2000, 2010),
    dyads: list = [('USA', 'CHN'), ('USA', 'CAN'), ('DEU', 'FRA')]
) -> pd.DataFrame:
    """Creates a small, structurally correct mock DataFrame for demonstration."""
    
    # Get the full schema to ensure all columns are created.
    schema = define_full_schema()
    
    # Create the panel structure.
    index = pd.to_datetime([f"{y}-01-01" for y in years])
    df_list = []
    for exp, imp in dyads:
        df_dyad = pd.DataFrame(index=index)
        df_dyad['ExporterISO'] = exp
        df_dyad['ImporterISO'] = imp
        df_list.append(df_dyad)
    
    df = pd.concat(df_list)
    df.index.name = 'DateTime'
    
    # Populate with plausible random data.
    rng = np.random.default_rng(2025)
    n_obs = len(df)
    
    df['DyadicPair'] = df['ExporterISO'] + '_' + df['ImporterISO']
    df['BilateralTradeValue_USD'] = rng.exponential(1e9, n_obs)
    df['TradeProd_SectorCount'] = rng.integers(0, 10, n_obs)
    df['BACI_TotalValue_USD'] = df['BilateralTradeValue_USD'] * rng.uniform(0.9, 1.1, n_obs)
    df['GDELT_GoldsteinMean'] = rng.uniform(-5, 5, n_obs)
    df['GDELT_GoldsteinStd'] = rng.uniform(0, 3, n_obs)
    df['GDELT_EventCount'] = rng.integers(0, 50, n_obs)
    df['GATTWTO_Both'] = 1
    df['GATTWTO_One'] = 0
    df['RTA'] = rng.choice([0, 1], n_obs)
    df['GDP_USD_Exporter'] = rng.uniform(1e12, 2e13, n_obs)
    df['GDP_USD_Importer'] = rng.uniform(1e12, 2e13, n_obs)
    df['TotalExports_USD_Exporter'] = df['GDP_USD_Exporter'] * rng.uniform(0.1, 0.3, n_obs)
    df['TotalExports_USD_Importer'] = df['GDP_USD_Importer'] * rng.uniform(0.1, 0.3, n_obs)
    df['DomesticTrade_Exporter'] = df['GDP_USD_Exporter'] - df['TotalExports_USD_Exporter']
    df['DomesticTrade_Importer'] = df['GDP_USD_Importer'] - df['TotalExports_USD_Importer']
    df['PolityScore_Exporter'] = rng.integers(-10, 11, n_obs)
    df['PolityScore_Importer'] = rng.integers(-10, 11, n_obs)
    df['CorruptionIndex_Exporter'] = rng.uniform(0, 1, n_obs)
    df['CorruptionIndex_Importer'] = rng.uniform(0, 1, n_obs)
    df['DemocraticPair'] = ((df['PolityScore_Exporter'] > 6) & (df['PolityScore_Importer'] > 6)).astype(int)
    df['PoliticalDistance'] = np.abs(df['PolityScore_Exporter'] - df['PolityScore_Importer'])
    df['IdealPoint_Exporter'] = rng.normal(0, 1, n_obs)
    df['IdealPoint_Importer'] = rng.normal(0, 1, n_obs)
    
    # Ensure dtypes match the schema.
    for col, dtype in schema.items():
        df[col] = df[col].astype(dtype)
        
    return df

if __name__ == '__main__':
    
    # --- 2. Define Inputs for the Pipeline ---

    # Define the path to the master configuration file.
    # In a real project, this file would be managed and version controlled.
    config_path = Path("config.yaml") # Assumes the YAML file is in the same directory.

    # Define the directory where all outputs (tables, figures, reports) will be saved.
    output_directory = Path("./research_outputs")

    # --- 3. Load Configuration from YAML file ---

    print(f"Loading configuration from: {config_path}")
    # Use a try-except block for robust file handling.
    try:
        with open(config_path, 'r') as f:
            # `yaml.safe_load` is the standard, secure way to parse YAML files.
            master_config = yaml.safe_load(f)
    except FileNotFoundError:
        print(f"ERROR: Configuration file not found at {config_path}. Exiting.")
        exit()
    except yaml.YAMLError as e:
        print(f"ERROR: Failed to parse YAML configuration file. Details: {e}. Exiting.")
        exit()

    # --- 4. Load or Create the Master DataFrame ---

    print("Loading/Creating master DataFrame...")
    # In a real project, this step would involve loading a large CSV or Parquet file.
    # For this example, we generate a small, structurally correct mock DataFrame.
    # This ensures the example is fully runnable without external data dependencies.
    master_input_df = create_mock_master_df()
    print(f"DataFrame created with {len(master_input_df)} observations.")
    print("Data Schema:\n", master_input_df.info())

    # --- 5. Execute the End-to-End Research Pipeline ---

    print("\n" + "="*80)
    print("STARTING END-TO-END RESEARCH PIPELINE")
    print("="*80)

    # This is the single call that runs the entire analysis.
    # For this example run, we disable the most computationally intensive parts
    # to allow for a quick "smoke test" of the pipeline's integrity.
    # For a full replication, these would be set to True.
    execute_full_research_pipeline(
        master_df=master_input_df,
        config=master_config,
        output_dir=str(output_directory),
        run_intensive_logit=False,       # Set to True for full replication
        run_robustness_checks=False    # Set to True for full replication
    )

    print("\n" + "="*80)
    print("PIPELINE EXECUTION FINISHED")
    print("="*80)

```



In [None]:
# Task 1: Parameter Validation and Configuration Setup

# ==============================================================================
# Task 1, Step 1: Configuration Dictionary Structure Validation
# ==============================================================================

class ParticleFilterParams(BaseModel):
    """
    A Pydantic model for validating the 'particle_filter_params' sub-dictionary.

    This model defines the precise schema for parameters related to the Particle
    Filter algorithm used for smoothing the GDELT political distance index. It
    enforces strict type and value constraints on each parameter to ensure
    methodological consistency with the source paper.

    Attributes:
        M_particles (int): The number of particles to use in the filter. Must be
                           exactly 1000 as per Appendix A.1.
        observation_variance_function (str): The string representation of the
                                             lambda function for observation
                                             variance, R_ijt.
        process_variance_estimation (str): A descriptive string specifying the
                                           method for estimating process
                                           variance, Q_ij.
        process_variance_floor (float): The minimum allowed value for the
                                        process variance to ensure numerical
                                        stability. Must be 1e-6.
        process_variance_default (float): The default process variance to use
                                          when AR(1) estimation fails. Must be
                                          0.001.
        baseline_min_obs_threshold (int): The minimum number of non-zero monthly
                                          observations required for a dyadic
                                          pair to be included in the baseline
                                          analysis (50% of sample). Must be 270.
        robustness_min_obs_threshold (int): The stricter minimum observation
                                            threshold for robustness checks
                                            (60% of sample). Must be 324.
    """
    # Configure the model to forbid any extra fields not defined in this schema.
    # This ensures the configuration dictionary does not contain unexpected keys.
    model_config = ConfigDict(extra='forbid')

    # Define the number of particles, must be an integer equal to 1000.
    # Source: Appendix A.1, page 27
    M_particles: int = Field(
        ...,
        description="Number of particles for the Sequential Monte Carlo filter.",
        eq=1000
    )

    # Define the string for the observation variance function.
    # Equation: R_ijt = 1 / (n_ijt + 1)
    # Source: Appendix A.1, page 27
    observation_variance_function: str = Field(
        ...,
        description="String representation of the observation variance function.",
        eq="lambda n_ijt: 1 / (n_ijt + 1)"
    )

    # Define the description for the process variance estimation method.
    # Source: Appendix A.1, page 27
    process_variance_estimation: str = Field(
        ...,
        description="Method to estimate process variance Q_ij.",
        eq="Residual variance of AR(1) fit to y_ijt series"
    )

    # Define the floor for the process variance, must be a float equal to 1e-6.
    # Source: Appendix A.1, page 27
    process_variance_floor: float = Field(
        ...,
        description="Minimum value for process variance to ensure stability.",
        eq=1e-6
    )

    # Define the default value for the process variance, must be a float equal to 0.001.
    # Source: Appendix A.1, page 27
    process_variance_default: float = Field(
        ...,
        description="Default process variance for failed AR(1) estimations.",
        eq=0.001
    )

    # Define the baseline threshold for minimum observations, must be an integer equal to 270.
    # Source: Appendix A.1, page 27
    baseline_min_obs_threshold: int = Field(
        ...,
        description="Minimum non-zero observations for baseline sample.",
        eq=270
    )

    # Define the robustness threshold for minimum observations, must be an integer equal to 324.
    # Source: Appendix A.1, page 27
    robustness_min_obs_threshold: int = Field(
        ...,
        description="Stricter minimum observations for robustness check sample.",
        eq=324
    )


class EstimationParams(BaseModel):
    """
    A Pydantic model for validating the 'estimation_params' sub-dictionary.

    This model specifies the schema for parameters governing the econometric
    estimation phase, including the choice of estimators, fixed effects
    structure, and methods for standard error calculation.

    Attributes:
        primary_estimator (str): The main estimator for the gravity model. Must
                                 be "PPML".
        extensive_margin_estimator (str): The estimator for the extensive margin
                                          analysis. Must be "Logit".
        fixed_effects_structure (List[str]): A list of strings defining the
                                             high-dimensional fixed effects to
                                             be absorbed in the models.
        ppml_se_method (str): The method for calculating standard errors in the
                              PPML models.
        logit_se_method (str): The method for calculating standard errors in the
                               Logit models.
        bootstrap_replications (int): The number of replications for bootstrap-
                                      based standard error calculations. Must
                                      be 1000.
    """
    # Configure the model to forbid any extra, undefined fields.
    model_config = ConfigDict(extra='forbid')

    # Define the primary estimator, must be the string "PPML".
    # Source: Section 3, page 10
    primary_estimator: str = Field(
        ...,
        description="Primary estimator for the structural gravity model.",
        eq="PPML"
    )

    # Define the extensive margin estimator, must be the string "Logit".
    # Source: Section 3, page 11
    extensive_margin_estimator: str = Field(
        ...,
        description="Estimator for the extensive margin (binary/fractional) models.",
        eq="Logit"
    )

    # Define the fixed effects structure, must be a list of exactly 4 strings.
    # Source: Section 3, page 10
    fixed_effects_structure: List[str] = Field(
        ...,
        description="High-dimensional fixed effects structure for gravity models.",
        min_length=4,
        max_length=4
    )

    # Define a custom validator to ensure the list contains the exact required elements.
    @field_validator('fixed_effects_structure')
    def check_exact_fe_structure(cls, v: List[str]) -> List[str]:
        """Validates the exact content of the fixed_effects_structure list."""
        # Define the expected list of fixed effects strings.
        expected = [
            "ExporterISO + Year",
            "ImporterISO + Year",
            "ExporterISO + ImporterISO",
            "Border + Year"
        ]
        # Compare the input list with the expected list.
        if v != expected:
            # If they do not match, raise a ValueError with a precise error message.
            raise ValueError(f"fixed_effects_structure must be exactly {expected}")
        # If validation passes, return the original value.
        return v

    # Define the method for PPML standard errors.
    # Source: Note to Table 2, page 14
    ppml_se_method: str = Field(
        ...,
        description="Method for PPML standard error calculation.",
        eq="Clustered by ExporterISO-ImporterISO pair"
    )

    # Define the method for Logit standard errors.
    # Source: Section 3, page 12
    logit_se_method: str = Field(
        ...,
        description="Method for Logit standard error calculation.",
        eq="Split-panel jackknife with bootstrap"
    )

    # Define the number of bootstrap replications, must be an integer equal to 1000.
    # Source: Note to Table 3, page 15
    bootstrap_replications: int = Field(
        ...,
        description="Number of replications for bootstrap procedures.",
        eq=1000
    )


class RobustnessCheckParams(BaseModel):
    """
    A Pydantic model for validating the 'robustness_check_params' sub-dictionary.

    This model defines the schema for parameters that control the various
    robustness checks performed in the study, such as alternative sample
    definitions and variable specifications.

    Attributes:
        democratic_pair_definition (str): The logical condition defining a
                                          "democratic pair" based on Polity
                                          scores.
        pd_trimming_rule (str): A description of the rule for trimming outliers
                                from the political distance distribution.
        alternative_governance_vars (List[str]): A list of alternative
                                                 governance indicators to be
                                                 used in robustness checks.
        temporal_aggregation_robustness_frequency (str): The temporal frequency
                                                         to use for robustness
                                                         checks (e.g., "Quarterly").
        first_month_gdelt_measure (str): A description of the rule for creating
                                         the quarterly GDELT measure.
    """
    # Configure the model to forbid any extra, undefined fields.
    model_config = ConfigDict(extra='forbid')

    # Define the rule for identifying democratic pairs.
    # Source: Note to Table A4, page 31
    democratic_pair_definition: str = Field(
        ...,
        description="Rule for defining a democratic dyad based on Polity scores.",
        eq="Polity_i > 6 and Polity_j > 6"
    )

    # Define the rule for trimming the political distance distribution.
    # Source: Table A4, Column 4 Header, page 31 & Footnote 7, page 16
    pd_trimming_rule: str = Field(
        ...,
        description="Rule for trimming outliers from the political distance variable.",
        eq="Drop top and bottom 1% of the PD distribution"
    )

    # Define the list of alternative governance variables.
    # Source: Note to Table A4, page 31
    alternative_governance_vars: List[str] = Field(
        ...,
        description="Alternative governance indicators for robustness checks.",
        min_length=2,
        max_length=2
    )

    # Define a custom validator to check the exact contents of the list.
    @field_validator('alternative_governance_vars')
    def check_exact_gov_vars(cls, v: List[str]) -> List[str]:
        """Validates the exact content of the alternative_governance_vars list."""
        # Define the expected list of governance variable names.
        expected = [
            "WGI_Voice_and_Accountability",
            "WGI_Rule_of_Law"
        ]
        # Compare the sorted input list with the sorted expected list for order-insensitivity.
        if sorted(v) != sorted(expected):
            # If they do not match, raise a ValueError.
            raise ValueError(f"alternative_governance_vars must contain exactly {expected}")
        # If validation passes, return the original value.
        return v

    # Define the frequency for temporal aggregation robustness checks.
    # Source: Table A10, Columns 3 & 4 Headers, page 36
    temporal_aggregation_robustness_frequency: str = Field(
        ...,
        description="Temporal frequency for aggregation robustness checks.",
        eq="Quarterly"
    )

    # Define the rule for the first-month GDELT measure.
    # Source: Table A10, Column 4 Header, page 36
    first_month_gdelt_measure: str = Field(
        ...,
        description="Rule for constructing the quarterly GDELT measure.",
        eq="Use GDELT index value from only the first month of each quarter"
    )


class Parameters(BaseModel):
    """
    A Pydantic model that nests the parameter sub-dictionaries.

    This model acts as an intermediate container, ensuring that the main
    'parameters' key in the configuration dictionary contains the three
    expected sub-dictionaries and nothing else.

    Attributes:
        particle_filter_params (ParticleFilterParams): The validated particle
                                                      filter parameters object.
        estimation_params (EstimationParams): The validated estimation
                                              parameters object.
        robustness_check_params (RobustnessCheckParams): The validated
                                                         robustness check
                                                         parameters object.
    """
    # Configure the model to forbid any extra, undefined fields.
    model_config = ConfigDict(extra='forbid')

    # Nest the previously defined Pydantic models as fields.
    # Pydantic will automatically delegate validation to these sub-models.
    particle_filter_params: ParticleFilterParams
    estimation_params: EstimationParams
    robustness_check_params: RobustnessCheckParams


class MasterConfig(BaseModel):
    """
    The top-level Pydantic model for validating the entire configuration dictionary.

    This model defines the root structure of the configuration, which must
    contain a single key, 'parameters', whose value is an object that conforms
    to the `Parameters` schema.

    Attributes:
        parameters (Parameters): The validated, nested object containing all
                                 project parameters.
    """
    # Configure the model to forbid any extra fields at the top level.
    model_config = ConfigDict(extra='forbid')

    # Define the single top-level key required in the configuration dictionary.
    parameters: Parameters


def validate_config_structure(config: Dict[str, Any]) -> MasterConfig:
    """
    Validates the structure, types, and exact values of the master configuration
    dictionary using a Pydantic schema.

    This function ensures that the configuration dictionary adheres precisely to
    the specifications required for the replication study, preventing runtime
    errors due to malformed or incorrect configuration parameters.

    Args:
        config (Dict[str, Any]): The master configuration dictionary to be
                                 validated.

    Returns:
        MasterConfig: A validated Pydantic model instance of the configuration,
                      providing attribute-style access and type safety.

    Raises:
        ValueError: If the configuration dictionary fails validation. The error
                    message contains detailed information about the validation
                    failures, including the exact location and nature of the
                    discrepancy.
    """
    # Check if the input is a dictionary.
    if not isinstance(config, dict):
        raise TypeError("Configuration must be a dictionary.")

    try:
        # Attempt to parse and validate the input dictionary against the schema.
        # Pydantic handles the recursive validation of all nested models.
        validated_config = MasterConfig.model_validate(config)

        # Return the validated Pydantic object upon success.
        return validated_config

    except ValidationError as e:
        # If Pydantic's validation fails, catch the detailed error.
        # Format the error into a clear, actionable message and raise a
        # standard ValueError, which is more conventional for parameter validation.
        error_details = e.errors()

        # Create a formatted, readable error report from Pydantic's output.
        report = "\n".join(
            [f"  - Location: {'.'.join(map(str, err['loc']))}, Error: {err['msg']}" for err in error_details]
        )

        # Raise a comprehensive ValueError.
        raise ValueError(f"Master configuration validation failed with the following errors:\n{report}")


# ==============================================================================
# Task 1, Step 2: Lambda Function and Functional Parameter Parsing
# ==============================================================================

def _is_safe_ast_node(node: ast.AST) -> bool:
    """
    Recursively checks if an Abstract Syntax Tree (AST) node and all its
    children are within a predefined whitelist of safe nodes.

    This is a security helper function to prevent arbitrary code execution when
    parsing the lambda function string from the configuration.

    Args:
        node (ast.AST): The AST node to inspect.

    Returns:
        bool: True if the node and all its children are safe, False otherwise.
    """
    # Define the whitelist of allowed AST node types. This list permits
    # basic arithmetic operations, variable names, numbers, and the lambda form.
    # It explicitly disallows imports, function calls (except lambda), etc.
    SAFE_NODES = {
        ast.Expression, ast.Lambda, ast.Name, ast.Load, ast.BinOp,
        ast.UnaryOp, ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Pow,
        ast.Num, ast.Constant, ast.arg, ast.arguments
    }

    # Check if the current node's type is in the whitelist.
    if type(node) not in SAFE_NODES:
        return False

    # Recursively check all child nodes of the current node.
    for child_node in ast.iter_child_nodes(node):
        if not _is_safe_ast_node(child_node):
            return False

    # If the node and all its children are safe, return True.
    return True


def parse_functional_parameters(validated_config: MasterConfig) -> Dict[str, Any]:
    """
    Parses string-based functional parameters from the validated configuration
    into executable Python objects and validates numerical bounds.

    This function safely evaluates the lambda string for the observation
    variance function after a rigorous security check using Abstract Syntax
    Trees (AST). It also extracts and validates key numerical parameters.

    Args:
        validated_config (MasterConfig): The validated Pydantic model instance
                                         of the configuration.

    Returns:
        Dict[str, Any]: A dictionary containing the parsed callable function
                        and validated numerical parameters, ready for use in
                        downstream computations.

    Raises:
        ValueError: If the lambda string is syntactically incorrect, contains
                    unsafe operations, or if numerical parameters are invalid.
        TypeError: If the parsed lambda string does not result in a callable
                   function.
    """
    # Initialize a dictionary to store the parsed functional parameters.
    functional_params = {}

    # --- Parse and Validate Observation Variance Function ---

    # Extract the lambda string from the validated configuration.
    lambda_str = validated_config.parameters.particle_filter_params.observation_variance_function

    try:
        # Safely parse the string into an Abstract Syntax Tree (AST).
        # This will fail with a SyntaxError for malformed Python code.
        tree = ast.parse(lambda_str, mode='eval')

        # Perform a security check on the AST to ensure it contains only whitelisted operations.
        if not _is_safe_ast_node(tree):
            # If the AST contains disallowed nodes, raise a security error.
            raise ValueError(f"Lambda string '{lambda_str}' contains unsafe operations.")

        # If the AST is safe, compile and evaluate it in a restricted namespace.
        # The 'globals' dictionary is empty, preventing access to any modules or functions.
        # The '__builtins__' are also cleared for maximum security.
        observation_func = eval(compile(tree, filename='<string>', mode='eval'), {"__builtins__": {}})

        # Verify that the evaluated object is a callable function.
        if not isinstance(observation_func, Callable):
            raise TypeError("Parsed observation_variance_function is not a callable function.")

        # Perform a functional test on the parsed lambda function.
        # Test with a sample input to ensure it behaves as expected.
        test_input = 100
        expected_output = 1 / (test_input + 1)
        if not np.isclose(observation_func(test_input), expected_output):
            raise ValueError("Parsed lambda function failed functional validation test.")

        # Store the validated, callable function.
        functional_params['R_function'] = observation_func

    except SyntaxError:
        # Catch parsing errors for malformed lambda strings.
        raise ValueError(f"Lambda string '{lambda_str}' is not valid Python syntax.")
    except Exception as e:
        # Re-raise other validation errors with context.
        raise ValueError(f"Failed to parse functional parameters: {e}")

    # --- Extract and Validate Numerical Parameters ---

    # Extract the process variance floor.
    q_floor = validated_config.parameters.particle_filter_params.process_variance_floor
    # Validate that it is a non-negative float.
    if not (isinstance(q_floor, float) and q_floor >= 0):
        raise ValueError("process_variance_floor must be a non-negative float.")
    functional_params['Q_floor'] = q_floor

    # Extract the process variance default value.
    q_default = validated_config.parameters.particle_filter_params.process_variance_default
    # Validate that it is a positive float.
    if not (isinstance(q_default, float) and q_default > 0):
        raise ValueError("process_variance_default must be a positive float.")
    functional_params['Q_default'] = q_default

    # Return the dictionary of parsed and validated parameters.
    return functional_params


# ==============================================================================
# Task 1, Step 3: Global Constants and Derived Parameters Initialization
# ==============================================================================

@dataclass(frozen=True)
class ProjectConstants:
    """
    An immutable dataclass to store and provide access to all validated project
    constants and derived parameters.

    This class serves as the single source of truth for all configuration
    parameters throughout the research pipeline, ensuring consistency and
    preventing accidental modification of parameters during runtime. The `frozen=True`
    argument makes instances of this class immutable.

    Attributes:
        M_PARTICLES (Final[int]): Number of particles for the Particle Filter.
        R_FUNCTION (Final[Callable[[int], float]]): Parsed observation variance function.
        Q_FLOOR (Final[float]): Floor for the process variance (Q_ij).
        Q_DEFAULT (Final[float]): Default value for process variance if AR(1) fit fails.
        BASELINE_MIN_OBS (Final[int]): Minimum non-zero observations for baseline GDELT index.
        ROBUSTNESS_MIN_OBS (Final[int]): Stricter minimum observations for robustness check.
        N_BOOTSTRAP (Final[int]): Number of bootstrap replications for Logit SEs.
    """
    # Define class attributes with Final type hints for clarity and static analysis.
    M_PARTICLES: Final[int]
    R_FUNCTION: Final[Callable[[int], float]]
    Q_FLOOR: Final[float]
    Q_DEFAULT: Final[float]
    BASELINE_MIN_OBS: Final[int]
    ROBUSTNESS_MIN_OBS: Final[int]
    N_BOOTSTRAP: Final[int]

    def __post_init__(self):
        """
        Performs post-initialization validation on the constants.

        This method is automatically called by the dataclass constructor after
        initialization. It is used here to enforce logical constraints between
        parameters that cannot be checked by simple type or value validation.

        Raises:
            ValueError: If any cross-parameter validation checks fail.
        """
        # Validate the logical relationship between Q_floor and Q_default.
        # The default variance must be greater than or equal to the floor.
        if self.Q_DEFAULT < self.Q_FLOOR:
            raise ValueError(
                f"Q_default ({self.Q_DEFAULT}) must be greater than or equal to "
                f"Q_floor ({self.Q_FLOOR})."
            )

        # Validate the logical relationship between observation thresholds.
        # The robustness threshold should be stricter (greater) than the baseline.
        if self.ROBUSTNESS_MIN_OBS <= self.BASELINE_MIN_OBS:
            raise ValueError(
                f"Robustness minimum observations ({self.ROBUSTNESS_MIN_OBS}) "
                f"must be strictly greater than the baseline minimum "
                f"({self.BASELINE_MIN_OBS})."
            )


def initialize_project_constants(
    validated_config: MasterConfig,
    functional_params: Dict[str, Any]
) -> ProjectConstants:
    """
    Initializes and validates the immutable ProjectConstants object.

    This function extracts parameters from the validated configuration and the
    parsed functional parameters dictionary, instantiates the frozen dataclass,
    and runs post-initialization validation checks.

    Args:
        validated_config (MasterConfig): The Pydantic-validated configuration object.
        functional_params (Dict[str, Any]): The dictionary of parsed callables
                                            and validated numerical parameters.

    Returns:
        ProjectConstants: An immutable, validated dataclass instance containing
                          all necessary project constants.

    Raises:
        KeyError: If a required key is missing from the input dictionaries.
        ValueError: If post-initialization validation within the dataclass fails.
    """
    try:
        # Instantiate the frozen dataclass, mapping config values to attributes.
        # This structure is explicit and robust to changes in dictionary order.
        constants = ProjectConstants(
            M_PARTICLES=validated_config.parameters.particle_filter_params.M_particles,
            R_FUNCTION=functional_params['R_function'],
            Q_FLOOR=functional_params['Q_floor'],
            Q_DEFAULT=functional_params['Q_default'],
            BASELINE_MIN_OBS=validated_config.parameters.particle_filter_params.baseline_min_obs_threshold,
            ROBUSTNESS_MIN_OBS=validated_config.parameters.particle_filter_params.robustness_min_obs_threshold,
            N_BOOTSTRAP=validated_config.parameters.estimation_params.bootstrap_replications
        )

        # Return the successfully created and validated constants object.
        return constants

    except (KeyError, AttributeError) as e:
        # Catch potential errors if the input objects are malformed,
        # which should not happen if they come from the previous validation steps.
        raise KeyError(f"Failed to initialize constants due to missing parameter: {e}")
    except (ValueError, TypeError) as e:
        # Catch validation errors from the dataclass's __post_init__ method.
        raise ValueError(f"Post-initialization validation of constants failed: {e}")


# ==============================================================================
# Task 1: Master Orchestrator Function
# ==============================================================================

def run_configuration_and_validation(
    config: Dict[str, Any]
) -> ProjectConstants:
    """
    Orchestrates the entire configuration validation and setup process.

    This master function serves as the single entry point for Task 1. It executes
    the three core steps in sequence:
    1. Validates the raw configuration dictionary's structure and values.
    2. Parses functional parameters (e.g., lambda strings) into callable objects.
    3. Initializes a final, immutable dataclass of project constants.

    The function is designed to be transactional: it either completes
    successfully and returns the constants object, or it fails and raises a
    single, comprehensive error.

    Args:
        config (Dict[str, Any]): The raw master configuration dictionary for the
                                 replication study.

    Returns:
        ProjectConstants: An immutable, fully validated dataclass instance
                          containing all project parameters, ready for use in
                          the subsequent phases of the analysis.

    Raises:
        ValueError: If any stage of the validation or parsing fails.
        TypeError: If the input configuration is not a dictionary or if a
                   parsed function is not callable.
    """
    try:
        # Step 1: Validate the overall structure, types, and values of the config dict.
        # This step ensures the configuration conforms to the required schema.
        print("Step 1: Validating configuration structure...")
        validated_config_model = validate_config_structure(config)
        print("Step 1: Success. Configuration structure is valid.")

        # Step 2: Parse string-based functions and validate functional parameters.
        # This step safely converts strings to callables and checks numerical bounds.
        print("Step 2: Parsing functional parameters...")
        parsed_functional_params = parse_functional_parameters(validated_config_model)
        print("Step 2: Success. Functional parameters parsed and validated.")

        # Step 3: Initialize the final, immutable container for all project constants.
        # This step creates the single source of truth for all downstream tasks.
        print("Step 3: Initializing project constants object...")
        project_constants = initialize_project_constants(
            validated_config=validated_config_model,
            functional_params=parsed_functional_params
        )
        print("Step 3: Success. Immutable project constants object created.")

        # If all steps succeed, return the final constants object.
        return project_constants

    except (ValueError, TypeError, KeyError) as e:
        # Catch any exception raised during the process.
        # This provides a single, clear failure point for the entire task.
        print(f"ERROR: Configuration and validation failed.")
        # Re-raise the exception to halt execution and provide a detailed error message.
        raise e


In [None]:
# Task 2: DataFrame Structure Validation and Quality Assessment

# ==============================================================================
# Task 2, Step 1: Core DataFrame Schema Validation
# ==============================================================================

def validate_dataframe_schema(
    df: pd.DataFrame,
    expected_columns: Dict[str, Type]
) -> None:
    """
    Validates the core schema of the input DataFrame against expected columns and types.

    This function performs a series of rigorous checks to ensure the DataFrame's
    structure is perfectly aligned with the project's requirements. It validates:
    1. The exact number of columns.
    2. The exact names of all columns.
    3. The data type of each column.
    4. The format of ISO country codes and the DyadicPair identifier.
    5. The integrity of the DatetimeIndex, including uniqueness within dyads.

    Args:
        df (pd.DataFrame): The DataFrame to be validated.
        expected_columns (Dict[str, Type]): A dictionary mapping column names
                                            to their expected data types.

    Raises:
        ValueError: If any schema validation check fails, with a detailed
                    message explaining the discrepancy.
        TypeError: If the input is not a pandas DataFrame.
    """
    # --- Initial Input Validation ---
    # Ensure the input object is a pandas DataFrame.
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")

    # --- Column Count and Name Validation ---
    # Check if the number of columns matches the expected count (24).
    if len(df.columns) != len(expected_columns):
        raise ValueError(
            f"DataFrame must have exactly {len(expected_columns)} columns, but found {len(df.columns)}."
        )
    # Check if the set of column names matches the expected set.
    if set(df.columns) != set(expected_columns.keys()):
        missing = set(expected_columns.keys()) - set(df.columns)
        extra = set(df.columns) - set(expected_columns.keys())
        raise ValueError(
            f"Column name mismatch. Missing: {missing if missing else 'None'}. "
            f"Extra: {extra if extra else 'None'}."
        )

    # --- Data Type Validation ---
    # Iterate through each expected column and its type to validate dtypes.
    for col, expected_dtype in expected_columns.items():
        # Compare the actual dtype with the expected dtype.
        if df[col].dtype != expected_dtype:
            raise TypeError(
                f"Column '{col}' has incorrect dtype. Expected {expected_dtype}, "
                f"but found {df[col].dtype}."
            )

    # --- Format Validation for Key Identifier Columns ---
    # Define the regex pattern for ISO 3166-1 alpha-3 country codes.
    iso_pattern = re.compile(r'^[A-Z]{3}$')
    # Vectorized check for 'ExporterISO' format.
    if not df['ExporterISO'].str.match(iso_pattern).all():
        raise ValueError("Column 'ExporterISO' contains invalid ISO 3166-1 alpha-3 codes.")
    # Vectorized check for 'ImporterISO' format.
    if not df['ImporterISO'].str.match(iso_pattern).all():
        raise ValueError("Column 'ImporterISO' contains invalid ISO 3166-1 alpha-3 codes.")

    # --- DyadicPair Construction Validation ---
    # Construct the expected 'DyadicPair' series from its components.
    expected_dyadic_pair = df['ExporterISO'] + '_' + df['ImporterISO']
    # Perform a byte-exact, efficient comparison with the existing column.
    if not df['DyadicPair'].equals(expected_dyadic_pair):
        raise ValueError("Column 'DyadicPair' is not correctly constructed as 'ExporterISO_ImporterISO'.")

    # --- Index Validation ---
    # Check if the index is a DatetimeIndex and has the correct resolution.
    if not isinstance(df.index, pd.DatetimeIndex) or df.index.dtype != 'datetime64[ns]':
        raise TypeError("DataFrame index must be of type datetime64[ns].")
    # Check for duplicate timestamps within each dyadic pair.
    if not df.groupby('DyadicPair').apply(lambda x: x.index.is_unique).all():
        raise ValueError("Duplicate timestamps found within one or more dyadic pairs.")


# ==============================================================================
# Task 2, Step 2: Numerical Variables and Range Validation
# ==============================================================================

def validate_numerical_ranges(df: pd.DataFrame) -> None:
    """
    Validates the ranges and properties of key numerical variables.

    This function performs checks on numerical columns to ensure their values
    fall within theoretically or practically defined bounds. It handles NaN
    values correctly by excluding them from the validation checks.

    Args:
        df (pd.DataFrame): The DataFrame to be validated.

    Raises:
        ValueError: If any numerical variable fails its range or property check.
    """
    # --- Trade Value Validation (Non-negative) ---
    # Check that all non-null trade values are greater than or equal to zero.
    for col in ['BilateralTradeValue_USD', 'BACI_TotalValue_USD']:
        if not (df.loc[df[col].notna(), col] >= 0).all():
            raise ValueError(f"Column '{col}' contains negative values.")

    # --- GDELT Variable Validation ---
    # Check that Goldstein scores are within the [-10, +10] range.
    if not df['GDELT_GoldsteinMean'].dropna().between(-10, 10).all():
        raise ValueError("Column 'GDELT_GoldsteinMean' has values outside the [-10, 10] range.")
    # Check that event counts are non-negative integers.
    if not (df.loc[df['GDELT_EventCount'].notna(), 'GDELT_EventCount'] >= 0).all():
        raise ValueError("Column 'GDELT_EventCount' contains negative values.")

    # --- Governance Variable Validation ---
    # Check that Polity scores are within the [-10, +10] integer range.
    for col in ['PolityScore_Exporter', 'PolityScore_Importer']:
        if not df[col].dropna().between(-10, 10).all():
            raise ValueError(f"Column '{col}' has values outside the [-10, 10] range.")
    # Check that Corruption Index values are within the [0, 1] range.
    for col in ['CorruptionIndex_Exporter', 'CorruptionIndex_Importer']:
        if not df[col].dropna().between(0, 1).all():
            raise ValueError(f"Column '{col}' has values outside the [0, 1] range.")


# ==============================================================================
# Task 2, Step 3: Derived and Constructed Variables Validation
# ==============================================================================

def validate_derived_variables(df: pd.DataFrame) -> None:
    """
    Validates the logical and mathematical consistency of derived variables.

    This function cross-checks variables that are constructed from other columns
    to ensure they have been calculated correctly. It uses tolerance-based
    comparisons for floating-point arithmetic to avoid precision-related issues.

    Args:
        df (pd.DataFrame): The DataFrame to be validated.

    Raises:
        ValueError: If any derived variable is found to be inconsistent with
                    its source variables.
    """
    # --- DemocraticPair Validation ---
    # Create the expected boolean mask based on the definition.
    expected_democratic_mask = (df['PolityScore_Exporter'] > 6) & (df['PolityScore_Importer'] > 6)
    # Compare the existing column with the expected integer representation of the mask.
    if not df['DemocraticPair'].equals(expected_democratic_mask.astype(np.int64)):
        raise ValueError("Column 'DemocraticPair' is not correctly derived from Polity scores.")

    # --- PoliticalDistance Validation ---
    # Compute the expected political distance.
    expected_pol_dist = np.abs(df['PolityScore_Exporter'] - df['PolityScore_Importer'])
    # Use np.allclose for robust floating-point comparison, though source is int.
    if not np.allclose(df['PoliticalDistance'], expected_pol_dist.astype(np.float64), equal_nan=True):
        raise ValueError("Column 'PoliticalDistance' is not the absolute difference of Polity scores.")

    # --- DomesticTrade Validation (with 1% tolerance) ---
    # Compute expected domestic trade for exporter.
    expected_dom_trade_exp = df['GDP_USD_Exporter'] - df['TotalExports_USD_Exporter']
    # Use np.allclose with a relative tolerance of 1% for robust comparison.
    if not np.allclose(df['DomesticTrade_Exporter'], expected_dom_trade_exp, rtol=0.01, equal_nan=True):
        raise ValueError("Column 'DomesticTrade_Exporter' is not consistent with GDP and Exports (within 1% tolerance).")

    # --- Binary Institutional Variable Validation ---
    # Check that institutional indicators contain only 0s and 1s.
    for col in ['GATTWTO_Both', 'GATTWTO_One', 'RTA']:
        if not df[col].dropna().isin([0, 1]).all():
            raise ValueError(f"Column '{col}' must be a binary indicator (0 or 1).")


# ==============================================================================
# Task 2: Master Orchestrator Function
# ==============================================================================

def run_dataframe_validation_and_assessment(
    df: pd.DataFrame,
    expected_columns: Dict[str, Type]
) -> bool:
    """
    Orchestrates the entire DataFrame validation and quality assessment process.

    This master function executes the three core validation steps for Task 2 in
    a predefined sequence:
    1. Core schema validation (columns, types, identifiers).
    2. Numerical range validation.
    3. Derived variable consistency checks.

    It provides a single entry point for validating the input data, ensuring it
    is fit for the subsequent analysis phases. The function is transactional,
    halting and raising a detailed error upon the first validation failure.

    Args:
        df (pd.DataFrame): The input DataFrame to be validated.
        expected_columns (Dict[str, Type]): A dictionary mapping column names
                                            to their expected data types.

    Returns:
        bool: Returns True if all validation checks pass successfully.

    Raises:
        ValueError: If any validation check fails.
        TypeError: If the input is not a DataFrame or contains dtype errors.
    """
    try:
        # Step 1: Validate the fundamental structure and schema of the DataFrame.
        print("Step 1: Validating core DataFrame schema...")
        validate_dataframe_schema(df, expected_columns)
        print("Step 1: Success. Core schema is valid.")

        # Step 2: Validate the ranges of key numerical variables.
        print("Step 2: Validating numerical variable ranges...")
        validate_numerical_ranges(df)
        print("Step 2: Success. Numerical ranges are valid.")

        # Step 3: Validate the consistency of all derived and constructed variables.
        print("Step 3: Validating derived variable consistency...")
        validate_derived_variables(df)
        print("Step 3: Success. Derived variables are consistent.")

        # If all steps complete without raising an exception, the validation is successful.
        print("\nSUCCESS: DataFrame has passed all validation and quality checks.")
        return True

    except (ValueError, TypeError) as e:
        # Catch any validation error from the helper functions.
        print(f"\nERROR: DataFrame validation failed.")
        # Re-raise the specific error to provide a detailed failure report.
        raise e


In [None]:
# Task 3: Data Quality Assessment and Missing Value Analysis

# ==============================================================================
# Task 3, Step 1: Comprehensive Missing Value Audit
# ==============================================================================

def audit_missing_values(
    df: pd.DataFrame,
    high_missingness_threshold: float = 30.0
) -> Dict[str, Any]:
    """
    Performs a comprehensive audit of missing values in the DataFrame.

    This function calculates per-column missing value percentages, computes a
    correlation matrix of missingness to identify patterns, and flags dyadic
    pairs that exceed a specified threshold of missing data.

    Args:
        df (pd.DataFrame): The input DataFrame to audit.
        high_missingness_threshold (float): The percentage threshold (0-100)
                                            above which a dyadic pair is
                                            flagged for high missingness.

    Returns:
        Dict[str, Any]: A dictionary containing the audit results:
                        - 'missing_percentages': pd.Series with the percentage
                          of missing values for each column.
                        - 'missingness_correlation': pd.DataFrame showing the
                          correlation of missingness between columns.
                        - 'high_missingness_dyads': pd.Series of dyadic pairs
                          that exceed the missingness threshold.
    """
    # --- Input Validation ---
    # Verify the input is a pandas DataFrame.
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")
    # Verify the threshold is a valid percentage.
    if not (0.0 <= high_missingness_threshold <= 100.0):
        raise ValueError("high_missingness_threshold must be between 0 and 100.")

    # --- Calculate Missing Value Percentages ---
    # Use the efficient vectorized .isnull().mean() method.
    missing_percentages = df.isnull().mean() * 100
    missing_percentages.name = "MissingValuePercentage"

    # --- Create Missingness Correlation Matrix ---
    # Convert the boolean DataFrame of nulls to integers (0 or 1).
    # Then, compute the Pearson correlation coefficient between these indicators.
    missingness_correlation = df.isnull().astype(int).corr()

    # --- Flag Dyadic Pairs with Excessive Missing Values ---
    # Group by 'DyadicPair' to analyze each time series independently.
    # For each group, calculate the mean of all boolean null indicators.
    # This gives the overall percentage of missing cells for that dyad.
    dyad_missingness = df.groupby('DyadicPair').apply(
        lambda g: g.isnull().values.mean() * 100
    )
    dyad_missingness.name = "DyadMissingnessPercentage"
    # Create a boolean mask to identify pairs exceeding the threshold.
    high_missingness_dyads = dyad_missingness[dyad_missingness > high_missingness_threshold]

    # --- Compile and Return Results ---
    # Store the results in a structured dictionary.
    audit_results = {
        'missing_percentages': missing_percentages,
        'missingness_correlation': missingness_correlation,
        'high_missingness_dyads': high_missingness_dyads
    }
    return audit_results


# ==============================================================================
# Task 3, Step 2: Temporal Coverage and Panel Balance Assessment
# ==============================================================================

def assess_temporal_coverage(
    df: pd.DataFrame,
    max_gap_tolerance: pd.Timedelta = pd.Timedelta('366 days')
) -> Dict[str, Any]:
    """
    Assesses the temporal coverage, continuity, and balance of the panel data.

    This function calculates the length of the time series for each dyad,
    validates that time is always moving forward (monotonicity), and identifies
    any significant temporal gaps that might indicate data collection issues.

    Args:
        df (pd.DataFrame): The input DataFrame with a DatetimeIndex.
        max_gap_tolerance (pd.Timedelta): The maximum allowable time gap
                                          between consecutive observations
                                          before it is flagged.

    Returns:
        Dict[str, Any]: A dictionary containing the assessment results:
                        - 'timeseries_lengths': pd.Series with the number of
                          observations for each dyadic pair.
                        - 'is_monotonic': A boolean indicating if all dyadic
                          time series are monotonically increasing.
                        - 'temporal_gaps': A DataFrame detailing all identified
                          gaps that exceed the tolerance.
    """
    # --- Input Validation ---
    # Verify the input is a pandas DataFrame with a DatetimeIndex.
    if not isinstance(df, pd.DataFrame) or not isinstance(df.index, pd.DatetimeIndex):
        raise TypeError("Input must be a pandas DataFrame with a DatetimeIndex.")

    # --- Calculate Time Series Length per Dyad ---
    # Use the efficient .groupby().size() method.
    timeseries_lengths = df.groupby('DyadicPair').size()
    timeseries_lengths.name = "ObservationCount"

    # --- Validate Temporal Monotonicity ---
    # For each dyad, check if the DatetimeIndex is monotonically increasing.
    # .all() confirms this holds true for every single dyad.
    is_monotonic = df.groupby('DyadicPair').apply(
        lambda g: g.index.is_monotonic_increasing
    ).all()

    # --- Identify Temporal Gaps ---
    # Calculate the time difference between consecutive observations within each dyad.
    time_diffs = df.groupby('DyadicPair').apply(lambda g: g.index.diff()).reset_index(level=0, drop=True)
    # Create a boolean mask for gaps exceeding the specified tolerance.
    gap_mask = time_diffs > max_gap_tolerance
    # Select the rows where a significant gap begins.
    temporal_gaps = df[gap_mask].copy()
    # Add the size of the gap as a new column for context.
    temporal_gaps['GapSize'] = time_diffs[gap_mask]

    # --- Compile and Return Results ---
    assessment_results = {
        'timeseries_lengths': timeseries_lengths,
        'is_monotonic': is_monotonic,
        'temporal_gaps': temporal_gaps
    }
    return assessment_results


# ==============================================================================
# Task 3, Step 3: Cross-Variable Consistency Validation
# ==============================================================================

def validate_cross_variable_consistency(df: pd.DataFrame) -> Dict[str, pd.Series]:
    """
    Performs consistency checks between related variables in the DataFrame.

    This function validates logical and mathematical relationships that should
    hold true in the data, such as trade value comparisons, GDP vs. exports,
    and the coherence of institutional and event-based indicators.

    Args:
        df (pd.DataFrame): The input DataFrame to validate.

    Returns:
        Dict[str, pd.Series]: A dictionary where keys describe the check and
                              values are boolean Series. `True` in a Series
                              indicates a row that failed the consistency check.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")

    # --- Initialize Results Dictionary ---
    consistency_failures = {}

    # --- Trade Value Comparison (DOTS vs. BACI) ---
    # Create a mask for rows where the comparison is valid (non-null, positive denominator).
    valid_trade_mask = (df['BilateralTradeValue_USD'] > 0) & (df['BACI_TotalValue_USD'].notna())
    # Calculate the percentage difference only on the valid subset.
    pct_diff = np.abs(
        df.loc[valid_trade_mask, 'BilateralTradeValue_USD'] - df.loc[valid_trade_mask, 'BACI_TotalValue_USD']
    ) / df.loc[valid_trade_mask, 'BilateralTradeValue_USD']
    # Identify rows where the discrepancy exceeds 10%.
    consistency_failures['trade_value_discrepancies'] = pct_diff > 0.10

    # --- GDP vs. Exports Validation ---
    # Identify impossible cases where total exports exceed GDP.
    # A small epsilon (1.0 USD) is added to GDP to handle rounding differences.
    consistency_failures['impossible_gdp_export_ratio'] = (
        df['TotalExports_USD_Exporter'] > (df['GDP_USD_Exporter'] + 1.0)
    )

    # --- Institutional Variable Consistency ---
    # A dyad cannot have both one and two members in GATT/WTO simultaneously.
    consistency_failures['institutional_exclusivity_violation'] = (
        (df['GATTWTO_Both'] + df['GATTWTO_One']) > 1
    )

    # --- GDELT Data Consistency ---
    # A non-null Goldstein score should not exist if the event count is zero.
    consistency_failures['gdelt_event_score_inconsistency'] = (
        (df['GDELT_EventCount'] == 0) & (df['GDELT_GoldsteinMean'].notna())
    )

    return consistency_failures


# ==============================================================================
# Task 3: Master Orchestrator Function
# ==============================================================================

def run_data_quality_assessment(
    df: pd.DataFrame,
    high_missingness_threshold: float = 30.0,
    max_gap_tolerance: pd.Timedelta = pd.Timedelta('366 days')
) -> Dict[str, Any]:
    """
    Orchestrates the entire data quality and missing value assessment process.

    This master function executes the three core assessment steps for Task 3:
    1. A comprehensive audit of missing values.
    2. An assessment of temporal coverage and panel balance.
    3. A series of cross-variable consistency checks.

    It aggregates the results from these steps into a single, comprehensive
    report, providing a deep understanding of the dataset's quality.

    Args:
        df (pd.DataFrame): The input DataFrame to be assessed.
        high_missingness_threshold (float): The percentage threshold for flagging
                                            dyads with high missingness.
        max_gap_tolerance (pd.Timedelta): The maximum allowable time gap between
                                          consecutive observations.

    Returns:
        Dict[str, Any]: A nested dictionary containing the detailed reports from
                        each assessment step.
    """
    try:
        # Step 1: Perform a comprehensive audit of missing data.
        print("Step 1: Auditing missing values...")
        missing_value_report = audit_missing_values(df, high_missingness_threshold)
        print("Step 1: Success. Missing value audit complete.")

        # Step 2: Assess the temporal structure and continuity of the panel.
        print("Step 2: Assessing temporal coverage and panel balance...")
        temporal_report = assess_temporal_coverage(df, max_gap_tolerance)
        print("Step 2: Success. Temporal coverage assessment complete.")

        # Step 3: Validate the logical consistency between different variables.
        print("Step 3: Validating cross-variable consistency...")
        consistency_report = validate_cross_variable_consistency(df)
        print("Step 3: Success. Cross-variable consistency checks complete.")

        # Aggregate all reports into a single master dictionary.
        master_report = {
            "missing_value_audit": missing_value_report,
            "temporal_coverage_assessment": temporal_report,
            "cross_variable_consistency_failures": consistency_report
        }

        print("\nSUCCESS: Data quality assessment is complete.")
        return master_report

    except (ValueError, TypeError) as e:
        # Catch any validation error from the helper functions.
        print(f"\nERROR: Data quality assessment failed.")
        # Re-raise the specific error to provide a detailed failure report.
        raise e


In [None]:
# Task 4: GDELT Event Data Preprocessing Pipeline

# ==============================================================================
# Task 4, Step 1: Filter Dyadic Pairs by Event Density
# ==============================================================================

def filter_dyads_by_event_density(
    df: pd.DataFrame,
    min_obs_threshold: int
) -> pd.DataFrame:
    """
    Filters dyadic pairs from the panel based on a minimum threshold of
    non-zero event observations.

    This function implements the sample selection criteria from Appendix A.1 of
    the source paper. It identifies and retains only those country pairs that
    have a sufficient number of months with reported political events, ensuring
    that the subsequent time-series analysis is performed on data with adequate
    density.

    Args:
        df (pd.DataFrame): The input DataFrame containing GDELT event data,
                           indexed by DatetimeIndex and including a 'DyadicPair'
                           column.
        min_obs_threshold (int): The minimum number of months with non-zero
                                 event counts required for a dyad to be retained.

    Returns:
        pd.DataFrame: A filtered DataFrame containing only the dyadic pairs that
                      meet or exceed the event density threshold.
    """
    # --- Input Validation ---
    # Verify the input is a pandas DataFrame.
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")
    # Check for the required columns.
    if 'DyadicPair' not in df.columns or 'GDELT_EventCount' not in df.columns:
        raise ValueError("DataFrame must contain 'DyadicPair' and 'GDELT_EventCount' columns.")
    # Verify the threshold is a non-negative integer.
    if not isinstance(min_obs_threshold, int) or min_obs_threshold < 0:
        raise ValueError("min_obs_threshold must be a non-negative integer.")

    # --- Calculate Event Density per Dyad ---
    # Create a boolean series where True indicates a month with one or more events.
    has_events = df['GDELT_EventCount'] > 0
    # Use groupby().transform() for a highly efficient, vectorized calculation.
    # It computes the sum of non-zero event months for each dyad and broadcasts
    # this value back to a Series with the same index as the original DataFrame.
    event_density = has_events.groupby(df['DyadicPair']).transform('sum')

    # --- Filter the DataFrame ---
    # Create a boolean mask to identify all rows belonging to qualifying dyads.
    qualifying_dyads_mask = event_density >= min_obs_threshold
    # Apply the mask to filter the DataFrame.
    df_filtered = df[qualifying_dyads_mask].copy()

    # --- Log and Return ---
    # Report the number of dyads dropped and retained for audit purposes.
    n_original = df['DyadicPair'].nunique()
    n_filtered = df_filtered['DyadicPair'].nunique()
    print(
        f"Filtered dyads by event density: Retained {n_filtered} of {n_original} "
        f"dyads ({(n_original - n_filtered)} dropped)."
    )

    return df_filtered


# ==============================================================================
# Task 4, Step 2: Prepare GDELT Series for Particle Filter Input
# ==============================================================================

def prepare_gdelt_series_for_filter(
    df: pd.DataFrame
) -> Dict[str, pd.DataFrame]:
    """
    Prepares and structures GDELT time series data for the Particle Filter.

    This function sorts the data chronologically and then splits the main
    DataFrame into a dictionary of smaller DataFrames, one for each dyadic
    pair. This structure is the required input format for the subsequent
    per-dyad processing steps (AR(1) estimation and particle filtering).
    Crucially, it does NOT impute missing values, as the particle filter is
    designed to handle them.

    Args:
        df (pd.DataFrame): The input DataFrame, typically pre-filtered for
                           event density. Must have a DatetimeIndex and a
                           'DyadicPair' column.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary where keys are 'DyadicPair'
                                 strings and values are DataFrames containing
                                 the 'GDELT_GoldsteinMean' and
                                 'GDELT_EventCount' time series for that pair.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame) or not isinstance(df.index, pd.DatetimeIndex):
        raise TypeError("Input must be a pandas DataFrame with a DatetimeIndex.")
    required_cols = ['DyadicPair', 'GDELT_GoldsteinMean', 'GDELT_EventCount']
    if not all(col in df.columns for col in required_cols):
        raise ValueError(f"DataFrame must contain the columns: {required_cols}.")

    # --- Sort Data to Ensure Correct Temporal Order ---
    # This is a critical step for any time-series operation.
    df_sorted = df.sort_values(by=['DyadicPair', df.index.name])

    # --- Split DataFrame into a Dictionary of Dyadic Time Series ---
    # Use a dictionary comprehension over the groupby object for an efficient and
    # readable way to partition the data.
    gdelt_series_dict = {
        dyad_name: dyad_group[['GDELT_GoldsteinMean', 'GDELT_EventCount']]
        for dyad_name, dyad_group in df_sorted.groupby('DyadicPair')
    }

    return gdelt_series_dict


# ==============================================================================
# Task 4, Step 3: Estimate Process Variance Parameters (Q_ij)
# ==============================================================================

def estimate_process_variances(
    gdelt_series_dict: Dict[str, pd.DataFrame],
    q_floor: float,
    q_default: float
) -> pd.Series:
    """
    Estimates the process variance (Q_ij) for each dyadic pair.

    As per Appendix A.1, this function fits an AR(1) model to the observed
    GDELT Goldstein score series for each dyad. The residual variance of this
    fit serves as the estimate for the process variance of the latent state in
    the particle filter. The function includes robust error handling for cases
    where the AR(1) model fails to converge.

    Args:
        gdelt_series_dict (Dict[str, pd.DataFrame]): A dictionary of dyadic
                                                     time series from the
                                                     previous step.
        q_floor (float): The minimum allowed value for the process variance.
        q_default (float): The default value to use if AR(1) estimation fails.

    Returns:
        pd.Series: A Series of estimated process variances (Q_ij), indexed by
                   'DyadicPair'.
    """
    # --- Input Validation ---
    if not isinstance(gdelt_series_dict, dict):
        raise TypeError("Input 'gdelt_series_dict' must be a dictionary.")

    # --- Initialize Storage for Results ---
    process_variances = {}

    # --- Suppress ConvergenceWarning from statsmodels for cleaner output ---
    warnings.filterwarnings("ignore", category=ConvergenceWarning)

    # --- Iterate Over Each Dyad to Estimate its Q_ij ---
    for dyad, series_df in gdelt_series_dict.items():
        # Extract the Goldstein score series and drop missing values for the AR model.
        y_ijt = series_df['GDELT_GoldsteinMean'].dropna()

        # Ensure there are enough data points to fit a model.
        if len(y_ijt) < 10:
            # If insufficient data, assign the default variance and continue.
            process_variances[dyad] = q_default
            continue

        try:
            # --- Fit the AR(1) Model ---
            # Equation: y_t = c + phi * y_{t-1} + epsilon_t
            # We need the variance of epsilon_t.
            # Use statsmodels' AutoReg for robust AR model estimation.
            model = AutoReg(y_ijt, lags=1, trend='c')
            model_fit = model.fit()

            # --- Extract, Constrain, and Store the Variance ---
            # model_fit.sigma2 provides an unbiased estimate of the residual variance.
            q_estimate = model_fit.sigma2
            # Apply the floor constraint to ensure numerical stability.
            process_variances[dyad] = max(q_estimate, q_floor)

        except Exception:
            # If the model fails to fit for any reason (e.g., convergence,
            # perfect collinearity), assign the default variance.
            process_variances[dyad] = q_default

    # --- Reset Warning Filters ---
    warnings.resetwarnings()

    # --- Convert Results to a Pandas Series ---
    # A Series is the ideal structure for easy lookup in the next task.
    q_series = pd.Series(process_variances, name="ProcessVariance_Q")
    q_series.index.name = "DyadicPair"

    return q_series


# ==============================================================================
# Task 4: Master Orchestrator Function
# ==============================================================================

def run_gdelt_preprocessing_pipeline(
    df: pd.DataFrame,
    constants: 'ProjectConstants'
) -> Tuple[Dict[str, pd.DataFrame], pd.Series]:
    """
    Orchestrates the entire GDELT data preprocessing pipeline for the Particle Filter.

    This master function executes the three core preprocessing steps for Task 4:
    1. Filters the dataset to include only dyads with sufficient event density.
    2. Structures the filtered data into the required dictionary format.
    3. Estimates the process variance parameter (Q_ij) for each dyad.

    Args:
        df (pd.DataFrame): The raw, validated DataFrame.
        constants (ProjectConstants): The immutable dataclass containing all
                                      project parameters.

    Returns:
        Tuple[Dict[str, pd.DataFrame], pd.Series]: A tuple containing:
            - The dictionary of prepared GDELT time series for the filter.
            - The Series of estimated process variances (Q_ij).
    """
    try:
        # Step 1: Filter dyads based on the baseline event density threshold.
        print("Step 1: Filtering dyads by event density...")
        df_filtered = filter_dyads_by_event_density(
            df=df,
            min_obs_threshold=constants.BASELINE_MIN_OBS
        )
        print(f"Step 1: Success. Filtered DataFrame contains {df_filtered.shape[0]} observations.")

        # Step 2: Prepare the filtered data into the dictionary structure.
        print("Step 2: Preparing GDELT series for particle filter input...")
        gdelt_series_dict = prepare_gdelt_series_for_filter(df_filtered)
        print(f"Step 2: Success. Prepared data for {len(gdelt_series_dict)} dyads.")

        # Step 3: Estimate the process variance for each dyad.
        print("Step 3: Estimating process variances (Q_ij)...")
        q_variances = estimate_process_variances(
            gdelt_series_dict=gdelt_series_dict,
            q_floor=constants.Q_FLOOR,
            q_default=constants.Q_DEFAULT
        )
        print("Step 3: Success. Estimated process variances.")

        # Return the final, prepared data structures.
        print("\nSUCCESS: GDELT preprocessing pipeline complete.")
        return gdelt_series_dict, q_variances

    except (ValueError, TypeError, KeyError) as e:
        # Catch any exception raised during the process for a clean failure.
        print(f"\nERROR: GDELT preprocessing pipeline failed.")
        raise e


In [None]:
# Task 5: Particle Filter Implementation for Political Distance

# ==============================================================================
# Task 5, Step 1-3: Core Particle Filter Implementation
# ==============================================================================

def _systematic_resample(
    particles: np.ndarray,
    weights: np.ndarray,
    rng: np.random.Generator
) -> np.ndarray:
    """
    Performs systematic resampling to mitigate particle degeneracy.

    This method is a low-variance resampling technique that is generally
    preferred over multinomial resampling. It ensures a more representative
    selection of particles for the next generation.

    Args:
        particles (np.ndarray): The array of particle states (shape [M,]).
        weights (np.ndarray): The corresponding normalized particle weights
                              (shape [M,]).
        rng (np.random.Generator): The random number generator for reproducibility.

    Returns:
        np.ndarray: The new array of resampled particles (shape [M,]).
    """
    # Get the number of particles.
    M = len(particles)
    # Calculate the cumulative sum of weights for the resampling algorithm.
    cumulative_sum = np.cumsum(weights)
    # Generate a single random number to determine the starting point of the grid.
    u_start = rng.uniform(0, 1 / M)
    # Create a uniform grid of M points for resampling.
    positions = u_start + np.arange(M) / M

    # Find the indices of the particles to be selected.
    # searchsorted is a highly efficient (O(M log M)) way to perform this lookup.
    indices = np.searchsorted(cumulative_sum, positions)

    # Return the new set of particles based on the selected indices.
    return particles[indices]


def run_single_particle_filter(
    y_series: np.ndarray,
    n_series: np.ndarray,
    q_ij: float,
    constants: 'ProjectConstants',
    seed: int
) -> np.ndarray:
    """
    Executes the core Particle Filter algorithm for a single dyadic time series.

    This function implements the complete Sequential Monte Carlo filter as
    described in Appendix A.1. It estimates the latent 'true' political
    distance from the noisy observed GDELT Goldstein scores.

    The process involves:
    1. Initialization: Drawing initial particles from the empirical distribution.
    2. Sequential Loop (for each time step):
       a. Propagation: Evolving particles according to the random walk model.
       b. Weighting: Updating particle weights based on the likelihood of the
          current observation. Handles missing observations correctly.
       c. Resampling: Mitigating particle degeneracy using systematic resampling
          if the effective sample size drops below a threshold.
    3. State Extraction: Calculating the final filtered series as the mean of
       the particles at each time step.
    4. Sign Reversal: Converting the cooperation score into a distance metric.

    Args:
        y_series (np.ndarray): The time series of observed Goldstein scores (y_ijt).
                               May contain NaNs.
        n_series (np.ndarray): The time series of event counts (n_ijt).
        q_ij (float): The estimated process variance for this specific dyad.
        constants (ProjectConstants): The immutable dataclass of project parameters.
        seed (int): A seed for the random number generator for reproducibility.

    Returns:
        np.ndarray: The final filtered time series of the latent political
                    distance estimate.
    """
    # --- 0. Initialization ---
    # Initialize a dedicated random number generator for this run.
    rng = np.random.default_rng(seed)
    # Extract constants for clarity.
    M = constants.M_PARTICLES
    R_func = constants.R_FUNCTION
    # Get the length of the time series.
    T = len(y_series)

    # Get the non-NaN observations to sample from for initialization.
    y_observed = y_series[~np.isnan(y_series)]
    if len(y_observed) == 0:
        # If there are no observations at all, return a series of NaNs.
        return np.full(T, np.nan)

    # Initialize particles by sampling from the empirical distribution of observed data.
    particles = rng.choice(y_observed, size=M, replace=True)
    # Initialize weights uniformly.
    weights = np.full(M, 1 / M)
    # Create an array to store the final filtered state estimates.
    filtered_state = np.zeros(T)

    # --- 1. Sequential Filtering Loop ---
    for t in range(T):
        # --- a. Propagation (State Transition) ---
        # Equation: s_ijt|t-1 = s_ij,t-1 + v_t, where v_t ~ N(0, Q_ij)
        # Evolve each particle forward according to the random walk model.
        process_noise = rng.normal(scale=np.sqrt(q_ij), size=M)
        particles += process_noise

        # --- b. Weight Update (Likelihood Calculation) ---
        # Get the current observation and event count.
        y_t = y_series[t]
        n_t = n_series[t]

        # Check if the current observation is missing.
        if not np.isnan(y_t):
            # If an observation exists, update weights based on likelihood.
            # Equation: R_ijt = 1 / (n_ijt + 1)
            r_t = R_func(n_t)

            # To prevent numerical underflow, calculate weights in log-space.
            # Equation: log(w_t) = log(p(y_t | s_t)) = -0.5*log(2*pi*R_t) - (y_t - s_t)^2 / (2*R_t)
            log_likelihoods = norm.logpdf(y_t, loc=particles, scale=np.sqrt(r_t))

            # Add to previous log weights (if they existed). Here we assume uniform prior.
            log_weights = log_likelihoods

            # Normalize log weights using logsumexp for numerical stability.
            log_weights_normalized = log_weights - logsumexp(log_weights)

            # Convert back to linear space for resampling.
            weights = np.exp(log_weights_normalized)
        # If y_t is NaN, weights remain unchanged from the previous step (effectively uniform
        # after resampling), reflecting no new information.

        # --- c. Resampling ---
        # Calculate Effective Sample Size (ESS) to check for particle degeneracy.
        # Equation: ESS = 1 / sum(w_i^2)
        ess = 1.0 / np.sum(weights**2)

        # Resample only if ESS falls below a threshold (e.g., M/2).
        if ess < M / 2:
            particles = _systematic_resample(particles, weights, rng)
            # Reset weights to uniform after resampling.
            weights.fill(1 / M)

        # --- d. State Storage ---
        # The estimate for time t is the mean of the current particle set.
        filtered_state[t] = np.mean(particles)

    # --- 2. Final Transformation ---
    # As per the paper, multiply by -1 to convert the cooperation score
    # into a political distance measure (higher values = more distant).
    final_filtered_series = -filtered_state

    return final_filtered_series


# ==============================================================================
# Task 5: Master Orchestrator Function
# ==============================================================================

def run_particle_filter_pipeline(
    gdelt_series_dict: Dict[str, pd.DataFrame],
    q_variances: pd.Series,
    constants: 'ProjectConstants',
    base_seed: int = 20250921
) -> pd.DataFrame:
    """
    Orchestrates the execution of the Particle Filter across all dyadic pairs.

    This function iterates through each dyadic pair, applies the core particle
    filter algorithm to its time series, and aggregates the results into a
    single, tidy DataFrame. It ensures reproducibility by seeding the random
    number generator for each dyad deterministically.

    Args:
        gdelt_series_dict (Dict[str, pd.DataFrame]): The dictionary of prepared
                                                     GDELT time series.
        q_variances (pd.Series): The Series of estimated process variances (Q_ij).
        constants (ProjectConstants): The immutable dataclass of project parameters.
        base_seed (int): A base seed to generate deterministic, unique seeds
                         for each dyad's filter run, ensuring overall
                         reproducibility.

    Returns:
        pd.DataFrame: A DataFrame with a MultiIndex ('DyadicPair', 'DateTime')
                      containing the final, filtered political distance series
                      named 'PD_GDELT_Filtered'.
    """
    # --- Input Validation ---
    if not isinstance(gdelt_series_dict, dict) or not isinstance(q_variances, pd.Series):
        raise TypeError("Inputs must be a dictionary of DataFrames and a pandas Series.")

    # --- Initialize Storage for Results ---
    all_filtered_series = []

    # --- Iterate over all dyads with a progress bar ---
    print("Running Particle Filter for all dyads...")
    for i, (dyad, series_df) in enumerate(tqdm(gdelt_series_dict.items())):
        # --- Prepare Inputs for the Core Filter ---
        # Extract the required numpy arrays from the DataFrame.
        y_series = series_df['GDELT_GoldsteinMean'].values
        n_series = series_df['GDELT_EventCount'].values
        # Look up the process variance for the current dyad.
        q_ij = q_variances.get(dyad)

        # Check if Q_ij was successfully estimated.
        if q_ij is None:
            print(f"Warning: No process variance Q_ij found for dyad {dyad}. Skipping.")
            continue

        # --- Run the Core Particle Filter ---
        # Generate a unique, deterministic seed for each dyad for reproducibility.
        dyad_seed = base_seed + i
        filtered_values = run_single_particle_filter(
            y_series=y_series,
            n_series=n_series,
            q_ij=q_ij,
            constants=constants,
            seed=dyad_seed
        )

        # --- Store the Result ---
        # Create a pandas Series from the result with the correct index.
        filtered_series = pd.Series(
            filtered_values,
            index=series_df.index,
            name='PD_GDELT_Filtered'
        )
        # Add the dyad name to the series for later concatenation.
        filtered_series.name = dyad
        all_filtered_series.append(filtered_series)

    # --- Concatenate All Results into a Single DataFrame ---
    if not all_filtered_series:
        print("Warning: No series were filtered. Returning an empty DataFrame.")
        return pd.DataFrame()

    # Concatenate all individual series into a single DataFrame.
    # The keys of the list become the columns.
    result_df = pd.concat(all_filtered_series, axis=1)

    # Stack the DataFrame to create the final tidy format with a MultiIndex.
    # This transforms the data from wide (dyads as columns) to long format.
    final_df = result_df.stack().to_frame(name='PD_GDELT_Filtered')
    final_df.index.names = ['DateTime', 'DyadicPair']
    # Reorder index to match standard panel format.
    final_df = final_df.reorder_levels(['DyadicPair', 'DateTime'])

    print("\nSUCCESS: Particle filter pipeline complete.")
    return final_df


In [None]:
# Task 6: Political Distance and Trade Variable Construction

# ==============================================================================
# Task 6, Step 1: UNGA Political Distance Computation
# ==============================================================================

def compute_unga_political_distance(
    df: pd.DataFrame,
    unga_ideal_points_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Computes UNGA-based political distance and merges it into the main DataFrame.

    This function enriches a dyadic panel DataFrame with country-year specific
    UN General Assembly (UNGA) ideal point estimates. It performs two sequential
    left joins to merge the ideal points for the exporter and importer in each
    dyad-year observation. Finally, it calculates the political distance as the
    absolute difference between these two ideal points.

    The implementation follows a streamlined and robust logic to ensure clarity,
    efficiency, and accurate handling of potential data mismatches.

    Args:
        df (pd.DataFrame): The main dyadic panel DataFrame. Must have a
                           DatetimeIndex and 'ExporterISO', 'ImporterISO' columns.
        unga_ideal_points_df (pd.DataFrame): A DataFrame with columns
                                             ['ISO', 'Year', 'IdealPoint'],
                                             containing the annual ideal point
                                             estimates for each country.

    Returns:
        pd.DataFrame: The main DataFrame with three new columns added:
                      'IdealPoint_Exporter', 'IdealPoint_Importer', and 'PD_UNGA'.

    Raises:
        TypeError: If either input is not a pandas DataFrame.
        ValueError: If required columns are missing from the input DataFrames,
                    or if a merge operation fails completely (indicating a
                    likely key mismatch between the datasets).
    """
    # --- 1. Input Validation ---
    # Verify that both inputs are pandas DataFrames.
    if not isinstance(df, pd.DataFrame) or not isinstance(unga_ideal_points_df, pd.DataFrame):
        raise TypeError("Inputs 'df' and 'unga_ideal_points_df' must be pandas DataFrames.")

    # Define and check for required columns in the main DataFrame.
    required_main_cols = ['ExporterISO', 'ImporterISO']
    if not all(col in df.columns for col in required_main_cols):
        raise ValueError(f"Main DataFrame must contain columns: {required_main_cols}.")

    # Define and check for required columns in the UNGA data.
    required_unga_cols = ['ISO', 'Year', 'IdealPoint']
    if not all(col in unga_ideal_points_df.columns for col in required_unga_cols):
        raise ValueError(f"UNGA DataFrame must contain columns: {required_unga_cols}.")

    # --- 2. Prepare Merge Key ---
    # Create a temporary copy to avoid modifying the original DataFrame in place.
    df_out = df.copy()
    # Create a 'Year' column from the DatetimeIndex to use as a merge key.
    df_out['Year'] = df_out.index.year

    # --- 3. First Merge (for Exporter) ---
    # Perform a left merge to join the exporter's ideal point for each year.
    # A left join ensures that all original observations are preserved.
    df_out = pd.merge(
        df_out,
        unga_ideal_points_df,
        left_on=['ExporterISO', 'Year'],
        right_on=['ISO', 'Year'],
        how='left'
    )
    # Rename the merged 'IdealPoint' column to be specific to the exporter.
    df_out.rename(columns={'IdealPoint': 'IdealPoint_Exporter'}, inplace=True)
    # Drop the redundant 'ISO' key column from the UNGA data.
    df_out.drop(columns='ISO', inplace=True)

    # --- 4. Post-Merge Validation (Exporter) ---
    # Check if the merge was successful. If the new column is all NaN, it
    # indicates a total failure to match keys.
    if df_out['IdealPoint_Exporter'].isnull().all():
        raise ValueError(
            "Exporter ideal point merge failed completely. Check for mismatch "
            "in country codes ('ExporterISO' vs 'ISO') or year ranges."
        )

    # --- 5. Second Merge (for Importer) ---
    # Perform a second left merge to join the importer's ideal point.
    df_out = pd.merge(
        df_out,
        unga_ideal_points_df,
        left_on=['ImporterISO', 'Year'],
        right_on=['ISO', 'Year'],
        how='left'
    )
    # Rename the newly merged 'IdealPoint' column for the importer.
    df_out.rename(columns={'IdealPoint': 'IdealPoint_Importer'}, inplace=True)
    # Drop the redundant 'ISO' key column.
    df_out.drop(columns='ISO', inplace=True)

    # --- 6. Post-Merge Validation (Importer) ---
    # Perform the same check for the importer merge.
    if df_out['IdealPoint_Importer'].isnull().all():
        raise ValueError(
            "Importer ideal point merge failed completely. Check for mismatch "
            "in country codes ('ImporterISO' vs 'ISO') or year ranges."
        )

    # --- 7. Political Distance Calculation ---
    # Equation: PD_ijt^UNGA = |IdealPoint_it - IdealPoint_jt|
    # This vectorized operation calculates the absolute difference. It correctly
    # propagates NaNs, resulting in NaN for any dyad-year where at least one
    # partner's ideal point is missing.
    df_out['PD_UNGA'] = np.abs(df_out['IdealPoint_Exporter'] - df_out['IdealPoint_Importer'])

    # --- 8. Final Cleanup ---
    # Drop the temporary 'Year' column to restore the DataFrame's original schema.
    df_out.drop(columns='Year', inplace=True)

    # Return the final, enriched DataFrame.
    return df_out


# ==============================================================================
# Task 6, Step 2: Apply Inverse Hyperbolic Sine Transformation
# ==============================================================================

def apply_ihs_transformation(
    df: pd.DataFrame,
    columns_to_transform: List[str]
) -> pd.DataFrame:
    """
    Applies the Inverse Hyperbolic Sine (IHS) transformation to specified columns.

    The IHS transformation, sinh⁻¹(x), is used for variables that can take
    negative or zero values (like Polity scores or centered GDELT scores), as it
    is defined for all real numbers and approximates the natural log for large
    positive values.

    Args:
        df (pd.DataFrame): The input DataFrame.
        columns_to_transform (List[str]): A list of column names to which the
                                          IHS transformation will be applied.

    Returns:
        pd.DataFrame: The DataFrame with new columns containing the transformed
                      values, suffixed with '_IHS'.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")
    if not all(col in df.columns for col in columns_to_transform):
        missing = set(columns_to_transform) - set(df.columns)
        raise ValueError(f"Columns to transform not found in DataFrame: {missing}")

    # --- Apply Transformation ---
    df_transformed = df.copy()
    # Iterate through the specified list of columns.
    for col in columns_to_transform:
        # Define the new column name.
        new_col_name = f"{col}_IHS"
        # Equation: x_transformed = sinh⁻¹(x) = ln(x + sqrt(x² + 1))
        # Apply the vectorized np.arcsinh function.
        df_transformed[new_col_name] = np.arcsinh(df_transformed[col])

    return df_transformed


# ==============================================================================
# Task 6, Step 3: Construct Trade Margin Variables
# ==============================================================================

def construct_trade_margins(df: pd.DataFrame) -> pd.DataFrame:
    """
    Constructs variables for analyzing the extensive and intensive margins of trade.

    This function creates three key variables based on the source paper's
    methodology, using TradeProd sector counts (S_ijt) as the basis for the
    extensive margin.

    1.  Intensive Margin: Average trade value per traded sector (X_ijt / S_ijt).
    2.  Any-Trade Dummy: A binary indicator for whether any trade occurs (S_ijt > 0).
    3.  Sector Share: The fraction of possible sectors traded (S_ijt / 9), for
        use in bounded (e.g., Logit) models.

    Args:
        df (pd.DataFrame): The input DataFrame. Must contain the columns
                           'BilateralTradeValue_USD' and 'TradeProd_SectorCount'.

    Returns:
        pd.DataFrame: The DataFrame with new columns for the trade margins.
    """
    # --- Input Validation ---
    # This check is critical because the paper's methodology for margins
    # relies on sector counts, not the HS-6 product counts from the initial schema.
    required_cols = ['BilateralTradeValue_USD', 'TradeProd_SectorCount']
    if not all(col in df.columns for col in required_cols):
        raise ValueError(f"DataFrame must contain columns for margin construction: {required_cols}.")

    df_margins = df.copy()

    # --- Construct Intensive Margin ---
    # Equation: IntensiveMargin_ijt = X_ijt / S_ijt
    # Use np.where to safely handle division by zero for dyads with no trade.
    df_margins['IntensiveMargin_USD_per_Sector'] = np.where(
        df_margins['TradeProd_SectorCount'] > 0,
        df_margins['BilateralTradeValue_USD'] / df_margins['TradeProd_SectorCount'],
        0  # Assign 0 if no sectors are traded.
    )

    # --- Construct Any-Trade Dummy (Extensive Margin) ---
    # Equation: AnyTrade_ijt = 1(S_ijt > 0)
    # Create a binary indicator (0 or 1).
    df_margins['AnyTrade_Sector_Dummy'] = (df_margins['TradeProd_SectorCount'] > 0).astype(int)

    # --- Construct Sector Share (Extensive Margin for Logit) ---
    # Equation: SectorShare_ijt = S_ijt / 9
    # Assuming 9 total industrial sectors as per the TradeProd database.
    MAX_SECTORS = 9
    df_margins['SectorShare'] = df_margins['TradeProd_SectorCount'] / MAX_SECTORS

    return df_margins


# ==============================================================================
# Task 6: Master Orchestrator Function
# ==============================================================================

def run_variable_construction_pipeline(
    main_df: pd.DataFrame,
    filtered_gdelt_df: pd.DataFrame,
    unga_ideal_points_df: pd.DataFrame,
    ihs_columns: List[str]
) -> pd.DataFrame:
    """
    Orchestrates the entire variable construction and transformation pipeline.

    This master function executes the three core steps for Task 6:
    1. Merges the filtered GDELT series into the main panel.
    2. Computes and merges the UNGA-based political distance.
    3. Applies the IHS transformation to specified variables.
    4. Constructs the intensive and extensive trade margin variables.

    Args:
        main_df (pd.DataFrame): The primary, validated panel DataFrame.
        filtered_gdelt_df (pd.DataFrame): The DataFrame containing the filtered
                                          GDELT series from Task 5.
        unga_ideal_points_df (pd.DataFrame): DataFrame of UNGA ideal points.
        ihs_columns (List[str]): A list of columns to apply the IHS transform to.

    Returns:
        pd.DataFrame: The final, fully processed DataFrame with all constructed
                      variables ready for the estimation phase.
    """
    try:
        # --- Merge Filtered GDELT Data ---
        # Join the filtered series back to the main DataFrame.
        print("Step 0: Merging filtered GDELT political distance series...")
        # A left join ensures we keep all original observations.
        df_processed = main_df.join(filtered_gdelt_df, on=['DyadicPair', main_df.index.name])
        print("Step 0: Success. GDELT series merged.")

        # --- Step 1: Compute UNGA Political Distance ---
        print("Step 1: Computing UNGA political distance...")
        df_processed = compute_unga_political_distance(df_processed, unga_ideal_points_df)
        print("Step 1: Success. UNGA distance computed and merged.")

        # --- Step 2: Apply IHS Transformation ---
        # Add the newly created UNGA distance to the list of columns to transform.
        all_ihs_columns = ihs_columns + ['PD_UNGA']
        print(f"Step 2: Applying IHS transformation to {all_ihs_columns}...")
        df_processed = apply_ihs_transformation(df_processed, all_ihs_columns)
        print("Step 2: Success. IHS transformation applied.")

        # --- Step 3: Construct Trade Margins ---
        # This step requires a 'TradeProd_SectorCount' column, which we assume
        # exists on the main_df for this function to work.
        print("Step 3: Constructing trade margin variables...")
        df_processed = construct_trade_margins(df_processed)
        print("Step 3: Success. Trade margins constructed.")

        print("\nSUCCESS: Variable construction pipeline complete.")
        return df_processed

    except (ValueError, TypeError, KeyError) as e:
        print(f"\nERROR: Variable construction pipeline failed.")
        raise e


In [None]:
# Task 7: Panel Data Index Construction and Validation

# ==============================================================================
# Task 7, Step 1: Create Hierarchical Panel Index Structure
# ==============================================================================

def create_fixed_effects_identifiers(df: pd.DataFrame) -> pd.DataFrame:
    """
    Creates categorical identifiers required for high-dimensional fixed effects estimation.

    This function generates the specific categorical columns that modern econometric
    packages use to absorb fixed effects algorithmically. It creates identifiers for:
    1. Exporter-Year effects.
    2. Importer-Year effects.
    3. Dyadic Pair effects (already present as 'DyadicPair').
    4. Border-Year effects (distinguishing international vs. domestic trade per year).

    For memory efficiency, the newly created identifier columns are converted to
    the 'category' dtype.

    Args:
        df (pd.DataFrame): The processed DataFrame. Must have a DatetimeIndex
                           and 'ExporterISO', 'ImporterISO' columns.

    Returns:
        pd.DataFrame: The DataFrame with new categorical columns for fixed effects.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame) or not isinstance(df.index, pd.DatetimeIndex):
        raise TypeError("Input must be a pandas DataFrame with a DatetimeIndex.")
    required_cols = ['ExporterISO', 'ImporterISO']
    if not all(col in df.columns for col in required_cols):
        raise ValueError(f"DataFrame must contain columns: {required_cols}.")

    df_out = df.copy()

    # --- Create Year Column ---
    # Extract the year from the DatetimeIndex for creating time-based effects.
    df_out['Year'] = df_out.index.year

    # --- Create Exporter-Year and Importer-Year Identifiers ---
    # These identifiers capture all time-varying, country-specific shocks.
    df_out['FE_ExporterYear'] = (df_out['ExporterISO'] + '_' + df_out['Year'].astype(str))
    df_out['FE_ImporterYear'] = (df_out['ImporterISO'] + '_' + df_out['Year'].astype(str))

    # --- Create Border-Year Identifier ---
    # This identifier creates a separate dummy for international trade in each year,
    # allowing the effect of crossing a border to vary over time.
    # Domestic observations are grouped into a single 'Domestic' category.
    df_out['FE_BorderYear'] = np.where(
        df_out['ExporterISO'] != df_out['ImporterISO'],
        'Border_' + df_out['Year'].astype(str),
        'Domestic'
    )

    # --- Convert to Category Dtype for Memory Efficiency ---
    # This is a critical optimization for high-dimensional fixed effects.
    categorical_fe_cols = ['FE_ExporterYear', 'FE_ImporterYear', 'FE_BorderYear', 'DyadicPair']
    for col in categorical_fe_cols:
        # Ensure the column exists before attempting conversion.
        if col in df_out.columns:
            df_out[col] = df_out[col].astype('category')

    return df_out


# ==============================================================================
# Task 7, Step 2: Fixed Effects Dummy Variable Construction (Validation)
# ==============================================================================

def validate_fixed_effects_identifiers(
    df: pd.DataFrame,
    fe_identifier_cols: List[str]
) -> bool:
    """
    Validates that the fixed effects identifier columns are correctly prepared.

    NOTE: This function does NOT create dummy variable matrices. In modern
    econometrics, high-dimensional fixed effects are absorbed algorithmically
    to avoid creating enormous, memory-intensive matrices. This function
    instead validates that the categorical IDENTIFIERS are present and correctly
    formatted for use with packages like `statsmodels` or `fixest`.

    Args:
        df (pd.DataFrame): The DataFrame with FE identifier columns.
        fe_identifier_cols (List[str]): A list of the names of the FE
                                        identifier columns to validate.

    Returns:
        bool: True if all validations pass.

    Raises:
        ValueError: If any identifier column is missing or has an incorrect dtype.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")

    # --- Check for Presence and Type ---
    for col in fe_identifier_cols:
        # Verify that each required identifier column exists.
        if col not in df.columns:
            raise ValueError(f"Fixed effects identifier column '{col}' is missing.")
        # Verify that the column has been converted to the efficient 'category' dtype.
        if not pd.api.types.is_categorical_dtype(df[col]):
            raise TypeError(f"Identifier column '{col}' must be of 'category' dtype for efficiency.")
        # Verify there are no missing values in the identifier columns.
        if df[col].isnull().any():
            raise ValueError(f"Identifier column '{col}' contains null values.")

    return True


# ==============================================================================
# Task 7, Step 3: Interaction Term Construction
# ==============================================================================

def create_interaction_terms(
    df: pd.DataFrame,
    interactions_to_create: List[Tuple[str, str, str]]
) -> pd.DataFrame:
    """
    Creates interaction terms between specified pairs of variables.

    This function takes a list of desired interactions and creates them using
    efficient, vectorized multiplication.

    Args:
        df (pd.DataFrame): The input DataFrame.
        interactions_to_create (List[Tuple[str, str, str]]):
            A list of tuples, where each tuple contains:
            (name of variable 1, name of variable 2, name for new interaction column).

    Returns:
        pd.DataFrame: The DataFrame with the new interaction term columns added.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")

    df_out = df.copy()

    # --- Iterate and Create Interactions ---
    for var1, var2, new_name in interactions_to_create:
        # Check that both source columns exist before proceeding.
        if var1 not in df_out.columns or var2 not in df_out.columns:
            raise ValueError(f"Cannot create interaction '{new_name}': "
                             f"Source column '{var1}' or '{var2}' not found.")

        # Create the new interaction term using vectorized multiplication.
        # This correctly propagates NaNs.
        df_out[new_name] = df_out[var1] * df_out[var2]

    return df_out


# ==============================================================================
# Task 7: Master Orchestrator Function
# ==============================================================================

def run_panel_data_preparation(
    df: pd.DataFrame,
    interactions_spec: List[Tuple[str, str, str]]
) -> pd.DataFrame:
    """
    Orchestrates the full pipeline for preparing panel data for estimation.

    This master function executes the three core steps for Task 7:
    1. Creates the necessary categorical identifiers for fixed effects.
    2. Validates that these identifiers are correctly formatted.
    3. Creates all specified interaction terms between variables.

    Args:
        df (pd.DataFrame): The fully processed DataFrame from the previous task.
        interactions_spec (List[Tuple[str, str, str]]): The specification for
                                                        all interaction terms to
                                                        be created.

    Returns:
        pd.DataFrame: The final DataFrame with all identifiers and interaction
                      terms, ready for the estimation phase.
    """
    try:
        # Step 1: Create the categorical identifiers for fixed effects.
        print("Step 1: Creating fixed effects identifiers...")
        df_prepared = create_fixed_effects_identifiers(df)
        print("Step 1: Success. FE identifiers created and converted to category dtype.")

        # Step 2: Validate the created identifiers.
        print("Step 2: Validating fixed effects identifiers...")
        fe_cols = ['FE_ExporterYear', 'FE_ImporterYear', 'FE_BorderYear', 'DyadicPair']
        validate_fixed_effects_identifiers(df_prepared, fe_cols)
        print("Step 2: Success. FE identifiers are valid and ready for absorption.")

        # Step 3: Create all required interaction terms.
        print("Step 3: Creating interaction terms...")
        df_final = create_interaction_terms(df_prepared, interactions_spec)
        print(f"Step 3: Success. Created {len(interactions_spec)} interaction terms.")

        print("\nSUCCESS: Panel data preparation pipeline complete.")
        return df_final

    except (ValueError, TypeError, KeyError) as e:
        print(f"\nERROR: Panel data preparation pipeline failed.")
        raise e


In [None]:
# Task 8: Sample Preparation for Different Estimation Methods

# ==============================================================================
# Task 8, Step 1: PPML Sample Preparation Including Domestic Trade
# ==============================================================================

def prepare_ppml_sample(
    dyadic_df: pd.DataFrame,
    country_df: pd.DataFrame,
    core_regression_vars: List[str]
) -> pd.DataFrame:
    """
    Prepares the final sample for PPML estimation by incorporating domestic trade
    flows and performing listwise deletion of missing core variables.

    This function implements a critical step from modern structural gravity
    theory: including domestic trade (production for the home market) as an
    observation for each country-year. The implementation follows a clean and
    modern pandas workflow, avoiding deprecated arguments and complex methods in
    favor of explicit and readable data manipulation steps.

    The process is as follows:
    1. Constructs a DataFrame of domestic trade observations from country-level data.
    2. Explicitly aligns the columns of the domestic and international DataFrames.
    3. Concatenates the two DataFrames into a single panel.
    4. Sorts the combined panel chronologically.
    5. Performs listwise deletion on rows with missing values in essential
       regression variables.

    Args:
        dyadic_df (pd.DataFrame): The main panel of international dyadic trade flows.
                                  Must have a DatetimeIndex.
        country_df (pd.DataFrame): A DataFrame with country-level annual data,
                                   including ['ISO', 'Year', 'GDP_USD',
                                   'TotalExports_USD'].
        core_regression_vars (List[str]): A list of essential column names. Any
                                          row with a missing value in these
                                          columns will be dropped from the final
                                          sample.

    Returns:
        pd.DataFrame: The final estimation-ready DataFrame including both
                      international and domestic trade observations, sorted by
                      index and cleaned of missing values in core variables.

    Raises:
        TypeError: If inputs are not pandas DataFrames.
        ValueError: If required columns are missing from the input DataFrames.
    """
    # --- 1. Input Validation ---
    # Verify that both inputs are pandas DataFrames.
    if not isinstance(dyadic_df, pd.DataFrame) or not isinstance(country_df, pd.DataFrame):
        raise TypeError("Inputs 'dyadic_df' and 'country_df' must be pandas DataFrames.")
    # Check for required columns in the country-level data.
    required_country_cols = ['ISO', 'Year', 'GDP_USD', 'TotalExports_USD']
    if not all(col in country_df.columns for col in required_country_cols):
        raise ValueError(f"Country DataFrame must contain columns: {required_country_cols}.")
    # Check that the main DataFrame has a DatetimeIndex.
    if not isinstance(dyadic_df.index, pd.DatetimeIndex):
        raise TypeError("Main DataFrame 'dyadic_df' must have a DatetimeIndex.")

    # --- 2. Construct Domestic Trade Observations ---
    # Create a copy to avoid modifying the original country data DataFrame.
    domestic = country_df.copy()

    # Equation: DomesticTrade_i = max(0, GDP_i - TotalExports_i)
    # Calculate domestic trade. .clip(lower=0) is a robust, vectorized
    # way to enforce the non-negativity constraint.
    domestic['BilateralTradeValue_USD'] = (
        domestic['GDP_USD'] - domestic['TotalExports_USD']
    ).clip(lower=0)

    # For domestic observations, the exporter and importer are the same country.
    domestic['ExporterISO'] = domestic['ISO']
    domestic['ImporterISO'] = domestic['ISO']

    # Create the corresponding 'DyadicPair' identifier (e.g., 'USA_USA').
    domestic['DyadicPair'] = domestic['ISO'] + '_' + domestic['ISO']

    # Create a DatetimeIndex to match the dyadic panel's structure.
    # By convention, annual data is placed at the start of the year.
    domestic.index = pd.to_datetime(domestic['Year'].astype(str) + '-01-01')

    # --- 3. Align Columns for Clean Concatenation ---
    # Identify the common columns between the two dataframes.
    common_columns = dyadic_df.columns.intersection(domestic.columns)
    # Subset the domestic dataframe to only these common columns.
    domestic_aligned = domestic[common_columns]
    # Ensure the column order is identical to the dyadic dataframe.
    domestic_aligned = domestic_aligned[dyadic_df.columns]

    # --- 4. Concatenate International and Domestic Data ---
    # Combine the international and newly created domestic observations.
    # `ignore_index=False` preserves the DatetimeIndex.
    full_panel = pd.concat([dyadic_df, domestic_aligned])

    # Explicitly sort the index to restore a clean chronological order.
    full_panel.sort_index(inplace=True)

    # --- 5. Final Listwise Deletion ---
    # Drop any row with missing values in the core set of variables
    # required for the main regression specification.
    n_before = len(full_panel)
    final_sample = full_panel.dropna(subset=core_regression_vars)
    n_after = len(final_sample)

    # Provide a clear log of the filtering action.
    print(
        f"PPML sample preparation: Dropped {n_before - n_after} of {n_before} "
        f"rows with missing core variables. Final sample size: {n_after}."
    )

    # Return the final, analysis-ready DataFrame.
    return final_sample


# ==============================================================================
# Task 8, Step 2: Logit Sample Preparation for Extensive Margin
# ==============================================================================

def prepare_logit_sample(
    df: pd.DataFrame,
    dependent_var: str,
    handle_gattwto_collinearity: bool = True
) -> pd.DataFrame:
    """
    Prepares a sample for Logit estimation by removing perfect prediction cases
    and optionally handling a specific collinearity issue.

    This function performs two critical data preparation steps for binary choice
    models on panel data:
    1.  It identifies and removes dyadic pairs where the outcome variable is
        constant over time (always 0 or always 1), as these groups provide no
        information for estimation with fixed effects.
    2.  As per the source paper (p. 12), it can programmatically address a
        potential collinearity issue that arises when the "no GATT/WTO member"
        category becomes too sparse after the first filter. If triggered, it
        collapses the "no-member" and "one-member" categories by dropping the
        'GATTWTO_One' indicator.

    Args:
        df (pd.DataFrame): The input DataFrame for estimation.
        dependent_var (str): The name of the binary (0/1) dependent variable,
                             e.g., 'AnyTrade_Sector_Dummy'.
        handle_gattwto_collinearity (bool): If True, performs the check for
                                            sparsity in the baseline GATT/WTO
                                            category and drops the 'GATTWTO_One'
                                            column if necessary.

    Returns:
        pd.DataFrame: A filtered and potentially modified DataFrame that is
                      suitable for Logit estimation.

    Raises:
        TypeError: If the input is not a pandas DataFrame.
        ValueError: If required columns are missing from the DataFrame.
    """
    # --- 1. Input Validation ---
    # Verify the input is a pandas DataFrame.
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")
    # Define the list of columns required for the function's logic.
    required_cols = [dependent_var, 'DyadicPair', 'GATTWTO_One', 'GATTWTO_Both']
    # Check if all required columns are present.
    if not all(col in df.columns for col in required_cols):
        raise ValueError(f"DataFrame must contain required columns: {required_cols}.")

    # --- 2. Filter for Perfect Prediction ---
    # Use groupby().transform('nunique') to efficiently find the number of
    # unique outcomes for the dependent variable within each dyad.
    outcome_variation = df.groupby('DyadicPair')[dependent_var].transform('nunique')
    # Keep only the dyads where the number of unique outcomes is greater than 1
    # (i.e., where the outcome varies over time).
    logit_sample = df[outcome_variation > 1].copy()

    # Provide a clear log of the filtering action.
    n_dyads_before = df['DyadicPair'].nunique()
    n_dyads_after = logit_sample['DyadicPair'].nunique()
    print(
        f"Logit sample preparation: Dropped {n_dyads_before - n_dyads_after} of "
        f"{n_dyads_before} dyads due to perfect prediction."
    )

    # --- 3. Conditional Collinearity Handling ---
    # Proceed with this step only if the flag is enabled.
    if handle_gattwto_collinearity:
        # Define the baseline category: dyads with no GATT/WTO members.
        is_baseline = (logit_sample['GATTWTO_One'] == 0) & (logit_sample['GATTWTO_Both'] == 0)

        # Count the number of unique dyads in this baseline category.
        n_baseline_dyads = logit_sample.loc[is_baseline, 'DyadicPair'].nunique()

        # Define a pragmatic threshold for severe sparsity. If fewer than 2
        # dyads exist in the baseline, it's highly likely to be collinear
        # with the dyadic fixed effects.
        MIN_BASELINE_DYADS = 2
        if n_baseline_dyads < MIN_BASELINE_DYADS:
            # If the baseline is too sparse, drop the 'GATTWTO_One' column.
            logit_sample.drop(columns=['GATTWTO_One'], inplace=True)
            # Issue a clear warning to the user explaining the action taken.
            # The `warnings` module is preferred over `print` for such notifications.
            warnings.warn(
                f"Baseline 'no-member' category has only {n_baseline_dyads} dyad(s) "
                f"after filtering. Dropping 'GATTWTO_One' to collapse categories and "
                f"avoid collinearity, as per source paper methodology."
            )

    # Return the fully prepared sample.
    return logit_sample


# ==============================================================================
# Task 8, Step 3: Create Temporal Subsample Datasets
# ==============================================================================

def create_temporal_subsamples(
    df: pd.DataFrame,
    periods_spec: Dict[str, Tuple[int, int]]
) -> Dict[str, pd.DataFrame]:
    """
    Creates a dictionary of DataFrames, each corresponding to a specific time period.

    This function slices the main DataFrame into temporal subsamples based on a
    provided specification, facilitating analysis of how effects vary over time.

    Args:
        df (pd.DataFrame): The input DataFrame. Must have a 'Year' column.
        periods_spec (Dict[str, Tuple[int, int]]): A dictionary where keys are
            period labels (e.g., "1995-2008") and values are tuples of
            (start_year, end_year).

    Returns:
        Dict[str, pd.DataFrame]: A dictionary of the temporal subsample DataFrames.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")
    if 'Year' not in df.columns:
        raise ValueError("DataFrame must have a 'Year' column for temporal slicing.")

    # --- Create Subsamples ---
    subsamples = {}
    for period_label, (start_year, end_year) in periods_spec.items():
        # Use a boolean mask for efficient slicing.
        mask = (df['Year'] >= start_year) & (df['Year'] <= end_year)
        subsample = df[mask].copy()

        # --- Validate Subsample ---
        if subsample.empty:
            print(f"Warning: Temporal subsample '{period_label}' is empty.")

        subsamples[period_label] = subsample

    return subsamples


# ==============================================================================
# Task 8: Master Orchestrator Function
# ==============================================================================

def run_sample_preparation_pipeline(
    main_df: pd.DataFrame,
    country_level_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the creation of all analytical samples for the study.

    This master function executes the three core sample preparation steps:
    1. Creates the main PPML sample, crucially including domestic trade flows.
    2. Creates a subsample for Logit estimation, handling perfect prediction.
    3. Creates a dictionary of temporal subsamples for historical analysis.

    Args:
        main_df (pd.DataFrame): The fully processed international dyadic panel.
        country_level_df (pd.DataFrame): Annual country-level data for domestic trade.
        config (Dict[str, Any]): The project configuration dictionary containing
                                 specifications for variables and periods.

    Returns:
        Dict[str, Any]: A dictionary containing all prepared samples:
                        - 'ppml_sample': The main estimation DataFrame.
                        - 'logit_sample': The sample for extensive margin analysis.
                        - 'temporal_subsamples': A dict of historical subsamples.
    """
    try:
        # Define core variables needed for listwise deletion from config or hardcode.
        # This should include the dependent var, main regressors, and interactions.
        # For demonstration, a simplified list is used.
        core_vars = [
            'BilateralTradeValue_USD', 'PD_GDELT_Filtered_IHS', 'PD_UNGA_IHS',
            'GATTWTO_Both', 'GATTWTO_One', 'RTA'
        ]

        # Step 1: Prepare the primary sample for PPML estimation.
        print("Step 1: Preparing PPML sample with domestic trade...")
        ppml_sample = prepare_ppml_sample(main_df, country_level_df, core_vars)
        print("Step 1: Success. PPML sample created.")

        # Step 2: Prepare the sample for Logit estimation.
        print("Step 2: Preparing Logit sample (handling perfect prediction)...")
        logit_sample = prepare_logit_sample(ppml_sample, 'AnyTrade_Sector_Dummy')
        print("Step 2: Success. Logit sample created.")

        # Step 3: Create temporal subsamples from the main PPML sample.
        print("Step 3: Creating temporal subsamples...")
        # This specification should ideally come from the config dictionary.
        periods = {
            "Pre_WTO": (1980, 1994),
            "Early_WTO": (1995, 2008),
            "Post_Crisis": (2009, 2023)
        }
        temporal_subsamples = create_temporal_subsamples(ppml_sample, periods)
        print("Step 3: Success. Temporal subsamples created.")

        # --- Compile and Return All Samples ---
        analytical_samples = {
            'ppml_sample': ppml_sample,
            'logit_sample': logit_sample,
            'temporal_subsamples': temporal_subsamples
        }

        print("\nSUCCESS: All analytical samples have been prepared.")
        return analytical_samples

    except (ValueError, TypeError, KeyError) as e:
        print(f"\nERROR: Sample preparation pipeline failed.")
        raise e


In [None]:
# Task 9: Clustering and Standard Error Preparation.

# ==============================================================================
# Task 9, Step 1: Cluster Group Construction and Validation
# ==============================================================================

def validate_cluster_structure(
    df: pd.DataFrame,
    cluster_col: str,
    small_cluster_warning_frac: float = 0.05
) -> pd.DataFrame:
    """
    Validates the structure of clustering units in the panel data.

    This function checks that the specified clustering column exists and is of a
    suitable type. It then calculates the size of each cluster and issues a
    warning if a significant fraction of clusters are "small," which could
    potentially affect the reliability of cluster-robust standard errors.

    Args:
        df (pd.DataFrame): The analytical DataFrame.
        cluster_col (str): The name of the column that identifies the clusters
                           (e.g., 'DyadicPair').
        small_cluster_warning_frac (float): The fraction of clusters that can be
                                            "small" before a warning is issued.

    Returns:
        pd.DataFrame: The input DataFrame, passed through if validation succeeds.

    Raises:
        ValueError: If the cluster column is missing or contains null values.
        TypeError: If the cluster column is not of a suitable dtype.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")
    if cluster_col not in df.columns:
        raise ValueError(f"Cluster column '{cluster_col}' not found in DataFrame.")
    if df[cluster_col].isnull().any():
        raise ValueError(f"Cluster column '{cluster_col}' contains null values.")

    # --- Calculate Cluster Sizes ---
    # Use the efficient .groupby().size() method to count observations per cluster.
    cluster_sizes = df.groupby(cluster_col).size()

    # --- Validate Cluster Structure ---
    # Define a robust threshold for what constitutes a "small" cluster.
    # This heuristic is more data-driven than a fixed number.
    N = len(df)
    G = len(cluster_sizes)
    min_size_threshold = max(5, math.sqrt(N / G))

    # Identify clusters that fall below this threshold.
    small_clusters = cluster_sizes[cluster_sizes < min_size_threshold]

    # Check if the fraction of small clusters exceeds the warning threshold.
    if not small_clusters.empty and (len(small_clusters) / G > small_cluster_warning_frac):
        warnings.warn(
            f"{len(small_clusters)} of {G} clusters ({len(small_clusters)/G:.1%}) "
            f"have fewer than {min_size_threshold:.1f} observations. "
            f"Asymptotic properties of cluster-robust standard errors may not hold."
        )

    return df


# ==============================================================================
# Task 9, Step 2: Bootstrap Sample Preparation
# ==============================================================================

def generate_bootstrap_samples(
    df: pd.DataFrame,
    cluster_col: str,
    n_replications: int,
    seed: int
) -> Iterator[pd.DataFrame]:
    """
    Generates bootstrap samples using cluster-based resampling.

    This function is a generator that yields a specified number of bootstrap
    samples. Crucially, it performs a cluster bootstrap, meaning it resamples
    the clusters (e.g., dyadic pairs) with replacement, preserving the within-
    cluster correlation structure of the panel data. This is the methodologically
    correct way to bootstrap standard errors for clustered data.

    Args:
        df (pd.DataFrame): The analytical DataFrame.
        cluster_col (str): The column identifying the clusters to resample.
        n_replications (int): The number of bootstrap samples to generate.
        seed (int): A seed for the random number generator for reproducibility.

    Yields:
        Iterator[pd.DataFrame]: A generator that yields one bootstrap sample
                                DataFrame at a time.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")
    if cluster_col not in df.columns:
        raise ValueError(f"Cluster column '{cluster_col}' not found in DataFrame.")

    # --- Setup ---
    # Initialize a dedicated random number generator for reproducibility.
    rng = np.random.default_rng(seed)
    # Get the array of unique cluster identifiers.
    unique_clusters = df[cluster_col].unique()
    n_clusters = len(unique_clusters)

    # --- Generation Loop ---
    for _ in range(n_replications):
        # --- Resample Clusters ---
        # Resample the CLUSTER IDENTIFIERS with replacement.
        resampled_clusters_ids = rng.choice(
            unique_clusters, size=n_clusters, replace=True
        )

        # --- Construct Bootstrap Sample ---
        # An efficient way to build the sample is to create a temporary DataFrame
        # of the resampled cluster IDs and merge it back to the original data.
        # This is often faster than concatenating many small groups in a loop.
        resampled_df_list = []
        # Create a Series from the resampled IDs to count occurrences of each cluster.
        resampled_counts = pd.Series(resampled_clusters_ids).value_counts()

        # Get the data for each unique cluster that was selected.
        # Grouping once is more efficient than multiple selections.
        grouped_data = df.groupby(cluster_col)
        for cluster_id, count in resampled_counts.items():
            # Append the data for the cluster `count` times.
            resampled_df_list.extend([grouped_data.get_group(cluster_id)] * count)

        # Concatenate all the pieces to form the final bootstrap sample.
        bootstrap_sample = pd.concat(resampled_df_list)

        # Yield the complete bootstrap sample.
        yield bootstrap_sample


# ==============================================================================
# Task 9, Step 3: Split-Panel Jackknife Setup
# ==============================================================================

def generate_split_panel_jackknife_samples(
    df: pd.DataFrame,
    cluster_col: str,
    n_groups: int
) -> Iterator[pd.DataFrame]:
    """
    Generates samples for the split-panel jackknife procedure.

    This function is a generator that implements the data-splitting step of the
    split-panel jackknife estimator (Hinz, Stammann, and Wanner, 2021). It
    divides the clusters (dyads) into a specified number of groups and then,
    for each group, yields a DataFrame containing all data *except* for the
    observations belonging to that group.

    Args:
        df (pd.DataFrame): The analytical DataFrame.
        cluster_col (str): The column identifying the clusters.
        n_groups (int): The number of groups (G) to split the clusters into.

    Yields:
        Iterator[pd.DataFrame]: A generator that yields one jackknife sample
                                DataFrame at a time.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")
    if cluster_col not in df.columns:
        raise ValueError(f"Cluster column '{cluster_col}' not found in DataFrame.")
    if not (isinstance(n_groups, int) and n_groups > 1):
        raise ValueError("'n_groups' must be an integer greater than 1.")

    # --- Setup ---
    # Get the unique cluster identifiers and sort them for deterministic grouping.
    unique_clusters = np.sort(df[cluster_col].unique())

    # Check if there are enough clusters to form the requested number of groups.
    if n_groups > len(unique_clusters):
        raise ValueError(f"Cannot create {n_groups} groups from only {len(unique_clusters)} clusters.")

    # Split the sorted cluster IDs into G approximately equal-sized groups.
    cluster_groups = np.array_split(unique_clusters, n_groups)

    # --- Generation Loop ---
    # Iterate through each group of clusters to be deleted.
    for g in range(n_groups):
        # Identify the clusters to be dropped for this jackknife replication.
        clusters_to_drop = cluster_groups[g]

        # Use a boolean mask with .isin() for an efficient filtering operation.
        # This selects all rows where the cluster ID is NOT in the drop list.
        jackknife_sample = df[~df[cluster_col].isin(clusters_to_drop)].copy()

        # Yield the resulting jackknife sample.
        yield jackknife_sample


# ==============================================================================
# Task 9: Master Orchestrator Function
# ==============================================================================

def run_inference_preparation(
    df: pd.DataFrame,
    cluster_col: str,
    n_bootstrap: int,
    n_jackknife_groups: int,
    seed: int
) -> Dict[str, Any]:
    """
    Orchestrates the preparation of data structures for advanced inference.

    This master function executes the three core steps for Task 9:
    1. Validates the cluster structure of the analytical data.
    2. Creates a generator for producing cluster-bootstrap samples.
    3. Creates a generator for producing split-panel jackknife samples.

    It returns the validated DataFrame and the generators, which can then be
    consumed by the estimation functions in subsequent tasks.

    Args:
        df (pd.DataFrame): The final, analysis-ready DataFrame.
        cluster_col (str): The name of the column identifying clusters.
        n_bootstrap (int): The number of bootstrap replications to prepare for.
        n_jackknife_groups (int): The number of groups for the split-panel jackknife.
        seed (int): A random seed for the bootstrap generator's reproducibility.

    Returns:
        Dict[str, Any]: A dictionary containing:
                        - 'validated_df': The input DataFrame after passing cluster validation.
                        - 'bootstrap_generator': An iterator for bootstrap samples.
                        - 'jackknife_generator': An iterator for jackknife samples.
    """
    try:
        # Step 1: Validate the cluster structure.
        print("Step 1: Validating cluster structure...")
        validated_df = validate_cluster_structure(df, cluster_col)
        print("Step 1: Success. Cluster structure is valid.")

        # Step 2: Create the bootstrap sample generator.
        print(f"Step 2: Preparing generator for {n_bootstrap} bootstrap samples...")
        bootstrap_generator = generate_bootstrap_samples(
            validated_df, cluster_col, n_bootstrap, seed
        )
        print("Step 2: Success. Bootstrap generator created.")

        # Step 3: Create the split-panel jackknife sample generator.
        print(f"Step 3: Preparing generator for {n_jackknife_groups} jackknife groups...")
        jackknife_generator = generate_split_panel_jackknife_samples(
            validated_df, cluster_col, n_jackknife_groups
        )
        print("Step 3: Success. Jackknife generator created.")

        # --- Compile and Return Results ---
        inference_preparations = {
            'validated_df': validated_df,
            'bootstrap_generator': bootstrap_generator,
            'jackknife_generator': jackknife_generator
        }

        print("\nSUCCESS: All preparations for inference are complete.")
        return inference_preparations

    except (ValueError, TypeError) as e:
        print(f"\nERROR: Inference preparation pipeline failed.")
        raise e


In [None]:
# Task 10: PPML Gravity Model Estimation Implementation

# ==============================================================================
# Task 10, Step 1: Construct PPML Specification Formula
# ==============================================================================

def construct_gravity_model_formula(
    dependent_var: str,
    regressors: List[str],
    fixed_effects: List[str],
    interaction_vars: List[str],
    interaction_term: str
) -> str:
    """
    Constructs a model specification formula for use with pyfixest.

    This function programmatically builds a formula string that precisely
    specifies the structural gravity model. It handles the critical requirement
    that political distance variables only apply to international, not domestic,
    trade flows by interacting them with an international trade dummy.

    Args:
        dependent_var (str): The name of the dependent variable.
        regressors (List[str]): A list of main regressor variable names that
                                apply to all observations.
        fixed_effects (List[str]): A list of categorical variables to be used
                                   as high-dimensional fixed effects.
        interaction_vars (List[str]): A list of variables (e.g., political
                                      distance) that should be interacted with
                                      the international dummy.
        interaction_term (str): The name of the binary variable that indicates
                                international trade (1) vs. domestic trade (0).

    Returns:
        str: A complete model formula string in patsy/fixest syntax.
    """
    # --- 1. Input Validation ---
    if not all(isinstance(arg, list) for arg in [regressors, fixed_effects, interaction_vars]):
        raise TypeError("Regressors, fixed_effects, and interaction_vars must be lists of strings.")

    # --- 2. Construct Regressor Part of the Formula ---
    # Combine main regressors with the interacted terms.
    # The syntax 'A:B' creates the interaction between A and B.
    interacted_part = [f"{interaction_term}:{var}" for var in interaction_vars]
    all_regressors = regressors + interacted_part
    regressor_str = " + ".join(all_regressors)

    # --- 3. Construct Fixed Effects Part of the Formula ---
    fe_str = " + ".join(fixed_effects)

    # --- 4. Assemble the Final Formula ---
    # The full formula combines dependent var, regressors, and fixed effects,
    # separated by '~' and '|'.
    formula = f"{dependent_var} ~ {regressor_str} | {fe_str}"

    return formula


# ==============================================================================
# Task 10, Step 2 & 3: Execute PPML Estimation with Cluster-Robust SEs
# ==============================================================================

def estimate_ppml_gravity_model(
    df: pd.DataFrame,
    formula: str,
    cluster_col: str
) -> Any:
    """
    Estimates a PPML gravity model with high-dimensional fixed effects.

    This function uses the `pyfixest` library, which is highly optimized for
    estimating models with multiple fixed effects. It specifies a Poisson
    model (PPML) and computes cluster-robust standard errors in a single,
    efficient call.

    Args:
        df (pd.DataFrame): The analytical DataFrame, prepared for estimation.
        formula (str): The model specification formula from the previous step.
        cluster_col (str): The name of the column to cluster standard errors on.

    Returns:
        Fixest: A fitted model object from pyfixest. This object contains
                coefficients, standard errors, p-values, and other
                estimation diagnostics.

    Raises:
        ValueError: If any variables in the formula are not in the DataFrame.
        RuntimeError: If the estimation algorithm fails to converge.
    """
    # --- 1. Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    # pyfixest internally checks for missing columns, so we don't need to repeat it.

    try:
        # --- 2. Run Estimation ---
        # The feols function is the core of pyfixest.
        # `family='poisson'` specifies the PPML estimator.
        # `vcov={'CRV1': cluster_col}` specifies one-way cluster-robust
        # standard errors, clustered by the given column. pyfixest handles
        # the sandwich estimation and finite-sample corrections internally.
        model_fit = feols(
            fml=formula,
            data=df,
            family='poisson',
            vcov={'CRV1': cluster_col}
        )

        # Check for convergence (pyfixest stores this in the object).
        if not model_fit._is_converged:
             raise RuntimeError("PPML estimation failed to converge.")

        return model_fit

    except Exception as e:
        # Catch any other errors during estimation.
        print(f"An error occurred during PPML estimation: {e}")
        raise


# ==============================================================================
# Task 10: Master Orchestrator Function
# ==============================================================================

def run_ppml_estimation_pipeline(
    df: pd.DataFrame,
    spec: Dict[str, Any]
) -> Any:
    """
    Orchestrates the entire PPML gravity model estimation pipeline.

    This master function prepares the data and formula, runs the estimation,
    and returns the final fitted model object.

    Args:
        df (pd.DataFrame): The fully prepared analytical DataFrame.
        spec (Dict[str, Any]): A dictionary defining the model specification,
                               containing keys like 'dependent_var', 'regressors',
                               'fixed_effects', etc.

    Returns:
        Fixest: The final fitted model object from pyfixest.
    """
    try:
        # --- 1. Prepare Data for Formula ---
        # Create the international trade dummy required for the formula.
        df_est = df.copy()
        df_est['is_international'] = (df_est['ExporterISO'] != df_est['ImporterISO']).astype(int)

        # --- 2. Construct the Model Formula ---
        print("Step 1: Constructing model formula...")
        formula = construct_gravity_model_formula(
            dependent_var=spec['dependent_var'],
            regressors=spec['main_regressors'],
            fixed_effects=spec['fixed_effects'],
            interaction_vars=spec['interaction_regressors'],
            interaction_term='is_international'
        )
        print(f"Step 1: Success. Formula created:\n{formula}")

        # --- 3. Estimate the Model ---
        print("Step 2: Estimating PPML model with high-dimensional fixed effects...")
        model_results = estimate_ppml_gravity_model(
            df=df_est,
            formula=formula,
            cluster_col=spec['cluster_col']
        )
        print("Step 2: Success. Estimation complete.")

        print("\nSUCCESS: PPML estimation pipeline complete.")
        return model_results

    except (ValueError, TypeError, KeyError, RuntimeError) as e:
        print(f"\nERROR: PPML estimation pipeline failed.")
        raise e


In [None]:
# Task 11: Logit Model Estimation for Extensive Margin

# ==============================================================================
# Task 11, Step 2: Implement Split-Panel Jackknife Estimation
# ==============================================================================

def estimate_logit_split_panel_jackknife(
    df: pd.DataFrame,
    formula: str,
    jackknife_generator: Iterator[pd.DataFrame]
) -> Tuple[pd.Series, Any]:
    """
    Estimates a Logit model using the split-panel jackknife for bias correction.

    This function implements the bias correction procedure from Hinz, Stammann,
    and Wanner (2021) for non-linear fixed effects models. It first estimates
    the model on the full sample, then iteratively re-estimates it on samples
    where one group of clusters has been removed. The final coefficients are
    bias-corrected based on these estimates.

    Args:
        df (pd.DataFrame): The analytical sample prepared for Logit estimation.
        formula (str): The model specification in pyfixest format.
        jackknife_generator (Iterator[pd.DataFrame]): A generator that yields
            the jackknife subsamples (from Task 9).

    Returns:
        Tuple[pd.Series, Any]: A tuple containing:
            - The bias-corrected coefficient estimates as a pandas Series.
            - The unfitted model object from the full-sample estimation for diagnostics.
    """
    # --- 1. Full Sample Estimation ---
    # Estimate the model on the complete dataset to get the initial,
    # potentially biased coefficients (beta_full).
    full_model_fit = feglm(fml=formula, data=df, family='logit')
    beta_full = full_model_fit.coef()

    # --- 2. Jackknife Loop for Sub-Sample Estimations ---
    # Collect the coefficient estimates from each jackknife replication.
    beta_jackknife_list = []
    n_groups = 0
    for jackknife_sample in jackknife_generator:
        # Estimate the model on the subsample where one group is deleted.
        sub_model_fit = feglm(fml=formula, data=jackknife_sample, family='logit')
        beta_jackknife_list.append(sub_model_fit.coef())
        n_groups += 1

    # --- 3. Bias Correction Calculation ---
    # Concatenate the list of coefficient series into a DataFrame.
    beta_jackknife_df = pd.concat(beta_jackknife_list, axis=1)

    # Equation: beta_BC = beta_full - (G-1)/G * sum(beta_{-g} - beta_full)
    # Calculate the mean of the jackknife estimates.
    mean_beta_jackknife = beta_jackknife_df.mean(axis=1)
    # Calculate the bias term.
    bias_term = (n_groups - 1) * (mean_beta_jackknife - beta_full)
    # Compute the bias-corrected coefficients.
    beta_bias_corrected = beta_full - bias_term

    return beta_bias_corrected, full_model_fit


# ==============================================================================
# Task 11, Step 3: Bootstrap Standard Error Computation
# ==============================================================================

def _bootstrap_replication_worker(
    bootstrap_sample: pd.DataFrame,
    formula: str,
    cluster_col: str,
    n_jackknife_groups: int
) -> pd.Series:
    """
    A helper function to run one full bootstrap replication.

    This function takes a single bootstrap sample and runs the entire
    split-panel jackknife estimation procedure on it. It is designed to be
    called in parallel by the main bootstrap function.

    Args:
        bootstrap_sample (pd.DataFrame): A single bootstrap sample.
        formula (str): The model specification formula.
        cluster_col (str): The name of the clustering column.
        n_jackknife_groups (int): The number of groups for the jackknife.

    Returns:
        pd.Series: The bias-corrected coefficient estimates for this replication.
    """
    try:
        # Create a jackknife generator for this specific bootstrap sample.
        jackknife_gen = generate_split_panel_jackknife_samples(
            bootstrap_sample, cluster_col, n_jackknife_groups
        )
        # Run the jackknife estimation procedure.
        beta_bc, _ = estimate_logit_split_panel_jackknife(
            bootstrap_sample, formula, jackknife_gen
        )
        return beta_bc
    except Exception:
        # In case of a convergence failure on a specific bootstrap sample,
        # return NaNs. This is a robust way to handle problematic resamples.
        return pd.Series(dtype=np.float64)


def compute_logit_bootstrap_standard_errors(
    df: pd.DataFrame,
    formula: str,
    bootstrap_generator: Iterator[pd.DataFrame],
    cluster_col: str,
    n_jackknife_groups: int,
    n_jobs: int = -1
) -> pd.Series:
    """
    Computes standard errors for the Logit model using a bootstrap of the
    entire split-panel jackknife procedure.

    This function implements the full, computationally intensive inference
    procedure. For each bootstrap sample, it re-runs the entire split-panel
    jackknife estimation to get a set of bias-corrected coefficients. The
    standard deviation of these coefficients across all bootstrap replications
    is the bootstrap standard error. The process is parallelized for efficiency.

    Args:
        df (pd.DataFrame): The analytical sample for Logit estimation.
        formula (str): The model specification formula.
        bootstrap_generator (Iterator[pd.DataFrame]): A generator for bootstrap samples.
        cluster_col (str): The name of the clustering column.
        n_jackknife_groups (int): The number of groups for the jackknife.
        n_jobs (int): The number of CPU cores to use for parallelization.
                      -1 means use all available cores.

    Returns:
        pd.Series: The bootstrap standard errors for the coefficients.
    """
    # Use joblib.Parallel to distribute the bootstrap replications across CPU cores.
    # This is essential for making this computationally demanding task feasible.
    with Parallel(n_jobs=n_jobs) as parallel:
        # The `delayed` function creates a tuple of (function, args, kwargs)
        # for each task to be executed in parallel.
        results = parallel(
            delayed(_bootstrap_replication_worker)(
                sample, formula, cluster_col, n_jackknife_groups
            ) for sample in bootstrap_generator
        )

    # Concatenate the list of coefficient Series into a single DataFrame.
    # Each column represents a bootstrap replication.
    bootstrap_coefs_df = pd.concat(results, axis=1)

    # Drop any replications that failed to converge (resulted in NaNs).
    n_failures = bootstrap_coefs_df.isnull().any(axis=0).sum()
    if n_failures > 0:
        warnings.warn(
            f"{n_failures} of {len(results)} bootstrap replications failed to "
            "converge and were dropped."
        )
        bootstrap_coefs_df.dropna(axis=1, inplace=True)

    # The bootstrap standard error is the standard deviation across replications.
    # ddof=1 provides the sample standard deviation.
    standard_errors = bootstrap_coefs_df.std(axis=1, ddof=1)
    standard_errors.name = "BootstrapSE"

    return standard_errors


# ==============================================================================
# Task 11: Master Orchestrator Function
# ==============================================================================

def run_logit_estimation_pipeline(
    df: pd.DataFrame,
    spec: Dict[str, Any],
    constants: 'ProjectConstants',
    seed: int
) -> Dict[str, pd.Series]:
    """
    Orchestrates the entire Logit model estimation and inference pipeline.

    This master function executes the full, advanced procedure for the extensive
    margin analysis:
    1. Prepares the data and generators for jackknife and bootstrap.
    2. Estimates the model coefficients using the split-panel jackknife for bias correction.
    3. Computes valid standard errors using a bootstrap of the entire jackknife procedure.

    Args:
        df (pd.DataFrame): The analytical sample prepared for Logit estimation.
        spec (Dict[str, Any]): A dictionary defining the model specification.
        constants (ProjectConstants): The dataclass of project constants.
        seed (int): A random seed for the bootstrap generator.

    Returns:
        Dict[str, pd.Series]: A dictionary containing the final results:
                              - 'coefficients': The bias-corrected coefficients.
                              - 'standard_errors': The bootstrap standard errors.
    """
    try:
        # --- 1. Preparation ---
        print("Step 1: Preparing data and generators for Logit estimation...")
        # The formula construction function from Task 10 can be reused.
        formula = construct_gravity_model_formula(
            dependent_var=spec['dependent_var'],
            regressors=spec['main_regressors'],
            fixed_effects=spec['fixed_effects'],
            interaction_vars=spec['interaction_regressors'],
            interaction_term='is_international'
        )
        # Create the necessary sample generators from Task 9.
        n_jackknife_groups = 50 # A reasonable default
        jackknife_gen = generate_split_panel_jackknife_samples(
            df, spec['cluster_col'], n_jackknife_groups
        )
        bootstrap_gen = generate_bootstrap_samples(
            df, spec['cluster_col'], constants.N_BOOTSTRAP, seed
        )
        print("Step 1: Success. Formula and generators are ready.")

        # --- 2. Point Estimation (Split-Panel Jackknife) ---
        print(f"Step 2: Estimating coefficients via split-panel jackknife ({n_jackknife_groups} groups)...")
        coefficients, _ = estimate_logit_split_panel_jackknife(df, formula, jackknife_gen)
        print("Step 2: Success. Bias-corrected coefficients estimated.")

        # --- 3. Inference (Bootstrap) ---
        print(f"Step 3: Computing standard errors via bootstrap ({constants.N_BOOTSTRAP} replications)...")
        standard_errors = compute_logit_bootstrap_standard_errors(
            df, formula, bootstrap_gen, spec['cluster_col'], n_jackknife_groups
        )
        print("Step 3: Success. Bootstrap standard errors computed.")

        # --- 4. Compile and Return Results ---
        results = {
            'coefficients': coefficients,
            'standard_errors': standard_errors
        }

        print("\nSUCCESS: Logit estimation pipeline complete.")
        return results

    except Exception as e:
        print(f"\nERROR: Logit estimation pipeline failed.")
        raise e


In [None]:
# Task 12: Model Diagnosis and Specification Testing

# ==============================================================================
# Task 12, Step 1: PPML Model Diagnostics
# ==============================================================================

def diagnose_ppml_model(
    ppml_results: Fixest,
    ppml_df: pd.DataFrame
) -> Dict[str, Any]:
    """
    Performs diagnostic checks on a fitted PPML (Poisson) model.

    This function assesses the validity and properties of the PPML estimation by:
    1. Verifying that the estimation algorithm converged.
    2. Calculating a dispersion parameter (phi) to test for overdispersion,
       a common issue in count data models.

    Args:
        ppml_results (Fixest): The fitted model object from pyfixest.
        ppml_df (pd.DataFrame): The DataFrame used for the PPML estimation.

    Returns:
        Dict[str, Any]: A dictionary containing key diagnostic statistics.
    """
    # --- 1. Input Validation ---
    if not isinstance(ppml_results, Fixest):
        raise TypeError("Input 'ppml_results' must be a pyfixest Fixest object.")
    if not isinstance(ppml_df, pd.DataFrame):
        raise TypeError("Input 'ppml_df' must be a pandas DataFrame.")

    diagnostics = {}

    # --- 2. Check Convergence Status ---
    # The _is_converged attribute is a boolean flag set by the pyfixest engine.
    diagnostics['is_converged'] = ppml_results._is_converged

    # --- 3. Test for Overdispersion ---
    # Extract the dependent variable (y) and fitted values (mu_hat).
    y_true = ppml_results.dependent_var.values.flatten()
    mu_hat = ppml_results.predict(newdata=ppml_df)

    # Calculate the number of observations and estimated parameters.
    n_obs = ppml_results.nobs
    # Note: nparams is the number of regression coefficients, not including
    # the absorbed fixed effects. This is a standard approximation for this test.
    k_params = ppml_results.nparams

    # Calculate residual degrees of freedom.
    rdf = n_obs - k_params
    diagnostics['residual_degrees_of_freedom'] = rdf

    # Equation: Pearson Chi-Squared = sum( (y_i - mu_hat_i)^2 / mu_hat_i )
    # Calculate the Pearson chi-squared statistic. A small epsilon is added
    # to the denominator for numerical stability in case of mu_hat = 0.
    pearson_chi2 = np.sum((y_true - mu_hat)**2 / (mu_hat + 1e-8))
    diagnostics['pearson_chi2'] = pearson_chi2

    # Equation: Dispersion (phi) = Pearson Chi-Squared / (N - K)
    # A value >> 1 suggests overdispersion.
    if rdf > 0:
        dispersion_phi = pearson_chi2 / rdf
        diagnostics['dispersion_phi'] = dispersion_phi
    else:
        diagnostics['dispersion_phi'] = np.nan

    return diagnostics


# ==============================================================================
# Task 12, Step 2: Logit Model Diagnostics
# ==============================================================================

def diagnose_logit_model(
    logit_full_sample_results: Fixest
) -> Dict[str, Any]:
    """
    Performs diagnostic checks on a fitted high-dimensional fixed effects Logit model.

    Standard goodness-of-fit tests are often invalid with many fixed effects.
    This function focuses on two key, informative diagnostics:
    1. Convergence status, which is a strong indicator of potential separation issues.
    2. McFadden's Pseudo R-squared, which measures the improvement in model fit
       over a null model with only an intercept.

    Args:
        logit_full_sample_results (Fixest): The fitted model object from the
            full-sample Logit estimation (before jackknifing).

    Returns:
        Dict[str, Any]: A dictionary containing key diagnostic statistics.
    """
    # --- 1. Input Validation ---
    if not isinstance(logit_full_sample_results, Fixest):
        raise TypeError("Input must be a pyfixest Fixest object from a Logit model.")

    diagnostics = {}

    # --- 2. Check Convergence Status (for Separation) ---
    # For Logit models, failure to converge often indicates quasi-complete
    # separation, even after filtering for perfect prediction.
    diagnostics['is_converged'] = logit_full_sample_results._is_converged

    # --- 3. Calculate McFadden's Pseudo R-squared ---
    # Get the log-likelihood of the fully specified model.
    logL_full = logit_full_sample_results.loglik()

    # Calculate the log-likelihood of a null (intercept-only) model.
    y = logit_full_sample_results.dependent_var.values.flatten()
    p_mean = np.mean(y)
    logL_null = np.sum(y * np.log(p_mean) + (1 - y) * np.log(1 - p_mean))

    # Equation: R^2_McFadden = 1 - (logL_full / logL_null)
    if logL_null != 0:
        pseudo_r2_mcfadden = 1 - (logL_full / logL_null)
        diagnostics['pseudo_r2_mcfadden'] = pseudo_r2_mcfadden
    else:
        diagnostics['pseudo_r2_mcfadden'] = np.nan

    return diagnostics


# ==============================================================================
# Task 12, Step 3: Cross-Model Consistency Validation
# ==============================================================================

def validate_cross_model_consistency(
    ppml_results: Fixest,
    logit_results: Dict[str, pd.Series]
) -> pd.DataFrame:
    """
    Compares key results between the PPML and Logit models for consistency.

    This function creates a summary table comparing the estimated coefficients,
    standard errors, and p-values for variables common to both models. It also
    adds a check for sign consistency, which is a key indicator of a robust
    underlying relationship, even if coefficient magnitudes are not directly
    comparable.

    Args:
        ppml_results (Fixest): The fitted model object from the PPML estimation.
        logit_results (Dict[str, pd.Series]): The results dictionary from the
            Logit estimation pipeline, containing 'coefficients' and
            'standard_errors'.

    Returns:
        pd.DataFrame: A DataFrame indexed by regressor name, summarizing and
                      comparing the results from the two models.
    """
    # --- 1. Extract Results from Model Objects ---
    ppml_summary = ppml_results.summary().reset_index().rename(columns={'index': 'Variable'})
    logit_coefs = logit_results['coefficients'].rename('Coefficient_Logit')
    logit_ses = logit_results['standard_errors'].rename('StdError_Logit')

    # Calculate Logit z-scores and p-values.
    logit_z = logit_coefs / logit_ses
    logit_p = (2 * (1 - norm.cdf(np.abs(logit_z)))).rename('PValue_Logit')

    # Combine Logit results into a single DataFrame.
    logit_summary = pd.concat([logit_coefs, logit_ses, logit_p], axis=1).reset_index().rename(columns={'index': 'Variable'})

    # --- 2. Merge Results for Comparison ---
    # Perform an inner merge to keep only the variables present in both models.
    comparison_df = pd.merge(
        ppml_summary[['Variable', 'Estimate', 'Std. Error', 'Pr(>|t|)']],
        logit_summary,
        on='Variable',
        how='inner'
    ).rename(columns={
        'Estimate': 'Coefficient_PPML',
        'Std. Error': 'StdError_PPML',
        'Pr(>|t|)': 'PValue_PPML'
    }).set_index('Variable')

    # --- 3. Check for Sign Consistency ---
    # A robust finding should have the same sign in both models.
    comparison_df['Sign_Consistent'] = (
        np.sign(comparison_df['Coefficient_PPML']) == np.sign(comparison_df['Coefficient_Logit'])
    )

    return comparison_df


# ==============================================================================
# Task 12: Master Orchestrator Function
# ==============================================================================

def run_diagnostics_pipeline(
    ppml_results: Fixest,
    ppml_df: pd.DataFrame,
    logit_full_sample_results: Fixest,
    logit_final_results: Dict[str, pd.Series]
) -> Dict[str, Any]:
    """
    Orchestrates the entire model diagnosis and specification testing pipeline.

    Args:
        ppml_results (Fixest): The fitted PPML model object.
        ppml_df (pd.DataFrame): The DataFrame used for the PPML estimation.
        logit_full_sample_results (Fixest): The full-sample Logit model fit,
            used for diagnostics before jackknifing.
        logit_final_results (Dict[str, pd.Series]): The final bias-corrected
            and bootstrapped results from the Logit pipeline.

    Returns:
        Dict[str, Any]: A dictionary containing the diagnostic reports for
                        each model and the cross-model consistency check.
    """
    try:
        # Step 1: Run diagnostics on the PPML model.
        print("Step 1: Running diagnostics on PPML model...")
        ppml_diagnostics = diagnose_ppml_model(ppml_results, ppml_df)
        print("Step 1: Success. PPML diagnostics complete.")

        # Step 2: Run diagnostics on the Logit model.
        print("Step 2: Running diagnostics on Logit model...")
        logit_diagnostics = diagnose_logit_model(logit_full_sample_results)
        print("Step 2: Success. Logit diagnostics complete.")

        # Step 3: Perform cross-model consistency validation.
        print("Step 3: Performing cross-model consistency validation...")
        consistency_report = validate_cross_model_consistency(
            ppml_results, logit_final_results
        )
        print("Step 3: Success. Cross-model consistency validated.")

        # --- Compile and Return All Reports ---
        master_report = {
            'ppml_diagnostics': ppml_diagnostics,
            'logit_diagnostics': logit_diagnostics,
            'consistency_report': consistency_report
        }

        print("\nSUCCESS: Diagnostics pipeline complete.")
        return master_report

    except Exception as e:
        print(f"\nERROR: Diagnostics pipeline failed.")
        raise e


In [None]:
# Task 13: Delta Method Implementation for Nonlinear Effect Sizes

# ==============================================================================
# Task 13, Step 1: Calculate Political Distance Standard Deviations
# ==============================================================================

def calculate_pd_standard_deviations(
    df: pd.DataFrame,
    pd_variables: List[str]
) -> Dict[str, float]:
    """
    Calculates the sample standard deviation for specified political distance variables.

    Args:
        df (pd.DataFrame): The analytical DataFrame.
        pd_variables (List[str]): A list of the political distance column names
                                  for which to calculate the standard deviation.

    Returns:
        Dict[str, float]: A dictionary mapping variable names to their standard deviation.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame.")
    if not all(col in df.columns for col in pd_variables):
        raise ValueError("One or more specified pd_variables not in DataFrame.")

    # --- Calculation ---
    # Use a dictionary comprehension for a concise implementation.
    # .std(ddof=1) correctly calculates the sample standard deviation,
    # automatically ignoring NaN values.
    std_devs = {col: df[col].std(ddof=1) for col in pd_variables}

    return std_devs


# ==============================================================================
# Task 13, Step 2: Compute Marginal Effects for PPML Models
# ==============================================================================

def compute_ppml_marginal_effects(
    ppml_results: Fixest,
    std_dev: float,
    main_pd_term: str,
    interaction_scenarios: Dict[str, List[str]]
) -> pd.DataFrame:
    """
    Computes one-std-deviation marginal effects and standard errors for a PPML model.

    This function calculates the percentage change in the dependent variable for a
    one-standard-deviation increase in political distance under different
    institutional scenarios (e.g., non-WTO members vs. WTO members). It uses the
    Delta Method to calculate the standard errors for these non-linear transformations.

    Args:
        ppml_results (Fixest): The fitted model object from the PPML estimation.
        std_dev (float): The standard deviation of the political distance variable.
        main_pd_term (str): The name of the main political distance regressor.
        interaction_scenarios (Dict[str, List[str]]): A dictionary defining the
            scenarios. Keys are scenario names (e.g., "Both WTO Members"), and
            values are lists of additional interaction term coefficients to
            include for that scenario.

    Returns:
        pd.DataFrame: A DataFrame indexed by scenario, with columns for the
                      point estimate ('EffectPct') and its standard error ('StdError').
    """
    # --- 1. Extract Model Components ---
    # Get the coefficient estimates as a Series.
    beta = ppml_results.coef()
    # Get the variance-covariance matrix as a DataFrame.
    vcov = ppml_results.vcov()
    # Ensure consistent ordering.
    vcov = vcov.loc[beta.index, beta.index]

    results = {}
    # --- 2. Iterate Through Scenarios ---
    for scenario_name, interaction_terms in interaction_scenarios.items():
        # --- 3. Calculate Point Estimate ---
        # Identify all relevant coefficient names for this scenario.
        relevant_terms = [main_pd_term] + interaction_terms
        # Calculate the combined linear effect (c' * beta).
        linear_effect = beta[relevant_terms].sum()

        # Equation: Effect = 100 * (exp(linear_effect * std_dev) - 1)
        # This is the percentage change for a one-standard-deviation shock.
        point_estimate = 100 * (np.exp(linear_effect * std_dev) - 1)

        # --- 4. Calculate Standard Error via Delta Method ---
        # Create the selection vector 'c' for the gradient.
        c_vector = pd.Series(0, index=beta.index)
        c_vector[relevant_terms] = 1

        # Equation: grad(g(beta)) = 100 * exp(c' * beta * std_dev) * std_dev * c
        # Calculate the gradient of the transformation function w.r.t. beta.
        gradient = 100 * np.exp(linear_effect * std_dev) * std_dev * c_vector.values

        # Equation: Var(g(beta)) = grad' * VCV(beta) * grad
        # Calculate the variance of the transformed effect.
        effect_variance = gradient.T @ vcov.values @ gradient
        # The standard error is the square root of the variance.
        std_error = np.sqrt(effect_variance)

        # Store the results.
        results[scenario_name] = {'EffectPct': point_estimate, 'StdError': std_error}

    # --- 5. Format and Return Output ---
    return pd.DataFrame.from_dict(results, orient='index')


# ==============================================================================
# Task 13, Step 3: Compute Marginal Effects for Logit Models
# ==============================================================================

def compute_logit_marginal_effects(
    df: pd.DataFrame,
    formula: str,
    logit_results: Dict[str, Any],
    std_dev: float,
    main_pd_term: str,
    interaction_scenarios: Dict[str, List[str]]
) -> pd.DataFrame:
    """
    Computes average marginal effects (AMEs) for a Logit model using the
    full bootstrap distribution for inference.

    This function calculates the average change in probability for a one-std-dev
    increase in political distance. Instead of the Delta Method, it robustly
    calculates the standard error by re-computing the AME for each bootstrap
    replication of the coefficients, then taking the standard deviation of that
    distribution.

    Args:
        df (pd.DataFrame): The analytical DataFrame used for the Logit estimation.
        formula (str): The model specification formula.
        logit_results (Dict[str, Any]): The results dictionary from the Logit
            pipeline, containing 'coefficients' and 'bootstrap_coefs_df'.
        std_dev (float): The standard deviation of the political distance variable.
        main_pd_term (str): The name of the main political distance regressor.
        interaction_scenarios (Dict[str, List[str]]): A dictionary defining the
            scenarios for which to calculate effects.

    Returns:
        pd.DataFrame: A DataFrame indexed by scenario, with columns for the
                      point estimate ('AME_pp') and its bootstrap standard error ('StdError').
    """
    # --- 1. Prepare Design Matrix ---
    # Use pyfixest's internal tools to create the design matrix from the formula.
    # This correctly handles all formula syntax.
    Y, X, _, _, _, _ = model_matrix_fixest(formula, df, family='logit')

    # --- 2. Define Helper for AME Calculation ---
    def calculate_ame(beta_vector: pd.Series) -> Dict[str, float]:
        """Calculates the AME for all scenarios given one coefficient vector."""
        # Calculate the linear prediction for all observations.
        eta = X @ beta_vector
        # Calculate the PDF of the logistic distribution at each prediction.
        pdf_eta = logistic.pdf(eta)

        ames = {}
        for name, terms in interaction_scenarios.items():
            # Get the relevant sum of coefficients for this scenario.
            beta_sum = beta_vector[[main_pd_term] + terms].sum()
            # Calculate the marginal effect for each observation.
            marginal_effects = pdf_eta * beta_sum
            # The AME is the mean of these individual effects.
            # Multiply by std_dev to get the effect in percentage points (pp).
            ames[name] = np.mean(marginal_effects) * std_dev * 100
        return ames

    # --- 3. Calculate Point Estimate (AME) ---
    # Calculate the AME using the main, bias-corrected coefficient estimates.
    point_estimates = calculate_ame(logit_results['coefficients'])

    # --- 4. Calculate Standard Errors (Bootstrap) ---
    # Get the DataFrame of bootstrap coefficients (each column is a replication).
    bootstrap_coefs = logit_results['bootstrap_coefs_df']
    # Apply the AME calculation to each column (each bootstrap replication).
    bootstrap_ames = bootstrap_coefs.apply(calculate_ame, axis=0)
    # The result is a DataFrame where rows are scenarios and columns are replications.
    # The standard error is the standard deviation across the rows.
    std_errors = bootstrap_ames.std(axis=1, ddof=1)

    # --- 5. Format and Return Output ---
    results_df = pd.DataFrame.from_dict(point_estimates, orient='index', columns=['AME_pp'])
    results_df['StdError'] = std_errors

    return results_df


In [None]:
# Task 14: Institutional Moderation Effect Quantification

# ==============================================================================
# Task 14, Step 1: WTO Attenuation Effect Quantification
# ==============================================================================

def quantify_wto_attenuation_effect(
    model_results: Fixest,
    std_dev: float,
    main_pd_term: str,
    wto_interaction_term: str
) -> Dict[str, float]:
    """
    Quantifies the WTO's attenuation effect and its standard error via the Delta Method.

    This function provides a complete and rigorous calculation of one of the
    paper's key findings: the extent to which WTO membership mitigates the
    negative impact of political distance on trade. It calculates:
    1. The marginal effect for non-members (baseline).
    2. The marginal effect for WTO members.
    3. The attenuation effect, which is the difference between (1) and (2).
    4. The standard error of this difference, calculated precisely using the
       multivariate Delta Method and the model's full variance-covariance matrix.

    Args:
        model_results (Fixest): The fitted model object from pyfixest.
        std_dev (float): The standard deviation of the political distance variable.
        main_pd_term (str): The name of the main political distance regressor
                            (e.g., 'is_international:PD_IHS').
        wto_interaction_term (str): The name of the interaction term between
                                    political distance and WTO membership
                                    (e.g., 'is_international:PD_IHS:GATTWTO_Both').

    Returns:
        Dict[str, float]: A dictionary containing the point estimate of the
                          attenuation effect ('Attenuation_Effect_Pct') and its
                          standard error ('Attenuation_StdError').
    """
    # --- 1. Input Validation ---
    if not isinstance(model_results, Fixest):
        raise TypeError("Input 'model_results' must be a pyfixest Fixest object.")
    required_terms = [main_pd_term, wto_interaction_term]
    if not all(term in model_results.coef().index for term in required_terms):
        raise ValueError("One or more specified terms not found in model coefficients.")

    # --- 2. Extract Model Components ---
    # Get the coefficient estimates as a pandas Series.
    beta = model_results.coef()
    # Get the full variance-covariance matrix.
    vcov = model_results.vcov()
    # Ensure the VCV matrix is aligned with the coefficient vector.
    vcov = vcov.loc[beta.index, beta.index]

    # --- 3. Define Scenarios and Selection Vectors ---
    # Create a zero vector 'c' for selecting coefficients.
    c_base = pd.Series(0.0, index=beta.index)
    # For the baseline (non-WTO) scenario, only the main PD term is active.
    c_base[main_pd_term] = 1.0

    # For the WTO scenario, both the main term and the interaction term are active.
    c_wto = c_base.copy()
    c_wto[wto_interaction_term] = 1.0

    # --- 4. Calculate Point Estimates for Each Effect ---
    # Calculate the linear combination of coefficients for the baseline.
    lin_eff_base = c_base.T @ beta
    # Calculate the linear combination for the WTO scenario.
    lin_eff_wto = c_wto.T @ beta

    # Equation for percentage effect: g(beta) = 100 * (exp(c' * beta * std_dev) - 1)
    # Calculate the non-linear percentage effect for the baseline.
    effect_base = 100 * (np.exp(lin_eff_base * std_dev) - 1)
    # Calculate the non-linear percentage effect for the WTO scenario.
    effect_wto = 100 * (np.exp(lin_eff_wto * std_dev) - 1)

    # The attenuation is the (negative) baseline effect minus the (less negative) WTO effect.
    attenuation_estimate = effect_base - effect_wto

    # --- 5. Calculate Standard Error of the Difference via Delta Method ---
    # The function of interest is g_diff = g_base(beta) - g_wto(beta).
    # The gradient of this difference is grad(g_base) - grad(g_wto).

    # Equation for gradient: grad(g(beta)) = 100 * exp(c' * beta * std_dev) * std_dev * c
    # Calculate the gradient vector for the baseline effect.
    grad_base = 100 * np.exp(lin_eff_base * std_dev) * std_dev * c_base.values
    # Calculate the gradient vector for the WTO effect.
    grad_wto = 100 * np.exp(lin_eff_wto * std_dev) * std_dev * c_wto.values

    # The gradient of the difference is the difference of the gradients.
    grad_diff = grad_base - grad_wto

    # Equation for variance: Var(g_diff) = grad_diff' * VCV(beta) * grad_diff
    # Calculate the variance of the attenuation effect using the full VCV matrix.
    attenuation_variance = grad_diff.T @ vcov.values @ grad_diff

    # The standard error is the square root of the variance.
    attenuation_se = np.sqrt(attenuation_variance)

    # Return the final, fully calculated results.
    return {
        'Attenuation_Effect_Pct': attenuation_estimate,
        'Attenuation_StdError': attenuation_se
    }


# ==============================================================================
# Task 14, Step 2 & 3: Hypothesis Testing Function
# ==============================================================================

def test_institutional_effect_equality(
    model_results: Fixest,
    coef1_name: str,
    coef2_name: str
) -> Dict[str, float]:
    """
    Tests the equality of two institutional interaction coefficients using a Wald test.

    This function performs a formal statistical test of the null hypothesis that
    two coefficients are equal (e.g., H0: beta_1 - beta_2 = 0). It constructs
    the appropriate restriction vector and computes the Wald statistic, which
    is compared against a Chi-squared distribution to obtain a p-value.

    Args:
        model_results (Fixest): The fitted model object from pyfixest.
        coef1_name (str): The name of the first coefficient in the hypothesis.
        coef2_name (str): The name of the second coefficient to be subtracted.

    Returns:
        Dict[str, float]: A dictionary containing the Wald statistic and its p-value.
    """
    # --- 1. Input Validation ---
    if not isinstance(model_results, Fixest):
        raise TypeError("Input 'model_results' must be a pyfixest Fixest object.")
    if not all(c in model_results.coef().index for c in [coef1_name, coef2_name]):
        raise ValueError("One or both specified coefficients not found in model results.")

    # --- 2. Extract Model Components ---
    # Get the coefficient estimates and the variance-covariance matrix.
    beta = model_results.coef()
    vcov = model_results.vcov()

    # --- 3. Construct Restriction Vector (R) ---
    # The vector R defines the linear combination for the test: R * beta = beta_1 - beta_2.
    R = pd.Series(0.0, index=beta.index)
    R[coef1_name] = 1.0
    R[coef2_name] = -1.0

    # --- 4. Calculate Wald Statistic ---
    # Equation: W = (R*beta)' * [R * VCV(beta) * R']^-1 * (R*beta)
    # For a single restriction (1xK vector R), this simplifies.
    r_beta = R.T @ beta
    r_vcov_r = R.T @ vcov @ R

    # Ensure the denominator is not zero to avoid division errors.
    if r_vcov_r <= 1e-12:
        return {'wald_statistic': np.nan, 'p_value': np.nan}

    wald_stat = (r_beta**2) / r_vcov_r

    # --- 5. Calculate p-value ---
    # The Wald statistic for a single restriction follows a Chi-squared
    # distribution with 1 degree of freedom under the null hypothesis.
    p_value = 1.0 - chi2.cdf(wald_stat, df=1)

    return {'wald_statistic': wald_stat, 'p_value': p_value}


def test_joint_governance_effects(
    model_results: Fixest,
    governance_interaction_terms: List[str]
) -> pd.DataFrame:
    """
    Performs an F-test for the joint significance of governance interaction terms.

    This function tests the null hypothesis that all specified governance-related
    interaction coefficients are jointly equal to zero. It leverages the
    built-in `.f_test()` method of the pyfixest results object for a robust
    and convenient implementation that correctly handles the model's degrees of freedom.

    Args:
        model_results (Fixest): The fitted model object from pyfixest.
        governance_interaction_terms (List[str]): A list of the governance
            interaction coefficients to be jointly tested against zero.

    Returns:
        pd.DataFrame: A DataFrame summarizing the F-test results, including
                      the F-statistic, p-value, and degrees of freedom, as
                      returned by pyfixest.
    """
    # --- 1. Input Validation ---
    if not isinstance(model_results, Fixest):
        raise TypeError("Input 'model_results' must be a pyfixest Fixest object.")
    if not all(term in model_results.coef().index for term in governance_interaction_terms):
        missing = set(governance_interaction_terms) - set(model_results.coef().index)
        raise ValueError(f"Governance terms not found in model coefficients: {missing}")

    # --- 2. Perform F-test ---
    # The pyfixest `.f_test()` method is the most direct and robust way to
    # perform this joint hypothesis test. It correctly uses the model's
    # degrees of freedom and VCV matrix.
    f_test_results = model_results.f_test(governance_interaction_terms)

    return f_test_results


# ==============================================================================
# Task 14: Master Orchestrator Function
# ==============================================================================

def run_institutional_quantification_pipeline(
    model_results: Fixest,
    std_dev: float,
    spec: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the quantification and testing of institutional effects.

    This master function executes the three core steps for Task 14:
    1. Quantifies the WTO attenuation effect and its standard error via Delta Method.
    2. Tests for equality between the WTO and RTA moderation effects via Wald test.
    3. Tests for the joint significance of governance channel effects via F-test.

    Args:
        model_results (Fixest): The fitted PPML or Logit model object.
        std_dev (float): The standard deviation of the relevant political distance variable.
        spec (Dict[str, Any]): The model specification dictionary, containing
                               lists of variable names for testing.

    Returns:
        Dict[str, Any]: A dictionary containing the results of all quantification
                        and hypothesis testing steps.
    """
    try:
        # Step 1: Quantify the WTO attenuation effect.
        print("Step 1: Quantifying WTO attenuation effect...")
        wto_attenuation = quantify_wto_attenuation_effect(
            model_results,
            std_dev,
            main_pd_term=spec['main_pd_term'],
            wto_interaction_term=spec['wto_interaction_term']
        )
        print("Step 1: Success. WTO attenuation quantified.")

        # Step 2: Test equality of WTO and RTA effects.
        print("Step 2: Testing equality of WTO vs RTA moderation effects...")
        equality_test = test_institutional_effect_equality(
            model_results,
            coef1_name=spec['wto_interaction_term'],
            coef2_name=spec['rta_interaction_term']
        )
        print("Step 2: Success. Equality test complete.")

        # Step 3: Test joint significance of governance effects.
        print("Step 3: Testing joint significance of governance channels...")
        governance_test = test_joint_governance_effects(
            model_results,
            governance_interaction_terms=spec['governance_interaction_terms']
        )
        print("Step 3: Success. Joint governance test complete.")

        # --- Compile and Return All Results ---
        quantification_report = {
            'wto_attenuation': wto_attenuation,
            'wto_vs_rta_equality_test': equality_test,
            'joint_governance_test': governance_test
        }

        print("\nSUCCESS: Institutional effect quantification pipeline complete.")
        return quantification_report

    except Exception as e:
        print(f"\nERROR: Institutional quantification pipeline failed.")
        raise e


In [None]:
# Task 15: Temporal Heterogeneity Analysis

# ==============================================================================
# Task 15, Step 1 & 2: Period-Specific Effect Calculation
# ==============================================================================

def calculate_period_specific_effects(
    temporal_subsamples: Dict[str, pd.DataFrame],
    pd_variable: str,
    spec: Dict[str, Any],
    interaction_scenarios: Dict[str, List[str]]
) -> pd.DataFrame:
    """
    Estimates the model and calculates marginal effects for each temporal subsample.

    This function iterates through a dictionary of pre-defined time periods,
    re-runs the entire PPML estimation pipeline on each subsample, and then
    calculates the period-specific marginal effects and their confidence intervals.

    Args:
        temporal_subsamples (Dict[str, pd.DataFrame]): A dictionary of DataFrames,
            where each key is a period label and each value is the data for that period.
        pd_variable (str): The name of the political distance variable.
        spec (Dict[str, Any]): The model specification dictionary for estimation.
        interaction_scenarios (Dict[str, List[str]]): The scenarios for which
            to calculate marginal effects.

    Returns:
        pd.DataFrame: A DataFrame with a MultiIndex (Period, Scenario) summarizing
                      the marginal effect, standard error, and 95% confidence
                      interval for each scenario in each time period.
    """
    # --- 1. Initialize Storage ---
    all_period_effects = []

    # --- 2. Iterate Through Each Time Period ---
    for period_label, subsample_df in temporal_subsamples.items():
        print(f"--- Analyzing Period: {period_label} ---")

        # --- 3. Re-estimate Model on Subsample ---
        # Run the full estimation pipeline on the data for this period only.
        period_model_results = run_ppml_estimation_pipeline(subsample_df, spec)

        # --- 4. Calculate Period-Specific Standard Deviation ---
        # The variance of the regressor can change over time, so it must be
        # recalculated for each subsample for accurate marginal effects.
        period_std_devs = calculate_pd_standard_deviations(subsample_df, [pd_variable])
        period_std_dev = period_std_devs[pd_variable]

        # --- 5. Calculate Period-Specific Marginal Effects ---
        # Use the function from Task 13 with the period-specific results.
        period_effects = compute_ppml_marginal_effects(
            period_model_results,
            period_std_dev,
            spec['main_pd_term'],
            interaction_scenarios
        )

        # --- 6. Calculate Confidence Intervals ---
        # Calculate the lower and upper bounds of the 95% CI.
        period_effects['CI_Lower'] = period_effects['EffectPct'] - 1.96 * period_effects['StdError']
        period_effects['CI_Upper'] = period_effects['EffectPct'] + 1.96 * period_effects['StdError']

        # Add the period label for aggregation.
        period_effects['Period'] = period_label
        all_period_effects.append(period_effects)

    # --- 7. Aggregate and Format Results ---
    # Concatenate all results into a single DataFrame.
    final_results_df = pd.concat(all_period_effects)
    # Set a descriptive MultiIndex.
    final_results_df = final_results_df.reset_index().rename(
        columns={'index': 'Scenario'}
    ).set_index(['Period', 'Scenario'])

    return final_results_df


# ==============================================================================
# Task 15, Step 3: Test for Structural Break in Institutional Effect
# ==============================================================================

def test_temporal_stability_of_wto_effect(
    df: pd.DataFrame,
    spec: Dict[str, Any],
    break_year: int
) -> pd.Series:
    """
    Tests for a structural break in the WTO attenuation effect using a pooled
    regression with a triple-interaction term.

    This is the statistically rigorous method for testing if the moderating
    effect of the WTO on political distance changed significantly after a
    certain point in time.

    Args:
        df (pd.DataFrame): The full analytical DataFrame.
        spec (Dict[str, Any]): The base model specification.
        break_year (int): The year that defines the start of the "post" period.

    Returns:
        pd.Series: The coefficient, standard error, and p-value for the
                   triple-interaction term, which represents the change in the
                   WTO effect.
    """
    # --- 1. Prepare Data for Pooled Regression ---
    df_pooled = df.copy()
    # Create a dummy variable for the post-break period.
    post_period_dummy = f"post_{break_year}"
    df_pooled[post_period_dummy] = (df_pooled['Year'] >= break_year).astype(int)

    # --- 2. Construct the Triple-Interaction Term Name ---
    # This term captures the change in the WTO interaction effect in the post period.
    wto_interaction_term = spec['wto_interaction_term']
    triple_interaction_term = f"{post_period_dummy}:{wto_interaction_term}"

    # --- 3. Create the Pooled Model Specification ---
    # Start with the base specification.
    pooled_spec = spec.copy()
    # Add the post-period dummy, the original interaction interacted with the
    # dummy, and the new triple-interaction term to the list of regressors.
    # Note: pyfixest handles the creation of the main effects automatically.
    pooled_spec['interaction_regressors'] = spec['interaction_regressors'] + [
        f"{post_period_dummy}:{spec['main_pd_term']}",
        triple_interaction_term
    ]
    # Also add the post-period dummy as a main effect.
    pooled_spec['main_regressors'] = spec['main_regressors'] + [post_period_dummy]

    # --- 4. Estimate the Pooled Model ---
    # Run the full PPML pipeline with the new, expanded specification.
    pooled_model_results = run_ppml_estimation_pipeline(df_pooled, pooled_spec)

    # --- 5. Extract and Return the Key Coefficient ---
    # The coefficient on the triple-interaction term is the result of interest.
    # It directly measures the change in the WTO's effect.
    summary = pooled_model_results.summary()
    structural_break_result = summary.loc[triple_interaction_term]

    return structural_break_result


# ==============================================================================
# Task 15: Master Orchestrator Function
# ==============================================================================

def run_temporal_analysis_pipeline(
    temporal_subsamples: Dict[str, pd.DataFrame],
    full_df: pd.DataFrame,
    spec: Dict[str, Any],
    interaction_scenarios: Dict[str, List[str]],
    break_year: int = 2009
) -> Dict[str, Any]:
    """
    Orchestrates the entire temporal heterogeneity analysis.

    This master function performs two types of temporal analysis:
    1. It estimates the model separately on each sub-period to calculate
       period-specific marginal effects.
    2. It runs a formal statistical test for a structural break in the key
       institutional effect using a pooled regression model.

    Args:
        temporal_subsamples (Dict[str, pd.DataFrame]): Dictionary of subsample DataFrames.
        full_df (pd.DataFrame): The complete analytical DataFrame for the pooled model.
        spec (Dict[str, Any]): The model specification dictionary.
        interaction_scenarios (Dict[str, List[str]]): Scenarios for marginal effects.
        break_year (int): The year to test for a structural break.

    Returns:
        Dict[str, Any]: A dictionary containing:
            - 'period_specific_effects': A DataFrame of effects for each period.
            - 'structural_break_test': The results of the formal test for a
              change in the WTO effect.
    """
    try:
        # Step 1 & 2: Calculate period-specific effects and CIs.
        print("Step 1: Calculating period-specific marginal effects...")
        # Assume the main PD variable is the first in the interaction list.
        pd_var = spec['interaction_regressors'][0]
        period_effects = calculate_period_specific_effects(
            temporal_subsamples, pd_var, spec, interaction_scenarios
        )
        print("Step 1: Success. Period-specific effects calculated.")

        # Step 3: Test for a structural break.
        print("Step 2: Testing for structural break in WTO effect...")
        break_test_results = test_temporal_stability_of_wto_effect(
            full_df, spec, break_year
        )
        print("Step 2: Success. Structural break test complete.")

        # --- Compile and Return All Results ---
        temporal_report = {
            'period_specific_effects': period_effects,
            'structural_break_test': break_test_results
        }

        print("\nSUCCESS: Temporal heterogeneity analysis complete.")
        return temporal_report

    except Exception as e:
        print(f"\nERROR: Temporal analysis pipeline failed.")
        raise e


In [None]:
# Task 16: Create Master Robustness Orchestrator Function

# ==============================================================================
# Task 16: Worker Functions for Specific Robustness Checks
# ==============================================================================

def _apply_row_filter(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """
    Applies a filter to a DataFrame's rows using a pandas query string.

    This is a simple, single-purpose worker function designed to be called by
    the robustness orchestrator. It takes a DataFrame and a query string,
    applies the filter, and returns a new DataFrame containing only the rows
    that satisfy the query condition.

    Args:
        df (pd.DataFrame): The input DataFrame to be filtered.
        query (str): A valid pandas query string (e.g., "Country == 'USA'").

    Returns:
        pd.DataFrame: A new DataFrame containing the filtered rows. A copy is
                      returned to prevent unintended side effects on the
                      original DataFrame.
    """
    # Use the pandas .query() method, which is highly efficient and readable
    # for complex boolean filtering operations.
    # A .copy() is made to ensure the returned DataFrame is independent of the
    # original, preventing slice-warning issues in downstream modifications.
    return df.query(query).copy()


def _apply_spec_change(
    spec: Dict[str, Any],
    var_map: Dict[str, str]
) -> Dict[str, Any]:
    """
    Modifies a model specification dictionary by replacing variable names.

    This worker function is designed to be called by the robustness orchestrator
    to facilitate robustness checks that involve using alternative variables
    (e.g., swapping one measure of political distance for another). It iterates
    through the lists of regressors in the specification and replaces any
    variable found in the `var_map` key with its corresponding value.

    Args:
        spec (Dict[str, Any]): The original model specification dictionary.
        var_map (Dict[str, str]): A dictionary mapping old variable names (keys)
                                  to new variable names (values).

    Returns:
        Dict[str, Any]: A new, deep-copied specification dictionary with the
                        variable names updated according to the var_map.
    """
    # Create a deep copy of the specification to ensure the original is not
    # mutated, allowing it to be reused for other robustness checks.
    spec_new = deepcopy(spec)

    # Iterate through the keys in the specification that contain lists of regressors.
    for key in ['main_regressors', 'interaction_regressors']:
        # Use a list comprehension to build the new list of regressors.
        # For each variable, `var_map.get(var, var)` will return the new name
        # if the variable is in the map, otherwise it returns the original name.
        spec_new[key] = [var_map.get(var, var) for var in spec_new[key]]

    # It is also crucial to update the 'main_pd_term' if it is one of the
    # variables being swapped, as this is used in downstream effect calculations.
    spec_new['main_pd_term'] = var_map.get(
        spec_new['main_pd_term'],
        spec_new['main_pd_term']
    )

    # Return the modified specification dictionary.
    return spec_new


# A registry (dictionary) that maps the string identifiers for robustness check
# types to the actual worker functions that implement them. This is the core of
# the dispatcher pattern, allowing the orchestrator to be easily extended with
# new types of robustness checks without modifying its core logic.
ROBUSTNESS_WORKERS: Dict[str, Callable] = {
    # Maps the 'filter_rows' check type to its corresponding worker function.
    'filter_rows': _apply_row_filter,
    # Maps the 'change_spec' check type to its corresponding worker function.
    'change_spec': _apply_spec_change,
}

# ==============================================================================
# Task 16: Main Orchestrator Function
# ==============================================================================

def _run_single_robustness_check(
    check_name: str,
    check_config: Dict[str, Any],
    base_df: pd.DataFrame,
    base_spec: Dict[str, Any]
) -> Tuple[str, Any]:
    """
    Worker function to execute a single, complete robustness check.

    This function is designed to be called in parallel. It takes the configuration
    for one check, applies the necessary modifications to the data or specification,
    runs the estimation pipeline, and returns the results.

    Args:
        check_name (str): The name of the robustness check.
        check_config (Dict[str, Any]): The configuration for this specific check.
        base_df (pd.DataFrame): The baseline analytical DataFrame.
        base_spec (Dict[str, Any]): The baseline model specification.

    Returns:
        Tuple[str, Any]: A tuple of the check name and the resulting model object.
    """
    print(f"--- Starting robustness check: {check_name} ---")

    # --- 1. Apply the specified modification ---
    check_type = check_config['type']
    check_params = check_config['params']

    # Start with copies of the baseline data and spec.
    modified_df = base_df
    modified_spec = base_spec

    # Dispatch to the correct worker function based on the check type.
    if check_type == 'filter_rows':
        modified_df = _apply_row_filter(base_df, **check_params)
    elif check_type == 'change_spec':
        modified_spec = _apply_spec_change(base_spec, **check_params)
    else:
        # If a new check type is added, it must have a worker in the registry.
        raise ValueError(f"Unknown robustness check type: {check_type}")

    # --- 2. Re-run the estimation pipeline ---
    try:
        # Use the (potentially modified) data and spec to run the estimation.
        model_results = run_ppml_estimation_pipeline(modified_df, modified_spec)
        print(f"--- Successfully completed robustness check: {check_name} ---")
        return (check_name, model_results)
    except Exception as e:
        # If a specific check fails (e.g., due to convergence on a smaller
        # sample), report it and return None for the result.
        print(f"--- FAILED robustness check: {check_name}. Reason: {e} ---")
        return (check_name, None)


def run_robustness_checks_pipeline(
    base_df: pd.DataFrame,
    base_spec: Dict[str, Any],
    robustness_config: Dict[str, Dict[str, Any]],
    n_jobs: int = -1
) -> Dict[str, Fixest]:
    """
    Orchestrates the execution of a suite of robustness checks in parallel.

    This master function manages the entire robustness analysis phase. It takes a
    baseline DataFrame and model specification, along with a configuration
    dictionary defining all robustness checks. It then dispatches each check
    to a worker function that runs in parallel, collecting and returning all results.

    This dispatcher pattern is highly modular and extensible. To add a new type
    of robustness check, one only needs to write a small worker function and
    add it to the `ROBUSTNESS_WORKERS` registry.

    Args:
        base_df (pd.DataFrame): The baseline, fully prepared analytical DataFrame.
        base_spec (Dict[str, Any]): The baseline model specification dictionary.
        robustness_config (Dict[str, Dict[str, Any]]): A dictionary defining all
            robustness checks to be performed.
        n_jobs (int): The number of CPU cores to use for parallel execution.
                      -1 means use all available cores.

    Returns:
        Dict[str, Fixest]: A dictionary where keys are the names of the
                           robustness checks and values are the fitted `Fixest`
                           model objects. Failed checks will have a value of None.
    """
    # --- 1. Input Validation ---
    if not isinstance(robustness_config, dict):
        raise TypeError("Robustness configuration must be a dictionary.")

    # --- 2. Parallel Execution of Robustness Checks ---
    # Use joblib.Parallel for efficient, cross-platform parallel processing.
    with Parallel(n_jobs=n_jobs) as parallel:
        # The `delayed` function wraps the worker function and its arguments.
        # joblib handles distributing these tasks to the worker pool.
        results_list = parallel(
            delayed(_run_single_robustness_check)(
                check_name, check_config, base_df, base_spec
            ) for check_name, check_config in robustness_config.items()
        )

    # --- 3. Aggregate and Return Results ---
    # Convert the list of (name, result) tuples back into a dictionary.
    final_results = {name: result for name, result in results_list}

    print("\nSUCCESS: All robustness checks have been executed.")
    return final_results


In [None]:
# Task 17: Sample Restriction Robustness Tests

# ==============================================================================
# Task 17: New, Specialized Worker Functions for the Robustness Framework
# ==============================================================================

def _apply_quantile_trim(
    df: pd.DataFrame,
    column: str,
    lower_quantile: float,
    upper_quantile: float
) -> pd.DataFrame:
    """
    Filters a DataFrame by trimming the tails of a specified column's distribution.

    This specialized worker function is for robustness checks that involve
    removing outliers based on quantiles (e.g., trimming the top and bottom 1%
    of the political distance distribution).

    Args:
        df (pd.DataFrame): The input DataFrame.
        column (str): The name of the column to trim.
        lower_quantile (float): The lower quantile boundary (e.g., 0.01 for 1%).
        upper_quantile (float): The upper quantile boundary (e.g., 0.99 for 99%).

    Returns:
        pd.DataFrame: The filtered DataFrame with the outliers removed.
    """
    # Calculate the lower and upper bounds from the data itself.
    lower_bound = df[column].quantile(lower_quantile)
    upper_bound = df[column].quantile(upper_quantile)

    # Apply the filter using the calculated bounds.
    return df[df[column].between(lower_bound, upper_bound)].copy()


def _apply_event_density_filter(
    df: pd.DataFrame,
    min_obs_threshold: int
) -> pd.DataFrame:
    """
    Filters dyadic pairs based on a minimum threshold of non-zero event observations.

    This worker function re-uses the logic from Task 4 to allow for robustness
    checks with alternative event density thresholds.

    Args:
        df (pd.DataFrame): The input DataFrame.
        min_obs_threshold (int): The minimum number of months with non-zero
                                 event counts for a dyad to be retained.

    Returns:
        pd.DataFrame: The filtered DataFrame.
    """
    # This logic is identical to the function `filter_dyads_by_event_density`
    # from Task 4, refactored here as a worker.
    has_events = df['GDELT_EventCount'] > 0
    event_density = has_events.groupby(df['DyadicPair']).transform('sum')
    return df[event_density >= min_obs_threshold].copy()


# --- Extend the Worker Registry from Task 16 with the new functions ---
# This step is crucial for making the new check types available to the orchestrator.
# In a real application, this would be defined in a single, shared location.
ROBUSTNESS_WORKERS: Dict[str, Callable] = {
    'filter_rows': _apply_row_filter,
    'change_spec': _apply_spec_change,
    'trim_quantiles': _apply_quantile_trim,
    'filter_event_density': _apply_event_density_filter,
}


# ==============================================================================
# Task 17: Configuration Definition Function
# ==============================================================================

def define_sample_restriction_checks(
    pd_var_for_trimming: str,
    robustness_min_obs: int
) -> Dict[str, Dict[str, Any]]:
    """
    Defines the configuration dictionary for all sample restriction robustness checks.

    This function encapsulates the specification of the robustness checks as a
    data structure. This configuration is then passed to the master robustness
    orchestrator (from Task 16) to be executed. This approach separates the
    *definition* of the checks from their *execution*, making the analysis
    highly transparent, modular, and easy to modify.

    Args:
        pd_var_for_trimming (str): The name of the political distance variable
                                   that will be used for the quantile trimming check.
        robustness_min_obs (int): The stricter threshold for the high event
                                  density check.

    Returns:
        Dict[str, Dict[str, Any]]: The configuration dictionary for the
                                   robustness orchestrator.
    """
    # --- Define the dictionary of robustness checks ---
    # Each key is a unique, descriptive name for the check.
    # Each value is a dictionary specifying the 'type' of check (which maps
    # to a worker function in the registry) and the 'params' for that worker.

    robustness_config = {

        # --- Geographic and Political Restrictions ---

        "Exclude China": {
            "type": "filter_rows",
            "params": {
                "query": "ExporterISO != 'CHN' and ImporterISO != 'CHN'"
            }
        },

        "Democratic Pairs Only": {
            "type": "filter_rows",
            "params": {
                "query": "DemocraticPair == 1"
            }
        },

        "Non-Conflict Pairs": {
            "type": "filter_rows",
            "params": {
                # This assumes a 'ConflictPair' dummy (0 or 1) has been
                # merged into the DataFrame from an external source like UCDP.
                "query": "ConflictPair == 0"
            }
        },

        # --- Data Quality Restrictions ---

        "Trim Political Distance Outliers": {
            "type": "trim_quantiles",
            "params": {
                "column": pd_var_for_trimming,
                "lower_quantile": 0.01,
                "upper_quantile": 0.99
            }
        },

        "High Event Density Only": {
            "type": "filter_event_density",
            "params": {
                "min_obs_threshold": robustness_min_obs
            }
        }
    }

    return robustness_config

# ==============================================================================
# Task 17: Master Orchestrator Function (Illustrative Usage)
# ==============================================================================

def run_sample_restriction_robustness_pipeline(
    base_df: pd.DataFrame,
    base_spec: Dict[str, Any],
    pd_var_for_trimming: str,
    robustness_min_obs: int,
    n_jobs: int = -1
) -> Dict[str, Fixest]:
    """
    Orchestrates the execution of all sample restriction robustness checks.

    This function serves as the entry point for Task 17. It first calls the
    configuration function to define all the checks, and then passes this
    configuration to the master robustness orchestrator from Task 16 to
    execute the analysis in parallel.

    Args:
        base_df (pd.DataFrame): The baseline, fully prepared analytical DataFrame.
        base_spec (Dict[str, Any]): The baseline model specification dictionary.
        pd_var_for_trimming (str): The PD variable to use for the trimming check.
        robustness_min_obs (int): The stricter threshold for the density check.
        n_jobs (int): The number of CPU cores to use for parallel execution.

    Returns:
        Dict[str, Fixest]: A dictionary where keys are the names of the
                           robustness checks and values are the fitted `Fixest`
                           model objects.
    """
    # Step 1: Define the full suite of sample restriction checks.
    print("Step 1: Defining sample restriction robustness checks...")
    sample_restriction_config = define_sample_restriction_checks(
        pd_var_for_trimming,
        robustness_min_obs
    )
    print(f"Step 1: Success. Defined {len(sample_restriction_config)} checks.")

    # Step 2: Execute all defined checks using the master orchestrator.
    # This re-uses the powerful, parallelized framework from Task 16.
    print("Step 2: Executing all sample restriction checks in parallel...")
    # Note: `run_robustness_checks_pipeline` is the orchestrator from Task 16.
    # It is assumed to be available in the same scope.
    results = run_robustness_checks_pipeline(
        base_df=base_df,
        base_spec=base_spec,
        robustness_config=sample_restriction_config,
        n_jobs=n_jobs
    )

    print("\nSUCCESS: Sample restriction robustness pipeline complete.")
    return results


In [None]:
# Task 18: Alternative Specification and Measurement Tests

# ==============================================================================
# Task 18: New, Specialized Worker Functions and Systematic Mapper
# ==============================================================================

def _apply_temporal_aggregation(
    df: pd.DataFrame,
    freq: str,
    agg_rules: Dict[str, str],
    gdelt_rule: Dict[str, str]
) -> pd.DataFrame:
    """
    Resamples the DataFrame to a lower frequency and prepares it for estimation.

    This worker handles robustness checks involving temporal aggregation (e.g.,
    moving from monthly to quarterly data). It applies specified aggregation
    rules and correctly handles the GDELT 'first-month' rule from the paper.

    Args:
        df (pd.DataFrame): The input DataFrame with a DatetimeIndex.
        freq (str): The target frequency string (e.g., 'Q' for quarterly).
        agg_rules (Dict[str, str]): A dictionary mapping column patterns to
                                    aggregation functions (e.g., {'_USD': 'sum'}).
        gdelt_rule (Dict[str, str]): The specific rule for the GDELT measure.

    Returns:
        pd.DataFrame: The resampled and aggregated DataFrame.
    """
    # Group by dyad to resample each time series independently.
    grouped = df.groupby('DyadicPair')
    resampled_list = []

    for name, group in grouped:
        # Apply the specific GDELT rule first.
        resampled_group = group.resample(freq).agg(gdelt_rule)

        # Identify other columns and apply general rules.
        for pattern, func in agg_rules.items():
            cols_to_agg = [c for c in group.columns if pattern in c and c not in gdelt_rule]
            if cols_to_agg:
                resampled_group[cols_to_agg] = group[cols_to_agg].resample(freq).agg(func)

        # For remaining columns (like dummies), forward-fill is a reasonable choice.
        resampled_group.fillna(method='ffill', inplace=True)
        resampled_group['DyadicPair'] = name
        resampled_list.append(resampled_group)

    return pd.concat(resampled_list).reset_index().set_index('DateTime')


def _create_systematic_var_map(
    regressor_list: List[str],
    simple_replace_map: Dict[str, str]
) -> Dict[str, str]:
    """
    Systematically creates a comprehensive variable map for specification changes.

    This helper function takes a simple one-to-one map (e.g., {'A': 'B'}) and
    applies it to a list of complex regressors (including interactions like 'C:A:D')
    to generate a full mapping for all affected terms.

    Args:
        regressor_list (List[str]): The list of all regressor names from the spec.
        simple_replace_map (Dict[str, str]): A simple dict of one-to-one replacements.

    Returns:
        Dict[str, str]: A complete map of old term -> new term.
    """
    full_var_map = {}
    for var in regressor_list:
        new_var = var
        # Perform replacements for all specified swaps.
        for old, new in simple_replace_map.items():
            # Use a robust replacement that respects interaction term boundaries.
            # This splits 'A:B', replaces 'A' with 'C' -> ['C', 'B'], then rejoins.
            components = new_var.split(':')
            new_components = [new if comp == old else comp for comp in components]
            new_var = ':'.join(new_components)

        # If a change was made, add it to the map.
        if new_var != var:
            full_var_map[var] = new_var

    return full_var_map


# --- Extend the Worker Registry from Task 16/17 with the new function ---
ROBUSTNESS_WORKERS: Dict[str, Callable] = {
    'filter_rows': _apply_row_filter,
    'change_spec': _apply_spec_change,
    'trim_quantiles': _apply_quantile_trim,
    'filter_event_density': _apply_event_density_filter,
    'resample_and_prepare': _apply_temporal_aggregation, # New worker
}


# ==============================================================================
# Task 18: Configuration Definition Function
# ==============================================================================

def define_spec_and_measurement_checks(
    base_spec: Dict[str, Any]
) -> Dict[str, Dict[str, Any]]:
    """
    Defines the configuration for alternative specification and measurement checks.

    This function programmatically generates the configuration dictionary that
    will drive the robustness orchestrator. It uses a systematic mapping
    function to ensure that all interaction terms are correctly updated when
    a core variable is swapped, eliminating potential for manual error.

    Args:
        base_spec (Dict[str, Any]): The baseline model specification dictionary.

    Returns:
        Dict[str, Dict[str, Any]]: The configuration dictionary for the
                                   robustness orchestrator.
    """
    # --- 1. Define the basic variable swaps ---
    # Define the swap for Political Distance measures.
    pd_swap = {'PD_GDELT_Filtered_IHS': 'PD_UNGA_IHS'}

    # Define the swap for Governance control variables.
    gov_swap = {
        'Corruption_IHS': 'WGI_RuleOfLaw_IHS',
        'Polity_IHS': 'WGI_VoiceAndAccountability_IHS'
    }

    # --- 2. Systematically generate the full variable maps ---
    # Get the full list of regressors from the base specification.
    all_regressors = base_spec['main_regressors'] + base_spec['interaction_regressors']

    # Create the complete map for the PD swap.
    pd_var_map = _create_systematic_var_map(all_regressors, pd_swap)
    # Also map the main PD term identifier used in downstream tasks.
    pd_var_map[base_spec['main_pd_term']] = base_spec['main_pd_term'].replace(
        list(pd_swap.keys())[0], list(pd_swap.values())[0]
    )

    # Create the complete map for the governance swap.
    gov_var_map = _create_systematic_var_map(all_regressors, gov_swap)

    # --- 3. Define the Configuration Dictionary ---
    robustness_config = {
        "Alternative PD Measure (UNGA)": {
            "type": "change_spec",
            "params": {"var_map": pd_var_map}
        },
        "Alternative Governance Controls (WGI)": {
            "type": "change_spec",
            "params": {"var_map": gov_var_map}
        },
        # This check now requires a new worker, but the config is clean.
        "Quarterly Temporal Aggregation": {
            "type": "resample_and_prepare",
            "params": {
                "freq": "Q",
                "agg_rules": {"_USD": "sum", "Count": "sum", "_IHS": "mean"},
                "gdelt_rule": {"GDELT_GoldsteinMean": "first"}
            }
        }
    }

    return robustness_config


# ==============================================================================
# Task 18: Master Orchestrator Function (Illustrative Usage)
# ==============================================================================

def run_spec_and_measurement_robustness_pipeline(
    base_df: pd.DataFrame,
    base_spec: Dict[str, Any],
    n_jobs: int = -1
) -> Dict[str, Fixest]:
    """
    Orchestrates the execution of all specification and measurement robustness checks.

    Args:
        base_df (pd.DataFrame): The baseline, fully prepared analytical DataFrame.
        base_spec (Dict[str, Any]): The baseline model specification dictionary.
        n_jobs (int): The number of CPU cores to use for parallel execution.

    Returns:
        Dict[str, Fixest]: A dictionary of the fitted `Fixest` model objects
                           for each robustness check.
    """
    # Step 1: Define the full suite of specification checks programmatically.
    print("Step 1: Defining specification and measurement robustness checks...")
    spec_config = define_spec_and_measurement_checks(base_spec)
    print(f"Step 1: Success. Defined {len(spec_config)} checks.")

    # Step 2: Execute all defined checks using the master orchestrator.
    print("Step 2: Executing all specification checks in parallel...")
    # Note: `run_robustness_checks_pipeline` is the orchestrator from Task 16.
    results = run_robustness_checks_pipeline(
        base_df=base_df,
        base_spec=base_spec,
        robustness_config=spec_config,
        n_jobs=n_jobs
    )

    print("\nSUCCESS: Specification and measurement robustness pipeline complete.")
    return results


In [None]:
# Robustness Analysis Orchestrator Function

# ==============================================================================
# Task 16-18: Consolidated Robustness Analysis Orchestrator
# ==============================================================================

def run_all_robustness_analyses(
    base_df: pd.DataFrame,
    base_spec: Dict[str, Any],
    constants: 'ProjectConstants',
    n_jobs: int = -1
) -> Dict[str, Fixest]:
    """
    Executes the complete suite of robustness analyses for the project.

    This high-level orchestrator serves as the single entry point for the entire
    robustness analysis phase (Phase 6). It systematically performs the following steps:
    1. Calls specialized functions to generate the configurations for both
       sample restriction checks (Task 17) and specification/measurement
       checks (Task 18).
    2. Merges these configurations into a single, comprehensive suite of tests,
       validating for non-overlapping check names.
    3. Passes this master configuration to the powerful, parallelized robustness
       execution engine (`run_robustness_checks_pipeline` from Task 16).

    This design is highly modular and extensible, separating the definition of
    robustness checks from their execution.

    Args:
        base_df (pd.DataFrame): The baseline, fully prepared analytical DataFrame.
        base_spec (Dict[str, Any]): The baseline model specification dictionary.
        constants (ProjectConstants): The dataclass of project constants, needed
                                      for defining some checks.
        n_jobs (int): The number of CPU cores to use for parallel execution.
                      -1 means use all available cores.

    Returns:
        Dict[str, Fixest]: A dictionary where keys are the descriptive names of
                           each robustness check and values are the fitted `Fixest`
                           model objects (or None if a check failed).
    """
    # --- 1. Input Validation ---
    # Basic type checking for the primary inputs.
    if not isinstance(base_df, pd.DataFrame):
        raise TypeError("base_df must be a pandas DataFrame.")
    if not isinstance(base_spec, dict):
        raise TypeError("base_spec must be a dictionary.")

    try:
        # --- 2. Generate Configurations for Each Type of Check ---
        print("Step 1: Defining all robustness check configurations...")

        # Generate the configuration for sample restriction checks from Task 17.
        sample_restriction_config = define_sample_restriction_checks(
            pd_var_for_trimming=base_spec['main_pd_term'],
            robustness_min_obs=constants.ROBUSTNESS_MIN_OBS
        )

        # Generate the configuration for specification & measurement checks from Task 18.
        spec_measurement_config = define_spec_and_measurement_checks(base_spec)

        print(
            f"Step 1: Success. Defined {len(sample_restriction_config)} sample restriction checks "
            f"and {len(spec_measurement_config)} specification checks."
        )

        # --- 3. Merge Configurations and Validate ---
        # Check for any overlapping keys, which would indicate a naming collision.
        common_keys = sample_restriction_config.keys() & spec_measurement_config.keys()
        if common_keys:
            raise ValueError(f"Duplicate robustness check names found: {common_keys}")

        # Merge the two dictionaries into a single master configuration.
        # The `|` operator is a clean way to merge dicts in Python 3.9+.
        master_robustness_config = sample_restriction_config | spec_measurement_config

        print(f"Total robustness checks to execute: {len(master_robustness_config)}")

        # --- 4. Execute All Checks via the Master Orchestrator ---
        # Call the powerful, parallelized pipeline from Task 16 to run everything.
        all_results = run_robustness_checks_pipeline(
            base_df=base_df,
            base_spec=base_spec,
            robustness_config=master_robustness_config,
            n_jobs=n_jobs
        )

        print("\nSUCCESS: All robustness analyses have been completed.")
        return all_results

    except Exception as e:
        # Catch any errors during the process for a clean failure.
        print(f"\nERROR: The main robustness analysis pipeline failed.")
        raise e


In [None]:
# Task 19: Comprehensive Results Table Construction

# ==============================================================================
# Task 19, Step 1 & 2: Main and Margin-Specific Results Tables
# ==============================================================================

class StoredLogitResults:
    """
    An adapter class to make custom Logit results compatible with modelsummary.

    The `modelsummary` library is designed to parse standard statistical model
    objects from libraries like `statsmodels` or `pyfixest`. This class serves
    as a lightweight wrapper around the custom-calculated Logit results (which
    are stored in a dictionary). It exposes the key estimation outputs (coefficients,
    standard errors, nobs) as attributes and provides a `pvalues` method,
    mimicking the interface that `modelsummary` expects. This allows for the
    seamless integration of our bespoke, computationally intensive Logit results
    into the same professional-grade tables as the more standard PPML results.

    Attributes:
        params (pd.Series): A Series of the bias-corrected coefficient estimates.
        bse (pd.Series): A Series of the bootstrapped standard errors.
        nobs (int): The number of observations used in the estimation.
        model_name (str): An optional attribute to set the column name in the table.
    """
    def __init__(self, logit_results: Dict[str, pd.Series], nobs: int):
        """
        Initializes the StoredLogitResults adapter.

        Args:
            logit_results (Dict[str, pd.Series]): A dictionary containing the
                final Logit estimation results, expecting keys 'coefficients'
                and 'standard_errors'.
            nobs (int): The number of observations used in the Logit estimation.

        Raises:
            KeyError: If the `logit_results` dictionary is missing required keys.
        """
        # --- Input Validation ---
        if 'coefficients' not in logit_results or 'standard_errors' not in logit_results:
            raise KeyError("logit_results dict must contain 'coefficients' and 'standard_errors'.")

        # --- Attribute Assignment ---
        # Assign the coefficient Series to the 'params' attribute, which is a
        # standard name that modelsummary looks for.
        self.params: pd.Series = logit_results['coefficients']

        # Assign the standard error Series to the 'bse' (beta standard error)
        # attribute, another standard name.
        self.bse: pd.Series = logit_results['standard_errors']

        # Assign the number of observations.
        self.nobs: int = nobs

        # Initialize a placeholder for the model name.
        self.model_name: str = "Model"

    def pvalues(self) -> pd.Series:
        """
        Computes and returns the p-values for the coefficient estimates.

        This method is explicitly required by `modelsummary` to generate
        significance stars. It calculates two-tailed p-values based on a
        standard normal (Z) distribution.

        Returns:
            pd.Series: A Series of p-values, indexed by the coefficient names.
        """
        # Calculate Z-scores by dividing the coefficients by their standard errors.
        z_scores = self.params / self.bse

        # Calculate the two-tailed p-value from the standard normal CDF.
        # p = 2 * (1 - CDF(|Z|))
        p_vals = 2 * (1 - norm.cdf(np.abs(z_scores)))

        return p_vals


def create_main_results_table(
    model_list: List[Any],
    model_names: List[str],
    title: str,
    coef_rename_map: Dict[str, str]
) -> pd.DataFrame:
    """
    Generates a publication-quality regression table using the modelsummary library.

    This function acts as a high-level wrapper around `modelsummary`, providing a
    standardized way to generate the main results tables for the project. It
    takes a list of fitted model objects and formats them into a clean, readable
    pandas DataFrame that resembles tables found in academic journals.

    Args:
        model_list (List[Any]): A list of fitted model objects. These can be
                                `pyfixest` objects or instances of the
                                `StoredLogitResults` adapter class.
        model_names (List[str]): A list of strings to be used as column headers
                                 for each model in the table.
        title (str): The title to be displayed above the regression table.
        coef_rename_map (Dict[str, str]): A dictionary to rename raw coefficient
                                          names (keys) to human-readable labels
                                          (values) for presentation.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the formatted regression table.

    Raises:
        ValueError: If the number of models does not match the number of model names.
    """
    # --- 1. Input Validation ---
    # Ensure that each model provided will have a corresponding name in the table.
    if len(model_list) != len(model_names):
        raise ValueError("Length of model_list must match length of model_names.")

    # --- 2. Define Table Formatting ---
    # Define a list of goodness-of-fit statistics to display at the bottom of the table.
    # `modelsummary` allows specifying the raw attribute name, a clean display name,
    # and a number format string. It will intelligently skip metrics not
    # found on a given model object.
    gof_metrics = [
        {"raw": "nobs", "clean": "Observations", "fmt": "{:,}"},
        {"raw": "rsquared", "clean": "R2", "fmt": "{:.3f}"},
        # The custom Logit adapter does not have an R2, but we can add a custom
        # attribute to it if we were to calculate a pseudo R2.
        # For now, modelsummary will just skip it for the Logit model.
    ]

    # --- 3. Prepare Models ---
    # Set the custom model names for the columns. `modelsummary` looks for a
    # `model_name` attribute on the passed objects.
    for model, name in zip(model_list, model_names):
        # This try-except block makes the function robust to model objects
        # that might not have a `model_name` attribute or are immutable.
        try:
            model.model_name = name
        except AttributeError:
            warnings.warn(f"Could not set model_name for model of type {type(model)}.")

    # --- 4. Generate Table with modelsummary ---
    # Call the main modelsummary function with the specified configuration.
    results_table = modelsummary.modelsummary(
        models=model_list,
        output='df',  # Specify DataFrame as the output format.
        title=title,
        stars=True,  # Automatically add significance stars (e.g., *, **, ***).
        coef_map=coef_rename_map,
        gof_map=gof_metrics,
        fmt="{:.3f}"  # Format all numeric estimates to 3 decimal places.
    )

    # Return the resulting DataFrame.
    return results_table

# ==============================================================================
# Task 19, Step 3: Robustness Results Summary Table
# ==============================================================================

def _format_coef_and_stars(coef: float, p_value: float) -> str:
    """
    A helper function to format a coefficient and its significance stars into a string.

    This utility is used for creating custom summary tables (like the robustness
    table) where results need to be presented concisely in a single cell.

    Args:
        coef (float): The coefficient estimate.
        p_value (float): The p-value associated with the coefficient.

    Returns:
        str: A formatted string, e.g., "0.123***".
    """
    # --- Determine Significance Level ---
    # Assign the appropriate star notation based on conventional p-value thresholds.
    if p_value < 0.01:
        stars = '***'
    elif p_value < 0.05:
        stars = '**'
    elif p_value < 0.1:
        stars = '*'
    else:
        stars = ''

    # --- Format and Return String ---
    # Combine the formatted coefficient (to 3 decimal places) and the stars.
    return f"{coef:.3f}{stars}"

def create_robustness_summary_table(
    robustness_results: Dict[str, Fixest],
    key_variables: List[str]
) -> pd.DataFrame:
    """
    Creates a summary table comparing key coefficients across robustness checks.

    This function distills the results from the entire suite of robustness
    checks into a concise table, allowing for a quick assessment of how stable
    the key findings are across different model specifications and samples.

    Args:
        robustness_results (Dict[str, Fixest]): The dictionary of results from
            the robustness orchestrator.
        key_variables (List[str]): A list of the key coefficient names to display.

    Returns:
        pd.DataFrame: A DataFrame where rows are the key variables and columns
                      are the different robustness checks.
    """
    # --- 1. Initialize Storage ---
    summary_data = []

    # --- 2. Iterate Through Robustness Check Results ---
    for check_name, model_fit in robustness_results.items():
        # Initialize a dictionary for this column of the table.
        result_col = {'Check': check_name}

        # If the model failed to run, fill with 'NA'.
        if model_fit is None:
            for var in key_variables:
                result_col[var] = "NA"
        else:
            # Extract the full summary table for this model.
            summary = model_fit.summary()
            # For each key variable, extract and format the result.
            for var in key_variables:
                if var in summary.index:
                    coef = summary.loc[var, 'Estimate']
                    p_val = summary.loc[var, 'Pr(>|t|)']
                    result_col[var] = _format_coef_and_stars(coef, p_val)
                else:
                    result_col[var] = "NA" # Variable not in this model

        summary_data.append(result_col)

    # --- 3. Format and Return Output ---
    # Convert the list of dictionaries to a DataFrame.
    summary_df = pd.DataFrame(summary_data)
    # Set the check name as the index and transpose for the desired format.
    summary_df = summary_df.set_index('Check').T

    return summary_df


# ==============================================================================
# Task 19: Master Orchestrator Function
# ==============================================================================

def run_results_presentation_pipeline(
    ppml_models: Dict[str, Fixest],
    logit_models: Dict[str, Dict[str, pd.Series]],
    logit_nobs: Dict[str, int],
    robustness_results: Dict[str, Fixest],
    config: Dict[str, Any]
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the creation of all final results tables.

    Args:
        ppml_models (Dict[str, Fixest]): A dictionary of fitted PPML models.
        logit_models (Dict[str, Dict[str, pd.Series]]): A dictionary of Logit results.
        logit_nobs (Dict[str, int]): A dictionary of observation counts for Logit models.
        robustness_results (Dict[str, Fixest]): The dictionary of all robustness run results.
        config (Dict[str, Any]): A configuration dictionary containing specifications
                                 for table titles, variable renaming, etc.

    Returns:
        Dict[str, pd.DataFrame]: A dictionary of the generated results tables.
    """
    try:
        # --- 1. Prepare Models for Main Table ---
        print("Step 1: Preparing models for main results table...")
        # Combine PPML and Logit models into a single list.
        all_models = list(ppml_models.values())
        model_names = list(ppml_models.keys())

        # Adapt the custom Logit results for modelsummary.
        for name, results in logit_models.items():
            all_models.append(StoredLogitResults(results, logit_nobs[name]))
            model_names.append(name)
        print("Step 1: Success.")

        # --- 2. Create Main Results Table ---
        print("Step 2: Creating main results table...")
        main_table = create_main_results_table(
            model_list=all_models,
            model_names=model_names,
            title=config.get('main_table_title', "Main Regression Results"),
            coef_rename_map=config.get('coef_rename_map', {})
        )
        print("Step 2: Success.")

        # --- 3. Create Robustness Summary Table ---
        print("Step 3: Creating robustness summary table...")
        robustness_table = create_robustness_summary_table(
            robustness_results,
            key_variables=config.get('key_robustness_vars', [])
        )
        print("Step 3: Success.")

        # --- 4. Compile and Return ---
        tables = {
            'main_results': main_table,
            'robustness_summary': robustness_table
        }

        print("\nSUCCESS: All results tables have been generated.")
        return tables

    except Exception as e:
        print(f"\nERROR: Results presentation pipeline failed.")
        raise e


In [None]:
# Task 20: Effect Size Visualization and Temporal Analysis

# ==============================================================================
# Task 20, Step 1: Margin-Specific Effect Size Charts
# ==============================================================================

def plot_marginal_effects(
    marginal_effects_df: pd.DataFrame,
    title: str,
    x_label: str
) -> plt.Figure:
    """
    Creates a horizontal bar chart to visualize marginal effects and their CIs.

    This function takes a DataFrame of calculated marginal effects and produces a
    professional-quality "coefficient plot" that clearly shows the magnitude and
    statistical significance of the effects for different scenarios.

    Args:
        marginal_effects_df (pd.DataFrame): A DataFrame indexed by scenario name,
            with columns 'EffectPct' (or 'AME_pp') and 'StdError'.
        title (str): The title for the plot.
        x_label (str): The label for the x-axis (e.g., '% Change in Trade').

    Returns:
        matplotlib.figure.Figure: The generated figure object.
    """
    # --- 1. Data Preparation ---
    # Make a copy to avoid modifying the original DataFrame.
    plot_df = marginal_effects_df.copy()
    # The effect column might have different names, so standardize it.
    effect_col = [c for c in plot_df.columns if 'Effect' in c or 'AME' in c][0]
    plot_df.rename(columns={effect_col: 'Effect'}, inplace=True)

    # Calculate the confidence interval bounds.
    plot_df['ci_half_width'] = 1.96 * plot_df['StdError']

    # --- 2. Plotting ---
    # Set a professional plot style.
    sns.set_theme(style="whitegrid")
    fig, ax = plt.subplots(figsize=(10, 6))

    # Create the horizontal bar plot.
    bars = ax.barh(
        y=plot_df.index,
        width=plot_df['Effect'],
        color=sns.color_palette("vlag", n_colors=len(plot_df))[::-1],
        alpha=0.8
    )

    # Add the error bars representing the 95% confidence interval.
    ax.errorbar(
        x=plot_df['Effect'],
        y=plot_df.index,
        xerr=plot_df['ci_half_width'],
        fmt='none',
        color='black',
        capsize=5
    )

    # --- 3. Aesthetics and Labels ---
    # Add a vertical line at zero for reference.
    ax.axvline(0, color='black', linestyle='--', linewidth=1)
    # Set the title and labels.
    ax.set_title(title, fontsize=16, pad=20)
    ax.set_xlabel(x_label, fontsize=12)
    ax.set_ylabel("")
    # Invert y-axis to have the first item on top.
    ax.invert_yaxis()
    # Add data labels to the bars.
    ax.bar_label(bars, fmt='%.2f', padding=5)

    # Ensure a tight layout.
    fig.tight_layout()

    return fig


# ==============================================================================
# Task 20, Step 2: Temporal Heterogeneity Visualization
# ==============================================================================

def plot_temporal_heterogeneity(
    period_effects_df: pd.DataFrame,
    title: str
) -> sns.FacetGrid:
    """
    Creates a point plot to visualize the evolution of effects over time.

    This function uses seaborn's `catplot` to create a faceted point plot,
    which is ideal for showing how an estimated effect (and its confidence
    interval) changes across different discrete time periods.

    Args:
        period_effects_df (pd.DataFrame): The DataFrame of period-specific
            effects from Task 15, with a MultiIndex of (Period, Scenario).
        title (str): The main title for the entire plot grid.

    Returns:
        seaborn.FacetGrid: The generated FacetGrid object, which can be further
                           customized or saved.
    """
    # --- 1. Data Preparation ---
    # Reset the index to make 'Period' and 'Scenario' regular columns for plotting.
    plot_df = period_effects_df.reset_index()

    # --- 2. Plotting with Seaborn FacetGrid ---
    # `catplot` is a high-level interface for drawing categorical plots onto a
    # FacetGrid. `kind='point'` creates the point plot with CI error bars.
    # `col='Scenario'` creates separate subplots for each scenario.
    g = sns.catplot(
        data=plot_df,
        x='Period',
        y='EffectPct',
        kind='point',
        col='Scenario',
        capsize=0.2,
        height=5,
        aspect=1.2,
        sharey=True,
        palette='viridis'
    )

    # --- 3. Aesthetics and Labels ---
    # Set the main title for the figure.
    g.fig.suptitle(title, y=1.03, fontsize=16)
    # Set axis labels.
    g.set_axis_labels("Time Period", "Marginal Effect (%)")
    # Add a horizontal line at zero on each subplot for reference.
    g.map(plt.axhline, y=0, color='black', linestyle='--', linewidth=1)

    return g


# ==============================================================================
# Task 20, Step 3: Robustness Test Visualization
# ==============================================================================

def plot_robustness_check_stability(
    baseline_model: Fixest,
    robustness_results: Dict[str, Fixest],
    variable_to_plot: str,
    title: str
) -> plt.Figure:
    """
    Creates a coefficient stability plot to visualize robustness check results.

    This plot shows the point estimate and 95% confidence interval for a key
    coefficient from the baseline model and compares it to the estimates from
    a series of robustness check models.

    Args:
        baseline_model (Fixest): The fitted baseline model object.
        robustness_results (Dict[str, Fixest]): The dictionary of results from
            the robustness orchestrator.
        variable_to_plot (str): The name of the key coefficient to visualize.
        title (str): The title for the plot.

    Returns:
        matplotlib.figure.Figure: The generated figure object.
    """
    # --- 1. Extract Data for Plotting ---
    plot_data = []
    # First, get the baseline result.
    summary_base = baseline_model.summary()
    base_estimate = summary_base.loc[variable_to_plot, 'Estimate']
    base_se = summary_base.loc[variable_to_plot, 'Std. Error']

    # Iterate through the robustness check results.
    for check_name, model_fit in robustness_results.items():
        if model_fit is not None and variable_to_plot in model_fit.coef().index:
            summary = model_fit.summary()
            estimate = summary.loc[variable_to_plot, 'Estimate']
            se = summary.loc[variable_to_plot, 'Std. Error']
            plot_data.append({
                'Check': check_name,
                'Estimate': estimate,
                'LowerCI': estimate - 1.96 * se,
                'UpperCI': estimate + 1.96 * se
            })

    plot_df = pd.DataFrame(plot_data)

    # --- 2. Plotting ---
    sns.set_theme(style="whitegrid")
    fig, ax = plt.subplots(figsize=(10, 0.5 * len(plot_df) + 2))

    # Plot the point estimates for each robustness check.
    ax.scatter(y=plot_df['Check'], x=plot_df['Estimate'], color='black', zorder=10)
    # Plot the confidence interval lines for each check.
    for i, row in plot_df.iterrows():
        ax.plot([row['LowerCI'], row['UpperCI']], [i, i], color='black', linewidth=1)

    # --- 3. Add Baseline Reference ---
    # Add a vertical line for the baseline model's point estimate.
    ax.axvline(base_estimate, color='red', linestyle='-', linewidth=2, label='Baseline Estimate')
    # Add a shaded region for the baseline model's 95% confidence interval.
    base_lower_ci = base_estimate - 1.96 * base_se
    base_upper_ci = base_estimate + 1.96 * base_se
    ax.axvspan(base_lower_ci, base_upper_ci, color='red', alpha=0.1, label='Baseline 95% CI')

    # --- 4. Aesthetics and Labels ---
    ax.set_title(title, fontsize=16, pad=20)
    ax.set_xlabel(f"Coefficient Estimate for '{variable_to_plot}'", fontsize=12)
    ax.set_ylabel("")
    ax.invert_yaxis()
    ax.legend()
    fig.tight_layout()

    return fig


In [None]:
# Task 21: Research Synthesis and Policy Implications

# ==============================================================================
# Task 21: Helper Functions for Synthesizing Results
# ==============================================================================

def _format_finding(estimate: float, se: float, unit: str) -> str:
    """
    Formats a key quantitative finding into a standardized, human-readable sentence.

    This helper function takes a point estimate, its standard error, and a unit
    of measurement, then generates a complete sentence that reports the finding
    along with its statistical significance. It is a crucial component for the
    programmatic generation of a research summary report.

    The process involves:
    1. Calculating the Z-score from the estimate and standard error.
    2. Computing the two-tailed p-value based on the standard normal distribution.
    3. Determining the level of statistical significance based on conventional
       thresholds (10%, 5%, 1%).
    4. Assembling a formatted string that includes all this information.

    Args:
        estimate (float): The point estimate of the effect (e.g., a coefficient
                          or marginal effect).
        se (float): The standard error of the estimate.
        unit (str): The unit of the estimate, to be included in the output
                    string (e.g., "%", "pp", "points").

    Returns:
        str: A formatted string summarizing the finding, for example:
             "The estimated effect is -5.20 % (SE = 1.34, p = 0.000), which is
             statistically significant at the 1% level."

    Raises:
        ValueError: If the standard error is non-positive, which would make
                    the Z-score calculation invalid.
    """
    # --- 1. Input Validation ---
    # Ensure the standard error is a positive number for a valid calculation.
    if not isinstance(se, (int, float)) or se <= 0:
        raise ValueError("Standard error (se) must be a positive number.")

    # --- 2. Calculate Test Statistics ---
    # Calculate the Z-score, which measures how many standard errors the
    # estimate is away from zero.
    z_score = estimate / se

    # Calculate the two-tailed p-value from the Z-score using the cumulative
    # distribution function (CDF) of the standard normal distribution.
    # p = 2 * P(Z > |z_score|) = 2 * (1 - CDF(|z_score|))
    p_value = 2 * (1 - norm.cdf(np.abs(z_score)))

    # --- 3. Determine Significance Level ---
    # Compare the p-value against conventional alpha levels to determine the
    # appropriate significance statement.
    if p_value < 0.01:
        significance = "statistically significant at the 1% level"
    elif p_value < 0.05:
        significance = "statistically significant at the 5% level"
    elif p_value < 0.1:
        significance = "statistically significant at the 10% level"
    else:
        significance = "not statistically significant"

    # --- 4. Format and Return the Final String ---
    # Use an f-string to assemble the final, comprehensive summary sentence.
    # Estimates are formatted to two decimal places, and the p-value to three.
    return (
        f"The estimated effect is {estimate:.2f} {unit} "
        f"(SE = {se:.2f}, p = {p_value:.3f}), which is {significance}."
    )

def summarize_baseline_findings(
    ppml_marginal_effects: pd.DataFrame,
    wto_attenuation: Dict[str, float],
    baseline_scenario: str,
    wto_scenario: str
) -> Dict[str, str]:
    """
    Summarizes the main findings from the baseline model.

    Args:
        ppml_marginal_effects (pd.DataFrame): DataFrame of marginal effects.
        wto_attenuation (Dict[str, float]): Dictionary with attenuation effect results.
        baseline_scenario (str): Name of the baseline (non-WTO) scenario.
        wto_scenario (str): Name of the WTO member scenario.

    Returns:
        Dict[str, str]: A dictionary of summary statements.
    """
    # Extract baseline penalty on trade.
    base_effect = ppml_marginal_effects.loc[baseline_scenario, 'EffectPct']
    base_se = ppml_marginal_effects.loc[baseline_scenario, 'StdError']

    # Extract WTO attenuation effect.
    attenuation_effect = wto_attenuation['Attenuation_Effect_Pct']
    attenuation_se = wto_attenuation['Attenuation_StdError']

    # Generate summary statements.
    summary = {
        "Political Distance Penalty": _format_finding(base_effect, base_se, "%"),
        "WTO Attenuation Effect": _format_finding(attenuation_effect, attenuation_se, "pp"),
    }
    return summary

def assess_robustness(
    baseline_model: Fixest,
    robustness_results: Dict[str, Fixest],
    key_coefficient: str
) -> Dict[str, Any]:
    """
    Assesses the robustness of a key coefficient across multiple model specifications.

    Args:
        baseline_model (Fixest): The fitted baseline model object.
        robustness_results (Dict[str, Fixest]): Dictionary of robustness model results.
        key_coefficient (str): The name of the coefficient to assess.

    Returns:
        Dict[str, Any]: A dictionary summarizing the robustness assessment.
    """
    # Get the sign and significance of the baseline coefficient.
    base_summary = baseline_model.summary()
    base_sign = np.sign(base_summary.loc[key_coefficient, 'Estimate'])
    base_is_sig = base_summary.loc[key_coefficient, 'Pr(>|t|)'] < 0.05

    # Initialize counters.
    successful_runs = 0
    sign_consistent = 0
    sig_consistent = 0

    # Iterate through robustness checks.
    for model in robustness_results.values():
        if model is not None and key_coefficient in model.coef().index:
            successful_runs += 1
            summary = model.summary()
            current_sign = np.sign(summary.loc[key_coefficient, 'Estimate'])
            current_is_sig = summary.loc[key_coefficient, 'Pr(>|t|)'] < 0.05

            if current_sign == base_sign:
                sign_consistent += 1
                if current_is_sig == base_is_sig:
                    sig_consistent += 1

    # Classify robustness based on consistency.
    consistency_frac = sig_consistent / successful_runs if successful_runs > 0 else 0
    if consistency_frac > 0.9: classification = "Highly Robust"
    elif consistency_frac > 0.7: classification = "Moderately Robust"
    else: classification = "Sensitive"

    return {
        "Key Coefficient": key_coefficient,
        "Total Checks": len(robustness_results),
        "Successful Runs": successful_runs,
        "Sign Consistency": f"{sign_consistent}/{successful_runs} ({sign_consistent/successful_runs:.0%})",
        "Significance Consistency": f"{sig_consistent}/{successful_runs} ({consistency_frac:.0%})",
        "Overall Assessment": classification
    }


# ==============================================================================
# Task 21, New Helper Function: Assess Economic Significance
# ==============================================================================

def assess_economic_significance(
    effect_estimate: float,
    thresholds: Dict[str, float]
) -> Dict[str, Any]:
    """
    Classifies the economic significance of an estimated effect based on thresholds.

    This function translates a statistical point estimate into a qualitative
    assessment of its practical importance, moving beyond p-values to consider
    the magnitude of the effect.

    Args:
        effect_estimate (float): The point estimate of the effect (e.g., a
                                 percentage point change).
        thresholds (Dict[str, float]): A dictionary defining the boundaries for
            classification. Keys are labels (e.g., "Substantial") and values
            are the minimum absolute effect size for that category. The
            dictionary should be ordered from most to least substantial.

    Returns:
        Dict[str, Any]: A dictionary containing the original effect size and a
                        string classification of its economic significance.
    """
    # --- 1. Input Validation ---
    if not isinstance(effect_estimate, (int, float)):
        raise TypeError("effect_estimate must be a numeric value.")
    if not isinstance(thresholds, dict) or not thresholds:
        raise ValueError("thresholds must be a non-empty dictionary.")

    # --- 2. Classification Logic ---
    # Take the absolute value of the effect for magnitude comparison.
    abs_effect = abs(effect_estimate)

    # Default classification is "Marginal".
    classification = "Marginal"

    # Iterate through the thresholds (assumed to be ordered high to low)
    # to find the appropriate classification.
    for label, threshold_value in thresholds.items():
        if abs_effect >= threshold_value:
            classification = label
            break # Stop at the first (highest) category met.

    # --- 3. Format and Return Output ---
    return {
        "Effect_Size": f"{effect_estimate:.2f}",
        "Classification": classification
    }


# ==============================================================================
# Task 21: Master Orchestrator Function
# ==============================================================================

def generate_synthesis_report(
    baseline_ppml_results: Fixest,
    ppml_marginal_effects: pd.DataFrame,
    wto_attenuation_results: Dict[str, float],
    robustness_results: Dict[str, Fixest],
    temporal_break_test_results: pd.Series,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Generates a complete, structured synthesis report of all key quantitative findings.

    This function orchestrates the entire synthesis phase, calling helper
    functions to programmatically extract, assess, and summarize the main
    results from the entire analytical pipeline. The output is a nested
    dictionary that serves as a complete, reproducible summary of the project's
    conclusions, now including an assessment of economic significance.

    Args:
        baseline_ppml_results (Fixest): The main fitted PPML model object.
        ppml_marginal_effects (pd.DataFrame): DataFrame of marginal effects.
        wto_attenuation_results (Dict[str, float]): Results of the attenuation analysis.
        robustness_results (Dict[str, Fixest]): Dictionary of all robustness models.
        temporal_break_test_results (pd.Series): Results of the structural break test.
        config (Dict[str, Any]): A configuration dictionary specifying key variable
                                 names, scenario names, and thresholds for the report.

    Returns:
        Dict[str, Any]: A nested dictionary containing the full synthesis report.
    """
    try:
        # --- Step 1: Summarize Baseline Findings ---
        print("Step 1: Summarizing baseline findings...")
        baseline_summary = summarize_baseline_findings(
            ppml_marginal_effects,
            wto_attenuation_results,
            config['baseline_scenario'],
            config['wto_scenario']
        )
        print("Step 1: Success.")

        # --- Step 2: Assess Robustness of the Key Finding ---
        print("Step 2: Assessing robustness of the key finding...")
        robustness_summary = assess_robustness(
            baseline_ppml_results,
            robustness_results,
            config['key_coefficient_for_robustness']
        )
        print("Step 2: Success.")

        # --- Step 3: Summarize Temporal Evolution ---
        print("Step 3: Summarizing temporal evolution...")
        temporal_summary = {
            "Finding": f"Test for structural break in WTO effect post-{config['break_year']}",
            "Summary": _format_finding(
                temporal_break_test_results['Estimate'],
                temporal_break_test_results['Std. Error'],
                "points"
            )
        }
        print("Step 3: Success.")

        # --- Step 4: Assess Economic Significance (New Step) ---
        print("Step 4: Assessing economic significance...")
        # Define the thresholds for significance.
        eco_sig_thresholds = {"Substantial": 5.0, "Meaningful": 1.0}
        # Extract the key effect to assess.
        key_effect = wto_attenuation_results['Attenuation_Effect_Pct']
        # Call the new helper function.
        economic_significance_summary = assess_economic_significance(
            key_effect,
            eco_sig_thresholds
        )
        print("Step 4: Success.")

        # --- Step 5: Compile Final Report ---
        final_report = {
            "1_Baseline_Findings": baseline_summary,
            "2_Robustness_Assessment": robustness_summary,
            "3_Temporal_Heterogeneity": temporal_summary,
            "4_Economic_Significance": economic_significance_summary
        }

        print("\nSUCCESS: Complete synthesis report generated.")
        return final_report

    except Exception as e:
        print(f"\nERROR: Synthesis report generation failed.")
        raise e


In [None]:
# Top-Level Orchestrator Function

# ==============================================================================
# Standalone Helper Functions
# ==============================================================================

def synthesize_country_level_data(master_df: pd.DataFrame) -> pd.DataFrame:
    """
    Synthesizes a country-year panel from a dyadic master panel.

    This function is a crucial pre-processing step that extracts country-specific
    time-series data (like GDP and Total Exports) from a dyadic panel where
    such data is repeated for both the exporter and importer. It isolates the
    exporter-specific and importer-specific columns, stacks them, standardizes
    their names, and removes duplicate country-year observations to create a
    clean, canonical country-year dataset. This dataset is essential for
    calculating domestic trade flows as required by structural gravity theory.

    Args:
        master_df (pd.DataFrame): The dyadic master DataFrame. It must have a
            DatetimeIndex and columns for ExporterISO, ImporterISO, and
            country-specific variables suffixed with '_Exporter' and '_Importer'.

    Returns:
        pd.DataFrame: A clean, de-duplicated country-year panel with columns
                      ['ISO', 'Year', 'GDP_USD', 'TotalExports_USD', ...].

    Raises:
        TypeError: If the input is not a pandas DataFrame.
        ValueError: If essential columns are missing.
    """
    # --- 1. Input Validation ---
    # Verify the input is a pandas DataFrame with a DatetimeIndex.
    if not isinstance(master_df, pd.DataFrame) or not isinstance(master_df.index, pd.DatetimeIndex):
        raise TypeError("Input must be a pandas DataFrame with a DatetimeIndex.")

    # --- 2. Data Extraction and Reshaping ---
    # Create a temporary copy to work with.
    df = master_df.copy()
    # Extract the year from the index, as it's a key for the country-year panel.
    df['Year'] = df.index.year

    # Define the mapping for exporter columns to generic names.
    exp_cols = {c: c.replace('_Exporter', '') for c in df.columns if '_Exporter' in c}
    # Define the mapping for importer columns to generic names.
    imp_cols = {c: c.replace('_Importer', '') for c in df.columns if '_Importer' in c}

    # Create the exporter-specific DataFrame.
    exp_df = df[['ExporterISO', 'Year'] + list(exp_cols.keys())].rename(
        columns={'ExporterISO': 'ISO', **exp_cols}
    )
    # Create the importer-specific DataFrame.
    imp_df = df[['ImporterISO', 'Year'] + list(imp_cols.keys())].rename(
        columns={'ImporterISO': 'ISO', **imp_cols}
    )

    # --- 3. Concatenation and Deduplication ---
    # Stack the exporter and importer dataframes.
    country_df = pd.concat([exp_df, imp_df], ignore_index=True)

    # Drop duplicate country-year observations to create the final panel.
    country_df.drop_duplicates(subset=['ISO', 'Year'], inplace=True)

    # Drop any rows that might have missing data after the reshape.
    return country_df.dropna().reset_index(drop=True)


def synthesize_unga_data(master_df: pd.DataFrame) -> pd.DataFrame:
    """
    Synthesizes a UNGA ideal point country-year panel from a dyadic master panel.

    This function assumes that UNGA ideal point estimates have already been
    merged onto the master dyadic DataFrame (as 'IdealPoint_Exporter' and
    'IdealPoint_Importer'). It extracts these values, stacks them, and creates
    the clean, canonical country-year UNGA dataset required by the variable
    construction pipeline.

    Args:
        master_df (pd.DataFrame): The dyadic master DataFrame, which must contain
            the 'IdealPoint_Exporter' and 'IdealPoint_Importer' columns.

    Returns:
        pd.DataFrame: A clean, de-duplicated country-year panel with columns
                      ['ISO', 'Year', 'IdealPoint'].

    Raises:
        ValueError: If the required IdealPoint columns are not in the DataFrame.
    """
    # --- 1. Input Validation ---
    required_cols = ['IdealPoint_Exporter', 'IdealPoint_Importer', 'ExporterISO', 'ImporterISO']
    if not all(col in master_df.columns for col in required_cols):
        raise ValueError(f"master_df must contain required columns: {required_cols}")

    # --- 2. Data Extraction and Reshaping ---
    # Create a temporary copy and extract the year.
    df = master_df.copy()
    df['Year'] = df.index.year

    # Extract non-null exporter ideal points.
    exp_unga = df.loc[df['IdealPoint_Exporter'].notna(), ['ExporterISO', 'Year', 'IdealPoint_Exporter']].rename(
        columns={'ExporterISO': 'ISO', 'IdealPoint_Exporter': 'IdealPoint'}
    )
    # Extract non-null importer ideal points.
    imp_unga = df.loc[df['IdealPoint_Importer'].notna(), ['ImporterISO', 'Year', 'IdealPoint_Importer']].rename(
        columns={'ImporterISO': 'ISO', 'IdealPoint_Importer': 'IdealPoint'}
    )

    # --- 3. Concatenation and Deduplication ---
    # Stack the two datasets.
    unga_df = pd.concat([exp_unga, imp_unga], ignore_index=True)

    # Drop duplicate country-year observations.
    return unga_df.drop_duplicates(subset=['ISO', 'Year']).reset_index(drop=True)


def define_baseline_model_spec() -> Dict[str, Any]:
    """
    Defines and returns the baseline model specification as a dictionary.

    This function centralizes the main econometric model's specification.
    Encapsulating the specification in a function ensures consistency across
    different parts of the pipeline (e.g., main estimation, robustness checks)
    and makes it easy to modify the baseline model in one place.

    Returns:
        Dict[str, Any]: A dictionary containing all components of the model
                        specification, including dependent variable, regressors,
                        fixed effects, and cluster variable.
    """
    # This dictionary structure is designed to be parsed by the various
    # estimation and analysis orchestrators.
    specification = {
        'dependent_var': 'BilateralTradeValue_USD',
        'main_regressors': ['RTA', 'GATTWTO_One', 'GATTWTO_Both'],
        'interaction_regressors': [
            'PD_GDELT_Filtered_IHS',
            'PD_GDELT_Filtered_IHS:RTA',
            'PD_GDELT_Filtered_IHS:GATTWTO_One',
            'PD_GDELT_Filtered_IHS:GATTWTO_Both'
        ],
        'fixed_effects': ['FE_ExporterYear', 'FE_ImporterYear', 'DyadicPair'],
        'cluster_col': 'DyadicPair',
        'main_pd_term': 'is_international:PD_GDELT_Filtered_IHS',
        'wto_interaction_term': 'is_international:PD_GDELT_Filtered_IHS:GATTWTO_Both',
        'rta_interaction_term': 'is_international:PD_GDELT_Filtered_IHS:RTA',
        'governance_interaction_terms': [] # Baseline model excludes these.
    }
    return specification


def generate_and_save_all_figures(output_dir: Path, **kwargs: Any) -> None:
    """
    Orchestrates the generation and saving of all primary project figures.

    This function takes a keyword dictionary of all necessary data and results
    objects, calls the respective plotting functions for each figure, and saves
    the output to high-resolution files in the specified directory.

    Args:
        output_dir (Path): A pathlib.Path object pointing to the output directory.
        **kwargs (Any): A keyword dictionary of results objects. Expected keys include:
            'ppml_marginal_effects' (pd.DataFrame),
            'temporal_effects' (pd.DataFrame),
            'ppml_results' (Fixest),
            'robustness_results' (Dict[str, Fixest]),
            'base_spec' (Dict[str, Any]).
    """
    # --- 1. Input Validation ---
    # Check that the output directory exists.
    if not output_dir.is_dir():
        raise FileNotFoundError(f"Output directory does not exist: {output_dir}")

    print("--- Generating and saving all figures ---")

    # --- 2. Plot 1: PPML Marginal Effects ---
    # Check if the required data for this plot is available.
    if 'ppml_marginal_effects' in kwargs:
        # Generate the plot using the dedicated plotting function.
        fig1 = plot_marginal_effects(
            kwargs['ppml_marginal_effects'],
            title="PPML: Effect of a 1-SD Increase in Political Distance",
            x_label="% Change in Trade Value"
        )
        # Define the output path and save the figure.
        fig1_path = output_dir / "figure_1_ppml_marginal_effects.png"
        fig1.savefig(fig1_path, dpi=300, bbox_inches='tight')
        print(f"Saved Figure 1 to: {fig1_path}")

    # --- 3. Plot 2: Temporal Heterogeneity ---
    # Check for the required data.
    if 'temporal_effects' in kwargs:
        # Generate the plot.
        fig2 = plot_temporal_heterogeneity(
            kwargs['temporal_effects'],
            title="Evolution of WTO Attenuation Effect Over Time"
        )
        # Save the figure.
        fig2_path = output_dir / "figure_2_temporal_heterogeneity.png"
        fig2.savefig(fig2_path, dpi=300, bbox_inches='tight')
        print(f"Saved Figure 2 to: {fig2_path}")

    # --- 4. Plot 3: Robustness Stability Plot ---
    # This plot requires multiple inputs.
    required_robustness_keys = ['ppml_results', 'robustness_results', 'base_spec']
    if all(key in kwargs for key in required_robustness_keys) and kwargs['robustness_results']:
        # Generate the plot.
        fig3 = plot_robustness_check_stability(
            kwargs['ppml_results'],
            kwargs['robustness_results'],
            variable_to_plot=kwargs['base_spec']['wto_interaction_term'],
            title="Coefficient Stability of WTO Interaction Term"
        )
        # Save the figure.
        fig3_path = output_dir / "figure_3_robustness_stability.png"
        fig3.savefig(fig3_path, dpi=300, bbox_inches='tight')
        print(f"Saved Figure 3 to: {fig3_path}")

    # --- 5. Cleanup ---
    # Close all figure objects to free up memory.
    plt.close('all')
    print("--- All figures generated and saved successfully ---")


# ==============================================================================
# Define Full Input Schema
# ==============================================================================

def define_full_schema() -> Dict[str, Type]:
    """
    Defines the complete and accurate schema for the master input DataFrame.

    This function serves as the single source of truth for the expected structure
    of the input data. It includes all 26 columns required for the full end-to-end
    pipeline, incorporating critical corrections and additions identified during
    the detailed analysis of the research methodology. Specifically:
    - It adds 'IdealPoint_Exporter' and 'IdealPoint_Importer' for the UNGA analysis.
    - It correctly specifies 'TradeProd_SectorCount' for the margin analysis,
      replacing the less accurate 'ExtensiveMargin_ProductCount'.

    The schema is used by the `run_dataframe_validation_and_assessment` function
    to rigorously validate the raw input data before any processing begins.

    Returns:
        Dict[str, Type]: A dictionary where keys are the required column names
                         and values are their precise expected numpy/object dtypes.
    """
    # Define the schema as a dictionary for clarity and easy validation.
    # Using specific numpy dtypes (e.g., np.float64) is a best practice for
    # ensuring data precision and memory consistency.

    full_schema = {
        # --- Core Bilateral Trade Variables ---
        "ExporterISO": object,
        "ImporterISO": object,
        "DyadicPair": object,
        "BilateralTradeValue_USD": np.float64,

        # --- Extensive Margin Variables (Corrected) ---
        # NOTE: 'ExtensiveMargin_ProductCount' is replaced with the correct
        # variable for the paper's margin analysis.
        "TradeProd_SectorCount": np.int64,
        "BACI_TotalValue_USD": np.float64,

        # --- Political Distance and Institutional Variables ---
        "GDELT_GoldsteinMean": np.float64,
        "GDELT_GoldsteinStd": np.float64,
        "GDELT_EventCount": np.int64,
        "GATTWTO_Both": np.int64,
        "GATTWTO_One": np.int64,
        "RTA": np.int64,

        # --- Country-Level Economic Controls ---
        "GDP_USD_Exporter": np.float64,
        "GDP_USD_Importer": np.float64,
        "TotalExports_USD_Exporter": np.float64,
        "TotalExports_USD_Importer": np.float64,
        "DomesticTrade_Exporter": np.float64,
        "DomesticTrade_Importer": np.float64,

        # --- Political and Governance Variables ---
        "PolityScore_Exporter": np.int64,
        "PolityScore_Importer": np.int64,
        "CorruptionIndex_Exporter": np.float64,
        "CorruptionIndex_Importer": np.float64,

        # --- Constructed Political Variables ---
        "DemocraticPair": np.int64,
        "PoliticalDistance": np.float64,

        # --- CRITICAL ADDITIONS for UNGA Analysis ---
        # These columns are required by the pipeline but were missing from the
        # initial 24-column specification.
        "IdealPoint_Exporter": np.float64,
        "IdealPoint_Importer": np.float64,
    }

    return full_schema


# ==============================================================================
# The Top-Level Orchestrator
# ==============================================================================

def execute_full_research_pipeline(
    master_df: pd.DataFrame,
    config: Dict[str, Any],
    output_dir: str,
    run_intensive_logit: bool = True,
    run_robustness_checks: bool = True
) -> None:
    """
    Executes the complete end-to-end research pipeline for the study.

    This master function is the single entry point for the entire project. It
    takes the raw, monolithic dataset and the project configuration, and then
    sequentially executes all 21 tasks, from data validation and preprocessing
    to estimation, analysis, robustness checks, and final report generation.

    The function is designed to be a "one-button" replication script. It has no
    return value; its primary effect is to populate the specified output
    directory with all reproducible research outputs, including data tables,
    figures, and a final JSON synthesis report.

    Args:
        master_df (pd.DataFrame): The single, monolithic input DataFrame containing
                                  all raw dyadic and country-level variables. It
                                  is assumed to contain the necessary columns for
                                  synthesizing all required sub-datasets.
        config (Dict[str, Any]): The master configuration dictionary for the project.
        output_dir (str): The path to the directory where all outputs will be saved.
        run_intensive_logit (bool): Flag to run the computationally expensive
                                    Logit estimation. Defaults to True.
        run_robustness_checks (bool): Flag to run the computationally expensive
                                      robustness checks. Defaults to True.

    Raises:
        Exception: Re-raises any exception caught during the pipeline's
                   execution after printing a detailed error message and
                   traceback.
    """
    # Wrap the entire pipeline in a try-except block for robust error handling.
    try:
        # --- Setup ---
        # Define the output directory using pathlib for cross-platform compatibility.
        output_path = Path(output_dir)
        # Create the directory and any parent directories if they do not exist.
        output_path.mkdir(parents=True, exist_ok=True)

        # --- Phase 1: Configuration and Data Validation ---
        print("--- PHASE 1: CONFIGURATION AND DATA VALIDATION ---")
        # Validate the configuration dictionary and create the immutable constants object.
        constants = run_configuration_and_validation(config)
        # Define the full, corrected schema for the input data.
        full_schema = define_full_schema()
        # Validate the input DataFrame against the rigorous schema.
        run_dataframe_validation_and_assessment(master_df, full_schema)
        # Run a comprehensive data quality audit and print the results.
        _ = run_data_quality_assessment(master_df)

        # --- Phase 2: Data Preprocessing and Transformation ---
        print("\n--- PHASE 2: DATA PREPROCESSING AND TRANSFORMATION ---")
        # Synthesize the required country-level and UNGA data from the master frame.
        country_df = synthesize_country_level_data(master_df)
        unga_df = synthesize_unga_data(master_df)
        # Run the GDELT preprocessing pipeline (filtering, AR(1) variance estimation).
        gdelt_series, q_vars = run_gdelt_preprocessing_pipeline(master_df, constants)
        # Run the Particle Filter to get the smoothed political distance measure.
        filtered_gdelt = run_particle_filter_pipeline(gdelt_series, q_vars, constants, base_seed=2025)
        # Run the final variable construction pipeline (merge results, create margins, etc.).
        analytical_df = run_variable_construction_pipeline(
            main_df=master_df,
            filtered_gdelt_df=filtered_gdelt,
            unga_ideal_points_df=unga_df,
            ihs_columns=['PD_GDELT_Filtered', 'PD_UNGA', 'PolityScore_Exporter', 'PolityScore_Importer']
        )

        # --- Phase 3: Model Preparation ---
        print("\n--- PHASE 3: MODEL PREPARATION ---")
        # Define the baseline model specification.
        base_spec = define_baseline_model_spec()
        # Create the interaction terms required by the specification.
        interaction_terms_to_create = [(term.split(':')[1], term.split(':')[2], term) for term in base_spec['interaction_regressors'] if len(term.split(':')) == 3]
        # Add fixed effects identifiers and interaction terms to the DataFrame.
        prepared_df = run_panel_data_preparation(analytical_df, interaction_terms_to_create)
        # Define the core variables for listwise deletion.
        core_vars = list(base_spec['main_regressors']) + [base_spec['dependent_var']] + [base_spec['interaction_regressors'][0]]
        # Create the specific analytical samples (PPML, Logit, Temporal).
        samples = run_sample_preparation_pipeline(prepared_df, country_df, {'core_regression_vars': core_vars})
        # Prepare the generators for bootstrap and jackknife procedures.
        inference_prep = run_inference_preparation(
            samples['ppml_sample'], 'DyadicPair', constants.N_BOOTSTRAP, 50, seed=2025
        )

        # --- Phase 4: Primary Estimation ---
        print("\n--- PHASE 4: PRIMARY ECONOMETRIC ESTIMATION ---")
        # Estimate the main PPML model.
        ppml_results = run_ppml_estimation_pipeline(samples['ppml_sample'], base_spec)
        # Conditionally run the computationally intensive Logit model.
        logit_results = None
        if run_intensive_logit:
            logit_spec = base_spec.copy()
            logit_spec['dependent_var'] = 'AnyTrade_Sector_Dummy'
            logit_results = run_logit_estimation_pipeline(
                samples['logit_sample'], logit_spec, constants, seed=2025
            )

        # --- Phase 5: Analysis and Interpretation ---
        print("\n--- PHASE 5: ANALYSIS AND INTERPRETATION ---")
        # Calculate the standard deviation of the key regressor for marginal effects.
        std_devs = calculate_pd_standard_deviations(samples['ppml_sample'], ['PD_GDELT_Filtered_IHS'])
        # Calculate the PPML marginal effects.
        ppml_marginal_effects = compute_ppml_marginal_effects(
            ppml_results, std_devs['PD_GDELT_Filtered_IHS'],
            base_spec['main_pd_term'],
            {'Baseline (Non-WTO)': [], 'WTO Members': [base_spec['wto_interaction_term']]}
        )
        # Quantify the key institutional effects and run hypothesis tests.
        quant_report = run_institutional_quantification_pipeline(ppml_results, std_devs['PD_GDELT_Filtered_IHS'], base_spec)
        # Analyze the temporal stability of the findings.
        temp_report = run_temporal_analysis_pipeline(
            samples['temporal_subsamples'], samples['ppml_sample'], base_spec,
            {'Baseline (Non-WTO)': [], 'WTO Members': [base_spec['wto_interaction_term']]}, 2009
        )

        # --- Phase 6: Robustness Analysis ---
        print("\n--- PHASE 6: ROBUSTNESS ANALYSIS ---")
        # Conditionally run the full suite of robustness checks.
        robustness_results = None
        if run_robustness_checks:
            robustness_results = run_all_robustness_analyses(
                samples['ppml_sample'], base_spec, constants
            )

        # --- Phase 7: Results Presentation ---
        print("\n--- PHASE 7: RESULTS PRESENTATION AND SYNTHESIS ---")
        # Generate and save all result tables.
        tables = run_results_presentation_pipeline(
            {'Baseline PPML': ppml_results},
            {'Baseline Logit': logit_results} if logit_results else {},
            {'Baseline Logit': len(samples['logit_sample'])} if logit_results else {},
            robustness_results if robustness_results else {},
            {'key_robustness_vars': [base_spec['wto_interaction_term']]}
        )
        tables['main_results'].to_csv(output_path / "table_1_main_results.csv")
        if 'robustness_summary' in tables and not tables['robustness_summary'].empty:
            tables['robustness_summary'].to_csv(output_path / "table_2_robustness_summary.csv")

        # Generate and save all figures.
        generate_and_save_all_figures(
            output_path,
            ppml_marginal_effects=ppml_marginal_effects,
            temporal_effects=temp_report['period_specific_effects'],
            ppml_results=ppml_results,
            robustness_results=robustness_results,
            base_spec=base_spec
        )

        # Generate and save the final JSON synthesis report.
        final_report = generate_synthesis_report(
            ppml_results, ppml_marginal_effects, quant_report['wto_attenuation'],
            robustness_results if robustness_results else {},
            temp_report['structural_break_test'],
            {'baseline_scenario': 'Baseline (Non-WTO)', 'wto_scenario': 'WTO Members',
             'key_coefficient_for_robustness': base_spec['wto_interaction_term'],
             'break_year': 2009}
        )
        with open(output_path / "final_synthesis_report.json", 'w') as f:
            json.dump(final_report, f, indent=4)

        # Print a final success message.
        print(f"\n--- EXECUTION COMPLETE ---")
        print(f"All outputs saved to: {output_path.resolve()}")

    except Exception as e:
        # If any error occurs anywhere in the pipeline, catch it, print a
        # detailed error message and traceback, and then re-raise it.
        print(f"\n--- TOP-LEVEL PIPELINE FAILED ---")
        print(f"An error occurred in one of the pipeline stages: {e}")
        traceback.print_exc()
        raise
