# **`README.md`**

# Replication of "*Measuring economic outlook in the news timely and efficiently*"

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2511.04299-b31b1b.svg)](https://arxiv.org/abs/2511.04299)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/measuring_economic_outlook_in_news)
[![Discipline](https://img.shields.io/badge/Discipline-Computational%20Economics-00529B)](https://github.com/chirindaopensource/measuring_economic_outlook_in_news)
[![Data Source](https://img.shields.io/badge/Data%20Source-Swissdox%40LiRI-003299)](https://www.liri.uzh.ch/en/services/swissdox.html)
[![Core Method](https://img.shields.io/badge/Method-LLM--Based%20Sentiment%20Analysis-orange)](https://github.com/chirindaopensource/measuring_economic_outlook_in_news)
[![Analysis](https://img.shields.io/badge/Analysis-Time%20Series%20Forecasting-red)](https://github.com/chirindaopensource/measuring_economic_outlook_in_news)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![Anthropic](https://img.shields.io/badge/Anthropic-Claude%203.5%20Sonnet-D97A53?logo=anthropic&logoColor=white)](https://www.anthropic.com/news/claude-3-5-sonnet)
[![SentenceTransformers](https://img.shields.io/badge/SentenceTransformers-jina--embeddings--v2-2E4053)](https://huggingface.co/jinaai/jina-embeddings-v2-base-de)
[![TensorFlow](https://img.shields.io/badge/TensorFlow-%23FF6F00.svg?style=flat&logo=TensorFlow&logoColor=white)](https://www.tensorflow.org/)
[![Scikit-learn](https://img.shields.io/badge/scikit--learn-%23F7931E.svg?style=flat&logo=scikit-learn&logoColor=white)](https://scikit-learn.org/)
[![Statsmodels](https://img.shields.io/badge/Statsmodels-150458-blue)](https://www.statsmodels.org/stable/index.html)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)

**Repository:** `https://github.com/chirindaopensource/measuring_economic_outlook_in_news`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Measuring economic outlook in the news timely and efficiently"** by:

*   Elliot Beck
*   Franziska Eckert
*   Linus Kühne
*   Helge Liebert
*   Rina Rosenblatt-Wisch

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from rigorous data validation and cleansing to large-scale embedding, model training, indicator construction, and the final econometric evaluation.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `run_complete_neos_study`](#key-callable-run_complete_neos_study)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the analytical framework presented in Beck et al. (2025). The core of this repository is the Jupyter notebook `measuring_economic_outlook_in_news_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings. The pipeline is designed as a robust and scalable system for constructing a high-frequency economic sentiment indicator (NEOS) from a large corpus of news articles.

The paper's central contribution is a novel, resource-efficient methodology for sentiment analysis that is suitable for institutions with data privacy constraints. This codebase operationalizes the paper's experimental design, allowing users to:
-   Rigorously validate and manage the entire experimental configuration via a single `config.yaml` file.
-   Execute a multi-stage pipeline to cleanse, prepare, and generate high-dimensional embeddings for a large news corpus.
-   Train a weakly supervised neural network to filter for economics-relevant articles.
-   Programmatically generate a synthetic, high-quality labeled dataset for sentiment analysis using an LLM (Claude 3.5 Sonnet), avoiding the need for manual labeling and preserving data privacy.
-   Train a regularized logistic regression model to score the sentiment of millions of articles efficiently.
-   Construct the final monthly NEOS indicator, its early-release variants, and a traditional lexicon-based baseline.
-   Perform a comprehensive econometric evaluation using a pseudo-out-of-sample (POOS) forecasting exercise to test the indicator's predictive power for GDP growth.
-   Run Diebold-Mariano tests with HAC-robust errors to assess the statistical significance of the findings.
-   Conduct a full suite of robustness checks to test the sensitivity of the results to key methodological choices.

## Theoretical Background

The implemented methods are grounded in principles from natural language processing, machine learning, and time-series econometrics.

**1. LLM-based Synthetic Data Generation:**
The core innovation is the use of a powerful LLM to generate a small training set for sentiment classification whose labels are known by construction, because each synthetic article is written to be either positive or negative. This avoids the costly and time-consuming process of manual annotation and, crucially, allows a sentiment model to be trained without exposing any proprietary source data to external APIs.

**2. High-Dimensional Classification with Regularization:**
The sentiment classifier is a logistic regression model trained on high-dimensional embeddings where the number of features ($p=1024$) exceeds the number of samples ($n=256$). To prevent overfitting, L2 (Ridge) regularization is essential. The model minimizes the regularized negative log-likelihood:
$$
\mathcal{L}(\boldsymbol{\beta}) = -\sum_{i=1}^{n} \left[ y_i \log(\sigma(\mathbf{x}_i^T \boldsymbol{\beta})) + (1-y_i) \log(1-\sigma(\mathbf{x}_i^T \boldsymbol{\beta})) \right] + \lambda ||\boldsymbol{\beta}||_2^2
$$
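
As a rough illustration of the objective above, the following sketch evaluates the regularized loss and minimizes it with plain gradient descent on synthetic data of the same shape ($n=256$, $p=1024$). The data, the penalty `lam`, the step size, and the function names are assumptions made for this example; they are not taken from the notebook, which uses scikit-learn's `LogisticRegression` (where the inverse strength `C` plays the role of $\lambda$, with larger $\lambda$ corresponding to smaller `C`).

```python
import numpy as np


def sigmoid(z):
    # Numerically stable logistic function.
    return 0.5 * (1.0 + np.tanh(0.5 * z))


def ridge_logistic_loss(beta, X, y, lam):
    """Regularized negative log-likelihood, as in the equation above."""
    z = X @ beta
    # log(1 + exp(z)) - y * z equals the per-sample negative log-likelihood.
    nll = np.sum(np.logaddexp(0.0, z) - y * z)
    return nll + lam * np.dot(beta, beta)


def ridge_logistic_grad(beta, X, y, lam):
    """Gradient of the loss: X^T (sigma(X beta) - y) + 2 * lam * beta."""
    return X.T @ (sigmoid(X @ beta) - y) + 2.0 * lam * beta


rng = np.random.default_rng(0)
n, p = 256, 1024                              # more features than samples
X = rng.normal(size=(n, p)) / np.sqrt(p)      # synthetic stand-in for embeddings
y = (rng.random(n) < 0.5).astype(float)       # synthetic binary labels
lam = 1.0                                     # illustrative penalty strength

beta = np.zeros(p)
initial_loss = ridge_logistic_loss(beta, X, y, lam)
for _ in range(200):
    beta -= 0.1 * ridge_logistic_grad(beta, X, y, lam)  # fixed step size
final_loss = ridge_logistic_loss(beta, X, y, lam)
print(initial_loss, final_loss)
```

With $\boldsymbol{\beta}=\mathbf{0}$ every predicted probability is 0.5, so the starting loss is $n \log 2$; the penalty term keeps the minimizer well defined even though $p > n$.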

**3. Pseudo-Out-of-Sample (POOS) Forecast Evaluation:**
To rigorously assess the indicator's predictive value, the project implements a POOS forecasting exercise. This involves simulating a real-time forecasting process by iterating through time, using an expanding window of historical data to estimate the forecasting models at each step. This method strictly avoids look-ahead bias. The core forecasting model is:
$$
y_{t+h} = \alpha + \beta y_{t-1} + \gamma x_t^{(m)} + \varepsilon_t \quad \quad (1)
$$
This is compared against a benchmark AR(1) model where $\gamma=0$.
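
The expanding-window logic can be sketched as follows. This is a minimal illustration under stated assumptions, not the notebook's implementation: at forecast origin `T` the indicator is assumed to be observed up to `T` and the target up to `T-1`, the models are fit by ordinary least squares, and the function name `poos_rmse_ratio`, the `min_train` parameter, and the simulated data are all hypothetical.

```python
import numpy as np


def _ols_forecast(X_train, y_train, x_new):
    # Fit by least squares and return the point forecast for x_new.
    coef, *_ = np.linalg.lstsq(X_train, y_train, rcond=None)
    return float(x_new @ coef)


def poos_rmse_ratio(y, x, h=0, min_train=20):
    """RMSE of the indicator-augmented model relative to the AR(1) benchmark."""
    y = np.asarray(y, dtype=float)
    x = np.asarray(x, dtype=float)
    err_ar, err_aug = [], []
    for T in range(min_train, len(y) - h):
        # Training targets y[t + h] must already be known at origin T,
        # i.e. t + h <= T - 1, and t >= 1 so that y[t - 1] exists.
        t = np.arange(1, T - h)
        target = y[t + h]
        X_aug = np.column_stack([np.ones(len(t)), y[t - 1], x[t]])
        X_ar = X_aug[:, :2]                       # benchmark: gamma = 0
        new_row = np.array([1.0, y[T - 1], x[T]])
        err_aug.append(y[T + h] - _ols_forecast(X_aug, target, new_row))
        err_ar.append(y[T + h] - _ols_forecast(X_ar, target, new_row[:2]))

    def rmse(errors):
        return float(np.sqrt(np.mean(np.square(errors))))

    return rmse(err_aug) / rmse(err_ar)


# Simulated series in which the indicator is informative by construction.
rng = np.random.default_rng(1)
n_obs = 120
x_sim = rng.normal(size=n_obs)
y_sim = np.zeros(n_obs)
for s in range(1, n_obs):
    y_sim[s] = 0.5 * y_sim[s - 1] + x_sim[s] + 0.1 * rng.normal()

ratio = poos_rmse_ratio(y_sim, x_sim, h=0)
print(ratio)
```

A ratio below 1 means the augmented model has the lower out-of-sample RMSE. Because each fit uses only rows whose target was observable at the forecast origin, no future information enters the estimates.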

**4. Diebold-Mariano Test with HAC Errors:**
To test if the improvement in forecast accuracy (measured by RMSE) is statistically significant, the Diebold-Mariano (DM) test is used. The implementation is modified to use a Heteroskedasticity and Autocorrelation Consistent (HAC) variance estimator (Newey-West), which is critical for handling the serial correlation present in multi-step-ahead forecast errors ($h>0$).
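
A compact sketch of such a test is shown below, assuming squared-error loss, a Bartlett (Newey-West) kernel, and a standard normal reference distribution. The function name `dm_test_hac` and the choice of `lags` are assumptions for illustration; the notebook's own implementation may differ in details such as small-sample corrections.

```python
import math

import numpy as np


def dm_test_hac(e_bench, e_model, lags):
    """Diebold-Mariano statistic with a Newey-West long-run variance.

    A positive statistic means the model has lower squared-error loss
    than the benchmark.
    """
    d = np.square(e_bench) - np.square(e_model)   # loss differential
    n = len(d)
    d_bar = d.mean()
    u = d - d_bar
    lrv = np.dot(u, u) / n                        # lag-0 autocovariance
    for k in range(1, lags + 1):
        gamma_k = np.dot(u[k:], u[:-k]) / n
        lrv += 2.0 * (1.0 - k / (lags + 1.0)) * gamma_k   # Bartlett weights
    stat = d_bar / math.sqrt(lrv / n)
    p_value = math.erfc(abs(stat) / math.sqrt(2.0))       # two-sided, N(0, 1)
    return stat, p_value


# Synthetic forecast errors: the benchmark errors are twice as dispersed.
rng = np.random.default_rng(2)
e_bench = 2.0 * rng.normal(size=200)
e_model = rng.normal(size=200)
stat, p_value = dm_test_hac(e_bench, e_model, lags=2)
print(stat, p_value)
```

The Bartlett weights keep the variance estimate non-negative, which is why this kernel is a common default for HAC estimation.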

## Features

The provided Jupyter notebook (`measuring_economic_outlook_in_news_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Task Architecture:** The entire pipeline is broken down into 23 distinct, modular tasks, each with its own orchestrator function.
-   **Configuration-Driven Design:** All study parameters are managed in an external `config.yaml` file.
-   **Scalable Data Processing:** Includes efficient, batch-based processing for large-scale embedding and inference, with out-of-core storage using HDF5.
-   **Resumable Pipeline:** The main orchestrator implements checkpointing, allowing the pipeline to be stopped and resumed without re-running expensive completed steps.
-   **Robust Model Training:** Implements best practices for both neural network and classical model training, including temporal validation splits, early stopping, and cross-validated hyperparameter tuning.
-   **Rigorous Econometric Analysis:** Implements the full POOS forecasting loop and DM-HAC significance tests with high fidelity.
-   **Complete Replication and Robustness:** A single top-level function call can execute the entire study, including a comprehensive suite of sensitivity analyses.
-   **Full Provenance:** The pipeline generates a detailed log file and a final `reproducibility_manifest.json` that captures all configurations, library versions, and artifact paths for a given run.
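
The checkpointing idea behind the resumable pipeline can be sketched as below. This is a simplified illustration, not the orchestrator's actual code: the helper `run_with_checkpoint`, the step name, and the use of JSON marker files are assumptions, and the real pipeline stores larger artifacts (for example HDF5 embeddings) rather than small JSON results.

```python
import json
import tempfile
from pathlib import Path


def run_with_checkpoint(step_name, step_fn, checkpoint_dir):
    """Run step_fn once; later calls reload its JSON-serializable result."""
    marker = Path(checkpoint_dir) / f"{step_name}.json"
    if marker.exists():
        # The step already completed in an earlier run: skip the work.
        return json.loads(marker.read_text())
    result = step_fn()
    marker.parent.mkdir(parents=True, exist_ok=True)
    marker.write_text(json.dumps(result))
    return result


calls = []


def expensive_step():
    # Stand-in for a costly task such as embedding the corpus.
    calls.append(1)
    return {"n_articles": 1000}


with tempfile.TemporaryDirectory() as tmp:
    first = run_with_checkpoint("task_06_embedding", expensive_step, tmp)
    second = run_with_checkpoint("task_06_embedding", expensive_step, tmp)

print(first == second, len(calls))
```

The second call returns the stored result without invoking `expensive_step` again, which is the property that lets an interrupted run resume where it stopped.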

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Validation & Cleansing (Tasks 1-5):** Ingests and validates all raw inputs, cleanses the news corpus according to the paper's scope, and adds temporal features.
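2.  **Embedding (Task 6):** Generates 1024-dimensional embeddings for the entire corpus with the configured Jina embedding model (the paper summary below names `jina-embeddings-v3`; the `jina-embeddings-v2` base models produce 768-dimensional vectors).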
3.  **Relevance Filtering (Tasks 7-9):** Trains a weakly supervised MLP to identify economics-related articles and filters the corpus.
4.  **Sentiment Model Training (Tasks 10-12):** Generates a synthetic training set with Claude 3.5 Sonnet, embeds it, and trains an L2-regularized logistic regression classifier.
5.  **Indicator Construction (Tasks 13-14):** Scores all relevant articles for sentiment and aggregates the scores into monthly baseline and early-release indicators.
6.  **Econometric Evaluation (Tasks 15-20):** Aligns all indicators to a quarterly frequency, runs the full POOS forecasting exercise, computes RMSE ratios, performs DM-HAC tests, and generates correlation tables.
7.  **Visualization & Robustness (Tasks 21-23):** Generates all charts from the paper and runs a full suite of sensitivity analyses on key methodological choices.

## Core Components (Notebook Structure)

The `measuring_economic_outlook_in_news_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the 23 major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `run_complete_neos_study`

The project is designed around a single, top-level user-facing interface function:

-   **`run_complete_neos_study`:** This master orchestrator function, located in the final section of the notebook, runs the entire automated research pipeline end to end, including the baseline analysis and all robustness checks. A single call to this function reproduces the entire computational portion of the project.

## Prerequisites

-   Python 3.9+
-   An Anthropic API key.
-   Sufficient disk space for embeddings (~8GB per million articles).
-   A GPU is highly recommended for the embedding and relevance model training steps.
-   Core dependencies: `pandas`, `numpy`, `pyyaml`, `pyarrow`, `tensorflow`, `scikit-learn`, `statsmodels`, `sentence-transformers`, `h5py`, `joblib`, `anthropic`, `umap-learn`, `matplotlib`, `seaborn`, `tqdm`, `faker`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/measuring_economic_outlook_in_news.git
    cd measuring_economic_outlook_in_news
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install -r requirements.txt
    ```

4.  **Set your Anthropic API Key:**
    ```sh
    export ANTHROPIC_API_KEY='your-key-here'
    ```

## Input Data Structure

The pipeline requires several input DataFrames with specific schemas, which are rigorously validated. A synthetic data generator is included in the notebook for a self-contained demonstration.
1.  **`raw_news_data_df`**: The large-scale news article corpus.
2.  **`raw_macro_data_df`**: Quarterly macroeconomic data (GDP, etc.).
3.  **`monthly_indicator_data_df`**: Monthly comparator indicators (PMI, KOF).
4.  **`release_calendar_df`**: Metadata on indicator release dates.
5.  **`evaluation_windows_df`**: Metadata on valid evaluation periods for certain indicators.
6.  A translated German sentiment **lexicon file** (CSV).

All other parameters are controlled by the `config.yaml` file.

## Usage

The `measuring_economic_outlook_in_news_draft.ipynb` notebook provides a complete, step-by-step guide. The primary workflow is to execute the final cell of the notebook, which demonstrates how to use the top-level `run_complete_neos_study` orchestrator:

```python
# Final cell of the notebook
import yaml  # PyYAML; provides yaml.safe_load used below

# This block serves as the main entry point for the entire project.
if __name__ == '__main__':
    # 1. Load configuration from the YAML file.
    with open('config.yaml', 'r') as f:
        config = yaml.safe_load(f)
    
    # 2. Define paths.
    ROOT_OUTPUT_DIRECTORY = './neos_study_output'
    LEXICON_PATH = './dummy_lexicon_de.csv'
    
    # 3. Generate a full set of synthetic data files for the demonstration.
    # (The data generation functions are defined earlier in the notebook)
    raw_news_data_df = create_synthetic_news_data(1000, config)
    # ... create all other synthetic DataFrames ...
    
    # 4. Execute the entire replication study.
    final_results = run_complete_neos_study(
        raw_news_data_df=raw_news_data_df,
        raw_macro_data_df=raw_macro_data_df,
        monthly_indicator_data_df=monthly_indicator_data_df,
        release_calendar_df=release_calendar_df,
        evaluation_windows_df=evaluation_windows_df,
        fused_master_input_specification=config,
        root_output_directory=ROOT_OUTPUT_DIRECTORY,
        lexicon_path=LEXICON_PATH
    )
    
    # 5. Inspect final results.
    print("--- Baseline Run Status ---")
    print(final_results['baseline_run_results']['status'])
```

## Output Structure

The pipeline generates a structured output directory under the `root_output_directory` passed to `run_complete_neos_study` (`./neos_study_output` in the usage example above):
-   **`neos_study_output/baseline_run/`**: Contains all artifacts from the main pipeline run.
    -   `data/`: Intermediate data files (embeddings, scores, etc.).
    -   `models/`: Trained model files.
    -   `results/`: Final result tables (CSV) and charts (PNG).
    -   `pipeline_run.log`: A detailed log file for the run.
    -   `reproducibility_manifest.json`: A complete record of the run.
-   **`neos_study_output/robustness_checks/`**: Contains a subdirectory for each sensitivity analysis, with the same internal structure as `baseline_run`.
-   **`neos_study_output/robustness_checks/robustness_summary_results.csv`**: A master table comparing key results across all robustness checks.
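
To make the role of `reproducibility_manifest.json` concrete, here is a minimal sketch of how such a manifest could be assembled with the standard library. The field names, the `build_manifest` helper, and the example configuration keys are assumptions for illustration; the manifest written by the notebook may have a different layout.

```python
import hashlib
import json
import platform
from importlib import metadata


def build_manifest(config, artifact_paths, packages=("numpy", "pandas")):
    """Collect configuration, environment, and artifact information."""
    # Sorting keys makes the hash independent of dictionary ordering.
    config_json = json.dumps(config, sort_keys=True)
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None   # package not installed in this environment
    return {
        "config_sha256": hashlib.sha256(config_json.encode("utf-8")).hexdigest(),
        "config": config,
        "python_version": platform.python_version(),
        "library_versions": versions,
        "artifacts": sorted(artifact_paths),
    }


manifest = build_manifest(
    {"relevance_filter": {"tau": 0.5}},          # hypothetical config keys
    ["results/rmse_ratios.csv", "models/sentiment_model.joblib"],
)
print(json.dumps(manifest, indent=2))
```

Hashing the serialized configuration gives a short fingerprint that can be compared across runs to confirm they used identical settings.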

## Project Structure

```
measuring_economic_outlook_in_news/
│
├── measuring_economic_outlook_in_news_draft.ipynb
├── config.yaml
├── requirements.txt
│
├── neos_study_output/
│   ├── baseline_run/
│   │   ├── data/
│   │   ├── models/
│   │   └── results/
│   └── robustness_checks/
│       ├── sensitivity_tau_0.4/
│       └── ...
│
├── LICENSE
└── README.md
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can modify all study parameters, including model names, API settings, filtering thresholds, and file paths, without altering the core Python code.
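
One way to derive a variant configuration for a sensitivity run without touching the baseline is sketched below. The helper `with_override` and the key names (`relevance_filter.tau`, `sentiment_model.cv_folds`) are hypothetical; consult `config.yaml` for the actual keys.

```python
import copy


def with_override(config, dotted_key, value):
    """Return a deep copy of config with one nested value replaced."""
    new_config = copy.deepcopy(config)      # leave the baseline untouched
    node = new_config
    *parents, leaf = dotted_key.split(".")
    for key in parents:
        node = node[key]
    node[leaf] = value
    return new_config


base = {"relevance_filter": {"tau": 0.5}, "sentiment_model": {"cv_folds": 5}}
variant = with_override(base, "relevance_filter.tau", 0.4)
print(base["relevance_filter"]["tau"], variant["relevance_filter"]["tau"])
```

Working on a deep copy means several variants can be generated from the same baseline dictionary without one run's settings leaking into another.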

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Alternative Embedding Models:** The modular design allows for easy substitution of the `model_name` in the config to test other embedding models.
-   **Different Classifiers:** The sentiment model could be replaced with other classifiers (e.g., SVM, Gradient Boosting) to test for performance differences.
-   **Advanced Econometric Models:** The forecasting exercise could be extended to include more complex models, such as VARs or models with dynamic variable selection.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{beck2025measuring,
  title={Measuring economic outlook in the news timely and efficiently},
  author={Beck, Elliot and Eckert, Franziska and K{\"u}hne, Linus and Liebert, Helge and Rosenblatt-Wisch, Rina},
  journal={arXiv preprint arXiv:2511.04299},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A Production-Grade Replication of "Measuring economic outlook in the news timely and efficiently".
GitHub repository: https://github.com/chirindaopensource/measuring_economic_outlook_in_news
```

## Acknowledgments

-   Credit to **Elliot Beck, Franziska Eckert, Linus Kühne, Helge Liebert, and Rina Rosenblatt-Wisch** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, TensorFlow, Scikit-learn, Statsmodels, Sentence-Transformers, and Anthropic**.

---

*This README was generated based on the structure and content of the `measuring_economic_outlook_in_news_draft.ipynb` notebook and follows best practices for research software documentation.*

# Paper

Title: "*Measuring economic outlook in the news timely and efficiently*"

Authors: Elliot Beck, Franziska Eckert, Linus Kühne, Helge Liebert, Rina Rosenblatt-Wisch

E-Journal Submission Date: 6 November 2025

Link: https://arxiv.org/abs/2511.04299

Abstract:

We introduce a novel indicator that combines machine learning and large language models with traditional statistical methods to track sentiment regarding the economic outlook in Swiss news. The indicator is interpretable and timely, and it significantly improves the accuracy of GDP growth forecasts. Our approach is resource-efficient, modular, and offers a way of benefitting from state-of-the-art large language models even if data are proprietary and cannot be stored or analyzed on external infrastructure - a restriction faced by many central banks and public institutions.

# Summary

### **Summary: "Measuring economic outlook in the news timely and efficiently"**

#### **The Core Objective and Contribution**

The authors aim to construct a high-frequency, real-time indicator of the Swiss economic outlook, which they name the **News-based Economic Outlook for Switzerland (NEOS)**. The primary contribution is not the invention of a new algorithm, but rather the development of a novel, resource-efficient methodology that makes state-of-the-art Natural Language Processing (NLP) techniques viable for institutions like central banks. These institutions often face two critical constraints:

1.  **Data Privacy:** Proprietary or sensitive data (in this case, licensed news articles) cannot be sent to external, cloud-based APIs for analysis.
2.  **Computational Resources:** Analyzing millions of documents with massive Large Language Models (LLMs) can be prohibitively expensive and slow.

The paper’s proposed pipeline elegantly circumvents these issues while delivering a high-performance economic indicator.

#### **The Methodological Pipeline – A Hybrid Approach**

The construction of NEOS is a multi-stage process that combines different machine learning paradigms. The stages below follow the order in which the paper presents them.

**(1) Data Foundation:**
The process begins with a massive corpus of 26.2 million Swiss newspaper articles (in German and French) from the Swissdox@LiRI database, spanning from 1999 to 2025. This provides a comprehensive and representative view of media discourse.

**(2) Semantic Representation: From Text to Vectors**
The authors transform the unstructured text into a structured, numerical format using a modern embedding model, `jina-embeddings-v3`. Each article becomes a 1024-dimensional vector. This is a crucial step. Unlike older bag-of-words methods, these embeddings capture the semantic meaning and context of the text, allowing for a more nuanced understanding. The choice of a multilingual model is also essential for handling both German and French sources without separate pipelines.

**(3) Relevance Filtering: Isolating Economic News**
Analyzing all 26 million articles for sentiment would be inefficient. The authors first train a neural network to perform a binary classification: is an article about economics or not? In a clever application of weak supervision, they generate a training set by assuming that articles published in dedicated "Business" or "Economics" sections are, by definition, relevant. This classifier is then used to filter the entire corpus, reducing it to a more manageable 3.1 million relevant articles.
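
The weak-labeling rule can be illustrated with a few lines of Python. The set of section names and the treatment of articles without a section label (left unlabeled here) are assumptions for this sketch; the paper and the notebook may define both differently.

```python
# Hypothetical section names treated as economics-related.
ECON_SECTIONS = {"business", "economics", "wirtschaft"}


def weak_relevance_label(section_label):
    """Return 1 (relevant), 0 (not relevant), or None (no section known)."""
    if section_label is None or not section_label.strip():
        return None   # unlabeled articles are excluded from training
    return 1 if section_label.strip().lower() in ECON_SECTIONS else 0


sections = ["Wirtschaft", "Sport", None, " business "]
labels = [weak_relevance_label(s) for s in sections]
print(labels)
```

Labels produced this way are noisy, since economic news also appears outside dedicated sections, but they cost nothing to obtain and are sufficient to train the relevance classifier described above.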

**(4) The Core Innovation: LLM-Generated Synthetic Training Data**
This is the central step of the methodology. Manually labeling thousands of articles for positive or negative sentiment is a time-consuming, expensive, and subjective task. Instead, the authors use a powerful generative LLM (`Claude 3.5 Sonnet`) as a "synthetic data generator." They prompt the LLM to write 256 archetypal articles: 128 with a stereotypically positive economic outlook and 128 with a negative one, covering various topics (financial markets, labor, etc.).

This creates a small, perfectly polarized, and high-quality training dataset without any human labeling effort. From a machine learning perspective, this is a form of "tens-of-shot" learning, where the power of a massive LLM is distilled into a small, targeted dataset.

**(5) Sentiment Scoring: An Efficient and Interpretable Classifier**
The 256 synthetic articles are converted into their 1024-dimensional vector embeddings. A UMAP projection (Chart 2) visually confirms that these embeddings cluster neatly into "positive" and "negative" groups, validating that the vector space effectively captures the sentiment dimension.

With this validation, the authors train a simple **logistic regression model** on these 256 embedded vectors. The use of regularization is technically sound, as the number of features (1024) far exceeds the number of observations (256). This model learns the linear boundary in the high-dimensional space that separates positive from negative sentiment.

This trained logistic regression classifier is then applied to the embeddings of the 3.1 million relevant newspaper articles. The model is computationally trivial to run at this scale and produces a sentiment probability score (from 0 for negative to 1 for positive) for each article.

**(6) Aggregation: From Articles to a Time-Series Indicator**
Finally, the individual article scores are aggregated—in this case, by taking a simple monthly average—to create the final NEOS time series. The authors also compute higher-frequency variants (e.g., using only the first 7, 14, or 21 days of a month) to demonstrate the indicator's timeliness.
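
The aggregation step, including an early-release variant, can be sketched with the standard library alone. The function name `monthly_indicator`, the `max_day` parameter, and the example scores are assumptions for illustration; the notebook operates on DataFrames rather than lists of tuples.

```python
from collections import defaultdict
from datetime import date
from statistics import mean


def monthly_indicator(scored_articles, max_day=None):
    """Average article scores per (year, month).

    If max_day is given, only articles published on or before that day of
    the month are used, which mimics an early-release variant.
    """
    buckets = defaultdict(list)
    for published, score in scored_articles:
        if max_day is None or published.day <= max_day:
            buckets[(published.year, published.month)].append(score)
    return {month: mean(scores) for month, scores in sorted(buckets.items())}


articles = [
    (date(2024, 1, 3), 0.75),
    (date(2024, 1, 10), 0.5),
    (date(2024, 1, 25), 0.25),
    (date(2024, 2, 5), 0.5),
]
full = monthly_indicator(articles)
early = monthly_indicator(articles, max_day=14)
print(full, early)
```

In this toy example the January value based on the first 14 days differs from the full-month value because the late-month article is excluded, which is exactly the information trade-off the early-release variants make.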

#### **Econometric Validation and Performance**

The paper subjects the NEOS indicator to a rigorous econometric evaluation, which is the gold standard for validating such tools.

**(1) Forecasting Setup:**
The authors conduct a pseudo-out-of-sample forecasting exercise for Swiss year-on-year GDP growth. They use a standard autoregressive (AR(1)) model as a benchmark and test whether adding NEOS provides statistically significant marginal predictive content. The model is:
$$
y_{t+h} = \alpha + \beta y_{t-1} + \gamma x_t^{(m)} + \varepsilon_t
$$

The key is to test if *γ* is statistically significant and if its inclusion reduces the forecast error.

**(2) Performance Results (Table 1):**
*   **Improved Accuracy:** NEOS consistently improves GDP growth forecasts across all horizons (nowcast *h=0*, and forecasts *h=1, h=2*). The Root Mean Squared Error (RMSE) ratios are substantially below 1, indicating a roughly 15-20% reduction in forecast error.
*   **Statistical Significance:** The improvements are statistically significant according to the Diebold-Mariano test, confirming that the gains are not due to chance.
*   **Value of Timeliness:** Crucially, the "early-release" versions of NEOS (e.g., NEOS calculated on the first 21 days of the third month of a quarter) are often the *best* performers. This is empirical evidence of the economic value of receiving a signal weeks before traditional data is released.
*   **Superiority to Baselines:** NEOS generally outperforms a traditional lexicon-based sentiment indicator and most standard survey-based indicators (e.g., SECO Consumer Sentiment, KOF Business Situation). While the Manufacturing PMI is competitive, NEOS has the advantage of higher frequency and is not tied to a fixed survey schedule.

**(3) Crisis Performance (Charts 4 & 5):**
The analysis rightly highlights that the value of such indicators is magnified during times of high uncertainty. The daily plot in Chart 4 shows how NEOS can capture sharp shifts in sentiment in response to specific news events (e.g., tariff announcements). Furthermore, a cumulative squared error analysis (Chart 5) demonstrates that NEOS's outperformance is particularly pronounced during major economic shocks like the Global Financial Crisis and the COVID-19 pandemic.
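
A cumulative squared error comparison of this kind can be computed as follows. The sign convention (benchmark minus model, so that a rising line indicates the model is outperforming) and the example numbers are assumptions for this sketch and may not match the convention used in Chart 5.

```python
from itertools import accumulate


def cumulative_sq_error_diff(e_bench, e_model):
    """Running sum of squared benchmark errors minus squared model errors."""
    return list(accumulate(b * b - m * m for b, m in zip(e_bench, e_model)))


e_bench = [1.0, 2.0, 0.5]   # illustrative benchmark forecast errors
e_model = [0.5, 1.0, 1.0]   # illustrative model forecast errors
path = cumulative_sq_error_diff(e_bench, e_model)
print(path)  # [0.75, 3.75, 3.0]
```

Steep increases in this path mark the periods that contribute most to the overall accuracy gain, which is how crisis episodes show up in such a chart.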

#### **Conclusion and Overall Assessment**

This paper is an excellent example of applied research and sound engineering.

*   **From a Computer Science perspective:** The "LLM-as-generator" to train a simple, efficient classifier is an elegant and highly practical pattern. It leverages the semantic intelligence of foundation models without becoming dependent on them for large-scale inference, solving the privacy and cost problems. The modular design is also robust, allowing for future upgrades to the embedding model or LLM.
*   **From an Econometrics perspective:** The validation is rigorous and follows best practices in the field. The focus on pseudo-out-of-sample performance, the use of a standard benchmark model, and formal statistical tests of forecast improvement lend strong credibility to the results.
*   **From a Finance perspective:** The demonstration of timeliness is key. For financial market participants and policymakers, an accurate signal even a few weeks early is immensely valuable for decision-making. The ability to "drill down" from the aggregate index to the specific articles driving a change provides a level of interpretability that black-box models lack.

In summary, the authors have successfully developed and validated a novel economic indicator that is not only accurate and timely but also designed to operate within the practical constraints of real-world policy and financial institutions. It is a compelling case study in the thoughtful application of modern AI to solve a long-standing problem in economics.

# Import Essential Modules

```python
#!/usr/bin/env python3
# ==============================================================================
#
#  Measuring Economic Outlook in the News Timely and Efficiently
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "Measuring economic outlook in the news
#  timely and efficiently" by Beck et al. (2025). It delivers a robust,
#  end-to-end pipeline for creating a high-frequency economic sentiment indicator
#  from unstructured news text, designed for environments with strict data
#  privacy and computational constraints.
#
#  Core Methodological Components:
#  • Large-scale text data cleansing and preparation with full provenance tracking.
#  • High-dimensional feature engineering via multilingual Transformer-based embeddings.
#  • Weakly supervised neural network for domain-specific text classification (relevance filtering).
#  • LLM-based synthetic data generation for privacy-preserving sentiment classifier training.
#  • L2-regularized logistic regression for efficient and interpretable sentiment scoring.
#  • Rigorous econometric validation using pseudo-out-of-sample (POOS) forecasting.
#  • Statistical significance testing with Diebold-Mariano tests modified with HAC robust errors.
#
#  Technical Implementation Features:
#  • Modular, function-based architecture with a top-level orchestrator.
#  • Robust MLOps practices including checkpointing for resumability and comprehensive logging.
#  • Efficient, out-of-core processing for large embedding matrices using HDF5.
#  • Systematic hyperparameter tuning via stratified cross-validation.
#  • Generation of publication-quality diagnostic charts and result tables.
#  • A complete reproducibility manifest capturing configuration, library versions, and artifacts.
#
#  Paper Reference:
#  Beck, E., Eckert, F., Kühne, L., Liebert, H., & Rosenblatt-Wisch, R. (2025).
#  Measuring economic outlook in the news timely and efficiently.
#  arXiv preprint arXiv:2511.04299. https://arxiv.org/abs/2511.04299
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================

# ==============================================================================
# Fused Imports for the End-to-End NEOS Pipeline
# ==============================================================================

# --- Standard Library ---
import copy
import hashlib
import json
import logging
import os
import sys
import time
import uuid
import warnings
from typing import Any, Dict, List, Optional, Set, Tuple

# --- Core Scientific Computing ---
import numpy as np
import pandas as pd

# --- Machine Learning & NLP ---
import h5py
import joblib
import tensorflow as tf
import torch
import umap
from anthropic import APIError, Anthropic
from sentence_transformers import SentenceTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from sklearn.preprocessing import StandardScaler

# --- Econometrics & Statistics ---
import statsmodels.formula.api as smf
import statsmodels.tsa.api as tsa
from scipy.stats import norm

# --- Visualization ---
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import seaborn as sns

# --- Utilities ---
from tqdm import tqdm
```


# Implementation

## Draft 1

### **Comprehensive Callable Summary**

#### **Task 1: `validate_raw_news_corpus`**
*   **Inputs:** `raw_news_data_df` (the unprocessed news corpus), `fused_master_input_specification` (the master configuration).
*   **Processes:** This function is a pure validation orchestrator. It performs a series of checks without modifying data:
    1.  Asserts schema compliance (column names, dtypes, timezone-aware UTC timestamps).
    2.  Verifies primary key (`article_id`) uniqueness.
    3.  Validates that categorical columns (`publication_type`, `language`) adhere to controlled vocabularies.
    4.  Checks for one-to-one mapping consistency between `outlet_id` and `source_outlet`.
    5.  Quantifies `section_label` missingness and checks temporal integrity of timestamps.
*   **Outputs:** A dictionary containing a `'status'` ('Success' or 'Failure'), a list of `errors` and `warnings`, and a `missingness_report` DataFrame.
*   **Transformation:** No data transformation occurs. The function is a read-only gatekeeper.
*   **Role in Research Pipeline:** This callable implements the initial data quality assurance step, which is a prerequisite for any rigorous analysis but is not explicitly detailed in the paper's main text. It ensures the raw data from the **Swissdox@LiRI database (Step 1, Page 3)** is structurally sound and consistent before any processing begins.
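
Checks 2, 4, and 5 above can be sketched with plain pandas. This is a minimal illustration, not the repository's implementation: the helper name `check_corpus_integrity` and the shape of its report are assumptions, and the real function also returns warnings and a missingness DataFrame.

```python
import pandas as pd


def check_corpus_integrity(df: pd.DataFrame) -> dict:
    """Run read-only integrity checks and return a small report (hypothetical helper)."""
    errors = []

    # Primary-key check: every article_id must appear exactly once.
    n_dupes = int(df["article_id"].duplicated().sum())
    if n_dupes:
        errors.append(f"{n_dupes} duplicated article_id value(s)")

    # One-to-one check: after dropping repeated pairs, neither side may repeat.
    pairs = df[["outlet_id", "source_outlet"]].drop_duplicates()
    if pairs["outlet_id"].duplicated().any() or pairs["source_outlet"].duplicated().any():
        errors.append("outlet_id and source_outlet are not mapped one-to-one")

    # Missingness is reported rather than treated as an error.
    missing_share = float(df["section_label"].isna().mean())

    return {
        "status": "Failure" if errors else "Success",
        "errors": errors,
        "section_label_missing_share": missing_share,
    }


# Tiny demonstration corpus with one duplicated key and one inconsistent outlet name.
demo = pd.DataFrame({
    "article_id": ["a1", "a2", "a2"],
    "outlet_id": ["NZZ", "TA", "TA"],
    "source_outlet": ["Neue Zürcher Zeitung", "Tages-Anzeiger", "Tagi"],
    "section_label": ["Wirtschaft", None, "Sport"],
})
report = check_corpus_integrity(demo)
print(report["status"], report["errors"])
```

The function never mutates `df`, which matches the read-only gatekeeper role described above.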

#### **Task 2: `validate_macro_data`**
*   **Inputs:** `raw_macro_data_df`, `monthly_indicator_data_df`, `release_calendar_df`, `evaluation_windows_df`.
*   **Processes:** This is another pure validation orchestrator for the macroeconomic and metadata tables.
    1.  Asserts the temporal frequency and schema of all input DataFrames.
    2.  Cross-validates that the quarterly `_m2` indicators in `raw_macro_data_df` are consistent with the values in the authoritative `monthly_indicator_data_df`.
    3.  Verifies historical `NaN` patterns in the monthly data against the `evaluation_windows_df` to guard against backfilling.
    4.  Checks the integrity of the metadata tables.
*   **Outputs:** A dictionary containing a `'status'` and a consolidated list of `issues`.
*   **Transformation:** No data transformation occurs.
*   **Role in Research Pipeline:** This callable ensures the integrity of the comparator indicators (PMIs, KOF, SECO) and the dependent variable (GDP growth) used in the **Econometric Validation (Section 3, Page 5)**. It rigorously enforces the data availability constraints mentioned in the footnotes of **Table 1 (Page 6)**.

#### **Task 3: `validate_master_config`**
*   **Inputs:** `fused_master_input_specification`.
*   **Processes:** This is a pure validation function that recursively checks the entire configuration dictionary against the hardcoded methodological choices from the paper. It validates everything from date ranges and model names to neural network architecture and econometric test parameters.
*   **Outputs:** A dictionary containing a `'status'` and a list of `issues`.
*   **Transformation:** No data transformation occurs.
*   **Role in Research Pipeline:** This is a meta-level function that serves as a pre-flight check for the entire replication study. It ensures that the code is configured to execute the exact methodology described throughout the paper, from **Data and methods (Section 2)** to **Results (Section 3)**.

#### **Task 4: `cleanse_news_corpus`**
*   **Inputs:** `raw_news_data_df`, `fused_master_input_specification`.
*   **Processes:** This is the first data transformation function.
    1.  Filters the corpus based on date range, language, and publication type.
    2.  Removes records with null or empty critical fields.
    3.  Performs deterministic deduplication based on a content fingerprint.
*   **Outputs:** A tuple containing the cleansed `pd.DataFrame` and a detailed `audit_log` dictionary.
*   **Transformation:** The input DataFrame is transformed by removing rows that do not meet the study's inclusion criteria. The number of rows is reduced.
*   **Role in Research Pipeline:** This callable implements the data curation rules described in **Footnote 1 (Page 3)**, which states, "...we discard some very specific media outlets... only concentrate on print and online newspaper articles, excluding other sources...". It prepares the raw corpus for feature engineering.
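
One way to make the deduplication step deterministic is to hash a normalised copy of the text and keep the earliest article per hash. The sketch below assumes SHA-256 over lower-cased, whitespace-collapsed text; the normalisation rule and the helper names are illustrative choices, not taken from the paper or the repository.

```python
import hashlib

import pandas as pd


def content_fingerprint(text: str) -> str:
    """SHA-256 of the lower-cased, whitespace-normalised text (assumed rule)."""
    normalised = " ".join(text.lower().split())
    return hashlib.sha256(normalised.encode("utf-8")).hexdigest()


def deduplicate(df: pd.DataFrame) -> pd.DataFrame:
    """Keep the earliest article for each content fingerprint."""
    out = df.copy()
    out["fingerprint"] = out["full_text"].map(content_fingerprint)
    # Sorting on (timestamp, id) makes the surviving row independent of input order.
    out = out.sort_values(["publication_datetime_utc", "article_id"])
    out = out.drop_duplicates(subset="fingerprint", keep="first")
    return out.reset_index(drop=True)


demo = pd.DataFrame({
    "article_id": ["a2", "a1", "a3"],
    "publication_datetime_utc": pd.to_datetime(
        ["2020-01-02", "2020-01-01", "2020-01-03"], utc=True
    ),
    "full_text": ["Die  Wirtschaft wächst.", "die wirtschaft wächst.", "Etwas anderes."],
})
deduped = deduplicate(demo)
print(deduped["article_id"].tolist())
```

Here `a1` and `a2` differ only in case and spacing, so they share a fingerprint and only the earlier one survives.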

#### **Task 5: `prepare_corpus_for_embedding`**
*   **Inputs:** The cleansed DataFrame from Task 4 (`clean_df`) and the original raw DataFrame (`raw_df`).
*   **Processes:**
    1.  Performs a final assertion of timestamp integrity.
    2.  Adds new temporal feature columns (`year_month`, `day_of_month`, and boolean flags for early-release windows).
    3.  Generates a comprehensive audit manifest comparing the pre- and post-cleansing corpus.
*   **Outputs:** A tuple containing the feature-enriched `pd.DataFrame` and the `audit_manifest` dictionary.
*   **Transformation:** The input DataFrame is transformed by adding new columns. The number of rows remains the same.
*   **Role in Research Pipeline:** This callable prepares the data for the subsequent aggregation steps. The creation of early-release window flags is a direct prerequisite for constructing the timelier NEOS variants mentioned on **Page 4**: "...compute timelier variants of NEOS using just the articles released during the first 7, 14, or 21 days of each month...".
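
The temporal features can be derived directly from the timestamp column. The following sketch assumes the day of month is taken from the UTC timestamp and that the flag columns are named `in_first_{k}_days`; both are illustrative assumptions, and a local-time conversion may be preferable in practice.

```python
import pandas as pd


def add_temporal_features(df: pd.DataFrame, windows=(7, 14, 21)) -> pd.DataFrame:
    """Return a copy of df with month, day and early-release flag columns."""
    out = df.copy()
    ts = out["publication_datetime_utc"]
    out["year_month"] = ts.dt.strftime("%Y-%m")
    out["day_of_month"] = ts.dt.day
    for k in windows:
        # True when the article appeared within the first k days of its month.
        out[f"in_first_{k}_days"] = out["day_of_month"] <= k
    return out


demo = pd.DataFrame({
    "article_id": ["a1", "a2", "a3"],
    "publication_datetime_utc": pd.to_datetime(
        ["2024-03-05 08:00", "2024-03-14 12:00", "2024-03-25 18:00"], utc=True
    ),
})
features = add_temporal_features(demo)
print(features.drop(columns="publication_datetime_utc"))
```

Rows are only annotated, never dropped, so the output has the same length as the input.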

#### **Task 6: `generate_document_embeddings`**
*   **Inputs:** The prepared DataFrame from Task 5, `fused_master_input_specification`.
*   **Processes:** This is the core feature engineering step.
    1.  Loads the `jina-embeddings-v3` model.
    2.  Iterates through the corpus in batches, tokenizing and computing token statistics.
    3.  Runs local inference to transform each article's text into a 1024-dimensional vector.
    4.  Saves the embeddings to an HDF5 file and creates a crosswalk table.
    5.  Performs quality validation and drift analysis on the generated embeddings.
*   **Outputs:** A tuple containing the DataFrame augmented with token stats, paths to the embeddings and crosswalk files, and a diagnostics dictionary.
*   **Transformation:** The `full_text` column (unstructured text) is transformed into a large numerical matrix of embeddings, stored on disk. The input DataFrame is transformed by adding token statistic columns.
*   **Role in Research Pipeline:** This callable implements **Step (2) "Get document embeddings" (Page 2)**. It is the direct implementation of the statement on **Page 3**: "We use the embedding model `jina-embeddings-v3`... to generate document embeddings, i.e., transforming the articles into a numerical representation (a vector of dimension 1024)."

#### **Task 7: `prepare_relevance_training_data`**
*   **Inputs:** The augmented DataFrame, paths to the embeddings and crosswalk, `fused_master_input_specification`.
*   **Processes:**
    1.  Creates weak binary labels (`y_econ`) from the `section_label` metadata.
    2.  Identifies a high-quality subset of data from outlets with reliable metadata coverage.
    3.  Performs a strict temporal train/validation split to prevent look-ahead bias.
    4.  Retrieves the corresponding embedding vectors from the HDF5 file for the train and validation sets, ensuring perfect alignment.
*   **Outputs:** A tuple of four objects: `X_train`, `y_train`, `X_val`, `y_val`.
*   **Transformation:** A subset of the corpus metadata is transformed into two pairs of aligned feature matrices and label vectors, ready for model training.
*   **Role in Research Pipeline:** This callable implements the data preparation for **Step (3) "Filter relevant articles" (Page 2)**. It operationalizes the weak supervision approach described on **Page 3**: "We use the embeddings of articles published in these sections to train a neural network to classify whether an article is about economics or not."
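
The weak labelling and the temporal split can be illustrated on an in-memory embedding matrix. In this sketch the `embedding_row` column stands in for the crosswalk, the cutoff date is arbitrary, and the outlet-quality filter from step 2 is omitted; all of these are simplifying assumptions.

```python
import numpy as np
import pandas as pd


def temporal_split(df, embeddings, positive_sections, cutoff):
    """Return X_train, y_train, X_val, y_val, split strictly by publication time."""
    # Only articles with a section label can receive a weak label.
    labelled = df[df["section_label"].notna()]
    y = labelled["section_label"].isin(positive_sections).astype(int).to_numpy()
    rows = labelled["embedding_row"].to_numpy()
    # Everything before the cutoff trains; everything after validates.
    is_train = (labelled["publication_datetime_utc"] < cutoff).to_numpy()
    X = embeddings[rows]
    return X[is_train], y[is_train], X[~is_train], y[~is_train]


demo = pd.DataFrame({
    "section_label": ["Wirtschaft", "Sport", None, "Wirtschaft"],
    "publication_datetime_utc": pd.to_datetime(
        ["2018-05-01", "2019-05-01", "2019-06-01", "2021-05-01"], utc=True
    ),
    "embedding_row": [0, 1, 2, 3],
})
embeddings = np.arange(8, dtype=float).reshape(4, 2)
cutoff = pd.Timestamp("2020-01-01", tz="UTC")
X_train, y_train, X_val, y_val = temporal_split(demo, embeddings, ["Wirtschaft"], cutoff)
print(X_train.shape, y_train, X_val.shape, y_val)
```

Because the split is by date rather than at random, no validation article precedes a training article, which is what guards against look-ahead bias.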

#### **Task 8: `train_relevance_classifier`**
*   **Inputs:** `X_train`, `y_train`, `X_val`, `y_val`, `fused_master_input_specification`.
*   **Processes:**
    1.  Builds the MLP neural network architecture as specified in the configuration.
    2.  Configures the training process with the Adam optimizer, binary cross-entropy loss, and balanced class weighting.
    3.  Sets up an early stopping callback based on the out-of-time validation set performance (`val_auc`).
    4.  Trains the model and persists the final, best-performing version to disk.
*   **Outputs:** A tuple containing the path to the saved model and a dictionary of training results.
*   **Transformation:** The training data is transformed into a trained, serialized model artifact.
*   **Role in Research Pipeline:** This callable implements the model training portion of **Step (3) "Filter relevant articles" (Page 2)**, resulting in the classifier that will be used to filter the entire corpus.

#### **Task 9: `filter_corpus_by_relevance`**
*   **Inputs:** The full augmented DataFrame, paths to the trained relevance model, embeddings, and crosswalk, `fused_master_input_specification`.
*   **Processes:**
    1.  Loads the trained relevance classifier.
    2.  Runs batch inference on the *entire* corpus of embeddings to generate a relevance probability for every article.
    3.  Applies the classification threshold (`τ=0.5`) to identify the subset of relevant articles (`R`).
    4.  Persists all scores for auditability and creates the final filtered DataFrame `df_relevant`.
*   **Outputs:** A tuple containing the `df_relevant` DataFrame, the path to the file of all scores, and an audit report.
*   **Transformation:** The full corpus is transformed into a smaller, filtered corpus containing only articles deemed relevant to economics.
*   **Role in Research Pipeline:** This callable completes **Step (3) "Filter relevant articles" (Page 2)**. It is the direct implementation of: "We then use this model to identify articles related to economics in the data. We keep the articles related to economics and discard the others..." **(Page 3)**.

#### **Task 10: `generate_synthetic_articles`**
*   **Inputs:** `fused_master_input_specification`, an output path.
*   **Processes:**
    1.  Configures a robust, resumable loop for interacting with the Claude 3.5 Sonnet API.
    2.  Systematically constructs prompts based on the templates in Appendix A.1 to generate a balanced set of 256 articles (128 positive, 128 negative) across several economic domains.
    3.  Validates the generated text for quality (word count, uniqueness).
*   **Outputs:** The path to the saved CSV file containing the synthetic corpus.
*   **Transformation:** Prompts are transformed into a structured, labeled dataset of synthetic text.
*   **Role in Research Pipeline:** This callable implements **Step (4) "Generate example articles" (Page 2)**. It is the direct implementation of the novel data generation strategy described on **Page 3**: "...we generate synthetic example articles conveying positive or negative economic outlook using the LLM Claude 3.5 Sonnet... We generate 256 articles...".

#### **Task 11: `process_synthetic_embeddings`**
*   **Inputs:** The path to the synthetic corpus, `fused_master_input_specification`.
*   **Processes:**
    1.  Embeds the 256 synthetic articles using the *same* `jina-embeddings-v3` model from Task 6.
    2.  Validates the numerical integrity and summary statistics of the synthetic embeddings.
    3.  Performs UMAP dimensionality reduction and generates the annotated scatter plot shown in Chart 2.
*   **Outputs:** A tuple containing the `(256, 1024)` NumPy array of synthetic embeddings (`Z`), the corresponding labels (`y`), and the path to the UMAP plot.
*   **Transformation:** The synthetic text is transformed into a numerical matrix `Z`.
*   **Role in Research Pipeline:** This callable prepares the data for and provides the visual evidence supporting **Step (5.a) "Logistic regression" (Page 2)**. The visualization directly replicates **Chart 2 (Page 4)** and validates the assumption that "...The embedded articles clearly separate into two distinct clusters... which indicates that the embeddings effectively capture differences in sentiment."

#### **Task 12: `train_sentiment_classifier`**
*   **Inputs:** The synthetic embeddings `Z`, labels `y`, `fused_master_input_specification`.
*   **Processes:**
    1.  Performs a stratified K-fold cross-validated grid search to find the optimal L2 regularization strength `C`.
    2.  Trains a final `LogisticRegression` model on the entire synthetic dataset using the optimal `C`.
    3.  Persists the trained scikit-learn model object to disk.
*   **Outputs:** A tuple containing the path to the saved model and a dictionary of CV results.
*   **Transformation:** The synthetic training data is transformed into a trained, serialized sentiment classifier.
*   **Role in Research Pipeline:** This callable implements **Step (5.a) "Logistic regression" (Page 2)**. It trains the model that minimizes the regularized negative log-likelihood:
    $$
    \mathcal{L}(\boldsymbol{\beta}) = -\sum_{i=1}^{n} \left[ y_i \log(\sigma(\mathbf{x}_i^T \boldsymbol{\beta})) + (1-y_i) \log(1-\sigma(\mathbf{x}_i^T \boldsymbol{\beta})) \right] + \lambda ||\boldsymbol{\beta}||_2^2
    $$
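    as described on **Page 4**. The cross-validation selects `C`, scikit-learn's inverse regularization strength; because scikit-learn minimizes $C \sum_i \ell_i + \tfrac{1}{2}||\boldsymbol{\beta}||_2^2$, this corresponds to $\lambda = 1/(2C)$ in the loss above.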
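
To make the objective concrete, the loss above can be evaluated directly in NumPy. This is a sketch for inspection only (the function name `regularised_nll` is hypothetical); the pipeline itself fits the model with scikit-learn rather than with this function.

```python
import numpy as np


def regularised_nll(beta, X, y, lam):
    """Negative log-likelihood of logistic regression plus an L2 penalty."""
    z = X @ beta
    # log(sigma(z)) = -log(1 + exp(-z)) and log(1 - sigma(z)) = -log(1 + exp(z)).
    data_term = np.sum(y * np.logaddexp(0.0, -z) + (1 - y) * np.logaddexp(0.0, z))
    return float(data_term + lam * beta @ beta)


# Four toy "embeddings" in two dimensions with balanced labels.
X = np.array([[1.0, 0.0], [0.0, 1.0], [-1.0, 0.0], [0.0, -1.0]])
y = np.array([1, 1, 0, 0])
loss_at_zero = regularised_nll(np.zeros(2), X, y, lam=0.1)
loss_at_ones = regularised_nll(np.ones(2), X, y, lam=0.1)
print(loss_at_zero, loss_at_ones)
```

With all coefficients at zero every prediction is 0.5, so the loss equals `n * log(2)`; moving towards a separating direction lowers the data term while the penalty grows with the squared norm of the coefficients.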

#### **Task 13: `score_relevant_articles`**
*   **Inputs:** The `df_relevant` DataFrame, paths to the sentiment model, embeddings, and crosswalk.
*   **Processes:**
    1.  Loads the trained sentiment classifier.
    2.  Runs batch inference on all embeddings corresponding to the relevant articles.
    3.  For each article `i`, it calculates the probability of a positive outlook, $p_i$.
    4.  Validates and persists the final DataFrame of scored articles.
*   **Outputs:** The path to the saved file of scored articles.
*   **Transformation:** The set of relevant articles is transformed by adding a sentiment score, $p_i$, to each one.
*   **Role in Research Pipeline:** This callable implements **Step (5.b) "Compute indicator" (Page 2)**. It is the direct implementation of the inference step described on **Page 4**: "We then apply the fitted logistic regression to the embeddings of the relevant news articles... This procedure results in a probability score between zero and one for each of the relevant articles." The probability score is calculated as:
    $$
    p_i = \sigma\big( \hat{w}^\top x_i + \hat{b} \big) = \frac{1}{1 + \exp\big(- (\hat{w}^\top x_i + \hat{b})\big)}
    $$
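
The scoring formula is a single matrix product followed by a sigmoid. The sketch below uses made-up coefficients and a numerically stable form of the sigmoid; the function name and the example values are assumptions for illustration.

```python
import numpy as np


def positive_outlook_probability(X, w, b):
    """Return p_i = sigmoid(w^T x_i + b) for every row x_i of X."""
    z = X @ w + b
    # exp(-log(1 + exp(-z))) equals the sigmoid and avoids overflow for large |z|.
    return np.exp(-np.logaddexp(0.0, -z))


w_hat = np.array([2.0, -1.0])   # illustrative fitted weights
b_hat = 0.5                     # illustrative fitted intercept
X_relevant = np.array([[0.0, 0.5], [1.0, 0.0], [-50.0, 0.0]])
p = positive_outlook_probability(X_relevant, w_hat, b_hat)
print(p)
```

The first row has a linear score of exactly zero and therefore a probability of 0.5; strongly negative scores map to probabilities close to zero.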

#### **Task 14: `construct_neos_indicators`**
*   **Inputs:** The path to the scored articles file, `fused_master_input_specification`.
*   **Processes:**
    1.  Aggregates the article-level scores (`p_i`) into a monthly time series by taking the simple arithmetic mean for each month.
    2.  Repeats this aggregation on filtered subsets of the data to create the early-release variants (first 7, 14, 21 days).
    3.  Optionally computes a daily, month-to-date cumulative average for diagnostic purposes.
*   **Outputs:** A tuple containing the `pd.DataFrame` of all monthly indicators and the optional `pd.DataFrame` of the daily indicator.
*   **Transformation:** The article-level data is transformed into aggregate monthly and daily time series.
*   **Role in Research Pipeline:** This callable completes **Step (5.b) "Compute indicator" (Page 2)**. It implements the aggregation described on **Page 4**: "...we compute our indicator by averaging all probability scores in each month." It computes the baseline indicator $\mathrm{NEOS}_m$ and the early-release variants $\mathrm{NEOS}_m^{(k)}$.
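
The monthly aggregation and its early-release variants reduce to grouped means. This sketch assumes the scored articles carry `year_month`, `day_of_month`, and a score column `p`; the output column names are hypothetical.

```python
import pandas as pd


def build_monthly_neos(scored: pd.DataFrame, windows=(7, 14, 21)) -> pd.DataFrame:
    """Average article scores per month, overall and for the first k days."""
    columns = [scored.groupby("year_month")["p"].mean().rename("neos")]
    for k in windows:
        early = scored[scored["day_of_month"] <= k]
        columns.append(early.groupby("year_month")["p"].mean().rename(f"neos_first_{k}d"))
    # Months without early articles show up as NaN in the corresponding variant.
    return pd.concat(columns, axis=1).sort_index()


scored = pd.DataFrame({
    "year_month": ["2024-01", "2024-01", "2024-01", "2024-02"],
    "day_of_month": [3, 10, 25, 20],
    "p": [0.2, 0.4, 0.9, 0.6],
})
neos = build_monthly_neos(scored)
print(neos)
```

February has no article in its first 14 days in this toy data, so the 7-day and 14-day variants are missing for that month while the 21-day variant is available.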

#### **Task 15: `prepare_forecasting_dataset`**
*   **Inputs:** All monthly indicator DataFrames and the raw quarterly macro DataFrame.
*   **Processes:**
    1.  Aligns all monthly indicators to a quarterly frequency using the specific information set policy (selecting month `m=2` for baseline indicators, `m=3` for early-release).
    2.  Merges these aligned predictors with the quarterly GDP data.
    3.  Creates the lagged dependent variable (`y_{t-1}`) required for the regression models.
*   **Outputs:** A single, wide-format quarterly DataFrame ready for econometric modeling.
*   **Transformation:** Multiple monthly and quarterly time series are transformed and merged into a single, aligned quarterly modeling dataset.
*   **Role in Research Pipeline:** This callable prepares the final dataset for the **pseudo-out-of-sample experiment (Page 5)**. It correctly constructs the variables $y_{t-1}$ and $x_t^{(m)}$ for the forecasting regression.
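
The alignment policy can be sketched as "pick the value from a given month of each quarter, then join on the quarter start". The helper names and the column names `y`, `y_lag1`, and `x` below are assumptions made for this illustration.

```python
import pandas as pd


def align_to_quarter(monthly: pd.Series, month_in_quarter: int) -> pd.Series:
    """Keep the value from month 1, 2 or 3 of each quarter, indexed by quarter start."""
    position = (monthly.index.month - 1) % 3 + 1
    picked = monthly[position == month_in_quarter].copy()
    picked.index = picked.index.to_period("Q").to_timestamp(how="start")
    return picked


def build_forecasting_frame(gdp_q, indicator_m, month_in_quarter=2):
    """Combine quarterly GDP growth, its first lag, and the aligned indicator."""
    frame = pd.DataFrame({"y": gdp_q})
    frame["y_lag1"] = frame["y"].shift(1)
    # Column assignment aligns on the quarter-start index.
    frame["x"] = align_to_quarter(indicator_m, month_in_quarter)
    return frame


months = pd.date_range("2024-01-01", periods=6, freq="MS")
indicator_m = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], index=months)
gdp_q = pd.Series([0.5, 0.7], index=pd.to_datetime(["2024-01-01", "2024-04-01"]))
frame = build_forecasting_frame(gdp_q, indicator_m)
print(frame)
```

With `month_in_quarter=2` the first quarter receives the February value and the second quarter the May value.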

#### **Task 16: `execute_poos_forecasts`**
*   **Inputs:** The final forecasting DataFrame, `fused_master_input_specification`, `evaluation_windows_df`.
*   **Processes:**
    1.  Iterates through each indicator and forecast horizon `h`.
    2.  Implements the expanding-window POOS loop: at each time `t`, it fits the regression models on all data up to `t` and predicts for `t+h`.
    3.  Correctly handles limited-availability indicators by restricting the evaluation period.
    4.  Stores the forecast errors for every model, horizon, and time point.
*   **Outputs:** A long-format `pd.DataFrame` containing all forecast errors.
*   **Transformation:** The quarterly time-series data is transformed into a comprehensive set of out-of-sample forecast errors.
*   **Role in Research Pipeline:** This callable is the core engine of the econometric evaluation. It executes the entire **pseudo-out-of-sample experiment (Page 5)** by repeatedly estimating the benchmark AR(1) model and the indicator-augmented model:
    $$
    y_{t+h} = \alpha + \beta y_{t-1} + \gamma x_t^{(m)} + \varepsilon_t \quad \quad (1)
    $$
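
A compact way to see the expanding-window logic is to run it on simulated data with ordinary least squares. The sketch below is not the repository's implementation: it assumes that, at origin `t`, only training targets observed strictly before `t` are used, and the function names and the `first_origin` parameter are hypothetical.

```python
import numpy as np


def ols_fit(X, y):
    """Least-squares coefficients with the intercept in position 0."""
    design = np.column_stack([np.ones(len(y)), X])
    coef, *_ = np.linalg.lstsq(design, y, rcond=None)
    return coef


def poos_errors(y, x, h, first_origin):
    """Expanding-window errors for the AR(1) and indicator-augmented models."""
    y, x = np.asarray(y, float), np.asarray(x, float)
    origins, e_ar, e_aug = [], [], []
    for t in range(first_origin, len(y) - h):
        # Training origins s whose target y[s + h] is observed before origin t.
        s = np.arange(1, t - h)
        ar = ols_fit(y[s - 1], y[s + h])
        aug = ols_fit(np.column_stack([y[s - 1], x[s]]), y[s + h])
        origins.append(t)
        e_ar.append(y[t + h] - (ar[0] + ar[1] * y[t - 1]))
        e_aug.append(y[t + h] - (aug[0] + aug[1] * y[t - 1] + aug[2] * x[t]))
    return np.array(origins), np.array(e_ar), np.array(e_aug)


# Simulated data that follow Equation (1) exactly with h = 0 and no noise.
rng = np.random.default_rng(0)
x = rng.normal(size=40)
y = np.zeros(40)
for t in range(1, 40):
    y[t] = 2.0 + 0.5 * y[t - 1] + 3.0 * x[t]

origins, e_ar, e_aug = poos_errors(y, x, h=0, first_origin=10)
print(np.sqrt(np.mean(e_ar ** 2)), np.sqrt(np.mean(e_aug ** 2)))
```

Because the simulated series is generated without noise, the augmented model forecasts it almost perfectly, while the AR(1) benchmark, which ignores the indicator, does not.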

#### **Task 17: `compute_rmse_ratios`**
*   **Inputs:** The `forecast_errors_df` from Task 16.
*   **Processes:**
    1.  For each indicator and horizon, it finds the common sample of forecast origins shared with the AR(1) benchmark.
    2.  It computes the RMSE for both the indicator model and the benchmark model on this identical sample.
    3.  It computes the ratio of the two RMSEs.
    4.  It assembles the results into a wide-format table.
*   **Outputs:** A `pd.DataFrame` of RMSE ratios, replicating the structure of Table 1.
*   **Transformation:** The long-format error DataFrame is transformed into a summary table of performance metrics.
*   **Role in Research Pipeline:** This callable computes the primary evaluation metric reported in **Table 1 (Page 6)**. It calculates the ratio:
    $$
    R_h = \frac{\mathrm{RMSE}_{\text{Equation (1)}}}{\mathrm{RMSE}_{\text{AR(1)}}}
    $$
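
The common-sample requirement matters because indicators with limited availability have fewer forecast origins than the benchmark. A minimal sketch, assuming a long-format error table with columns `model`, `horizon`, `origin`, and `error` (hypothetical names):

```python
import numpy as np
import pandas as pd


def rmse_ratio(errors: pd.DataFrame, indicator: str, horizon: int,
               benchmark: str = "AR1") -> float:
    """RMSE of the indicator model relative to the benchmark on shared origins."""
    subset = errors[errors["horizon"] == horizon]
    wide = subset.pivot(index="origin", columns="model", values="error")
    # Origins missing for either model are dropped before computing the RMSEs.
    common = wide[[indicator, benchmark]].dropna()
    rmse = np.sqrt((common ** 2).mean())
    return float(rmse[indicator] / rmse[benchmark])


errors = pd.DataFrame({
    "model": ["AR1"] * 4 + ["NEOS"] * 3,
    "horizon": [0] * 7,
    "origin": [1, 2, 3, 4, 2, 3, 4],
    "error": [10.0, 2.0, -2.0, 2.0, 1.0, -1.0, 1.0],
})
ratio = rmse_ratio(errors, "NEOS", horizon=0)
print(ratio)
```

The large benchmark error at origin 1 is excluded because the indicator model has no forecast there, so the ratio compares both models on origins 2 to 4 only. A ratio below one indicates that the indicator model improves on the benchmark.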

#### **Task 18: `perform_diebold_mariano_tests`**
*   **Inputs:** The `forecast_errors_df` and the `rmse_ratios_df`.
*   **Processes:**
    1.  For each indicator and horizon, it computes the loss differential series ($d_t$) on the common sample.
    2.  It estimates the HAC-robust variance of the mean of $d_t$ using the Newey-West estimator with a Bartlett kernel.
    3.  It computes the DM test statistic and its p-value.
    4.  It appends the p-values to the RMSE ratio table.
*   **Outputs:** The final evaluation table, augmented with DM p-values.
*   **Transformation:** The forecast errors are transformed into test statistics and p-values.
*   **Role in Research Pipeline:** This callable performs the statistical significance testing described on **Page 6**: "We assess statistical significance by applying a modified Diebold-Mariano test." It provides the p-values reported in parentheses in **Table 1 (Page 6)**.
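
The core of the test can be written in a few lines. The sketch below computes the standard Diebold-Mariano statistic under squared-error loss with a Newey-West (Bartlett) long-run variance and a normal p-value. It does not include any small-sample modification, and the lag choice `n_lags` is left to the caller; treat it as an illustration rather than the exact procedure used for Table 1.

```python
import math

import numpy as np


def diebold_mariano(e_model, e_bench, n_lags):
    """DM statistic and two-sided normal p-value under squared-error loss."""
    d = np.asarray(e_model, float) ** 2 - np.asarray(e_bench, float) ** 2
    n = len(d)
    u = d - d.mean()
    # Newey-West long-run variance with Bartlett weights 1 - k / (n_lags + 1).
    lrv = u @ u / n
    for k in range(1, n_lags + 1):
        gamma_k = u[k:] @ u[:-k] / n
        lrv += 2.0 * (1.0 - k / (n_lags + 1)) * gamma_k
    if lrv <= 0:
        # Degenerate case, e.g. identical error series.
        return float("nan"), float("nan")
    stat = d.mean() / math.sqrt(lrv / n)
    # For a standard normal, 2 * (1 - Phi(|s|)) equals erfc(|s| / sqrt(2)).
    p_value = math.erfc(abs(stat) / math.sqrt(2.0))
    return float(stat), float(p_value)


rng = np.random.default_rng(0)
e_bench = 2.0 * rng.normal(size=200)   # benchmark errors with larger variance
e_model = rng.normal(size=200)         # indicator-model errors
stat, p_value = diebold_mariano(e_model, e_bench, n_lags=1)
print(stat, p_value)
```

A negative statistic means the first error series has the lower average squared loss; swapping the two arguments flips the sign and leaves the p-value unchanged.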

#### **Task 19: `construct_lexicon_baseline_indicator`**
*   **Inputs:** The `df_relevant` DataFrame, path to the lexicon file.
*   **Processes:**
    1.  Loads a translated German sentiment lexicon.
    2.  Scores all relevant German-language articles based on the polarity of words in their text.
    3.  Aggregates the article-level scores into a monthly time series.
*   **Outputs:** A `pd.DataFrame` containing the monthly lexicon-based indicator.
*   **Transformation:** A subset of the corpus is transformed into a new monthly time series.
*   **Role in Research Pipeline:** This callable implements the construction of the **lexicon-based indicator (Appendix A.3, Page 11)**, which serves as a simpler, text-based baseline for comparison against the main NEOS indicator in the forecasting exercise.
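
A lexicon score can be as simple as counting polar words. The scoring rule below (positive minus negative matches, divided by the token count) is an assumption chosen for illustration; the exact rule in Appendix A.3 may differ, and the four-word lexicon is a toy example.

```python
import re


def lexicon_score(text: str, lexicon: dict) -> float:
    """Net share of positive over negative lexicon words in the text."""
    tokens = re.findall(r"\w+", text.lower())
    if not tokens:
        return 0.0
    positive = sum(lexicon.get(token) == "positive" for token in tokens)
    negative = sum(lexicon.get(token) == "negative" for token in tokens)
    return (positive - negative) / len(tokens)


toy_lexicon = {
    "gut": "positive",
    "wachstum": "positive",
    "schlecht": "negative",
    "krise": "negative",
}
score = lexicon_score("Wachstum gut, Krise vorbei", toy_lexicon)
print(score)
```

The example sentence contains two positive words and one negative word among four tokens, giving a score of 0.25. Article-level scores of this kind can then be averaged by month in the same way as the NEOS scores.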

#### **Task 20: `compute_correlation_benchmarks`**
*   **Inputs:** All monthly indicator series and the quarterly macro data.
*   **Processes:**
    1.  Transforms all monthly indicators to a quarterly frequency using simple three-month averaging.
    2.  Computes the Pearson correlation between each indicator (at lags 0-4) and both yoy and qoq GDP growth.
    3.  Assembles the results into two wide-format tables.
*   **Outputs:** Two `pd.DataFrame`s containing the correlation results.
*   **Transformation:** Time series are transformed into a table of correlation coefficients.
*   **Role in Research Pipeline:** This callable replicates the descriptive analysis presented in **Appendix A.2 "Correlations with GDP growth"** and generates the results shown in **Table 2 (Page 11)**.
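
The two steps, quarterly averaging and lagged correlation, can be sketched as follows. The helper names are hypothetical, and the simulated GDP series is constructed to follow the indicator with a one-quarter delay purely to make the output easy to read.

```python
import numpy as np
import pandas as pd


def to_quarterly_mean(monthly: pd.Series) -> pd.Series:
    """Average the three months of each quarter, indexed by quarter start."""
    return monthly.resample("QS").mean()


def lagged_correlations(indicator_q: pd.Series, gdp_q: pd.Series, max_lag: int = 4) -> pd.Series:
    """Pearson correlation of GDP growth with the indicator lagged 0..max_lag quarters."""
    return pd.Series(
        {lag: gdp_q.corr(indicator_q.shift(lag)) for lag in range(max_lag + 1)}
    )


months = pd.date_range("2015-01-01", periods=60, freq="MS")
rng = np.random.default_rng(1)
indicator_m = pd.Series(rng.normal(size=60), index=months)

indicator_q = to_quarterly_mean(indicator_m)
gdp_q = indicator_q.shift(1)  # by construction, GDP trails the indicator by one quarter
correlations = lagged_correlations(indicator_q, gdp_q)
print(correlations)
```

`Series.corr` ignores pairs with missing values, so the rows lost to shifting do not need to be dropped by hand.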

#### **Task 21: `generate_all_charts`**
*   **Inputs:** All necessary data artifacts (synthetic data, monthly/daily indicators, forecast errors, etc.).
*   **Processes:** This orchestrator calls dedicated helper functions to generate each of the four main figures from the paper:
    *   Chart 2: UMAP visualization.
    *   Chart 3: Time-series comparison of indicators and GDP.
    *   Chart 4: Daily month-to-date timeliness diagnostic.
    *   Chart 5: Cumulative squared error difference for crisis performance analysis.
*   **Outputs:** A dictionary of paths to the saved image files.
*   **Transformation:** Various data artifacts are transformed into static visualizations.
*   **Role in Research Pipeline:** This callable generates the key figures—**Chart 2 (Page 4), Chart 3 (Page 5), Chart 4 (Page 7), and Chart 5 (Page 12)**—that visually communicate the paper's main findings.

#### **Task 22: `run_neos_pipeline`**
*   **Inputs:** All raw data DataFrames, the master config, and an output directory.
*   **Processes:** This is the main orchestrator. It executes the entire sequence of tasks from 1 to 21 in a single, robust, and resumable workflow. It manages data flow, checkpointing, logging, and final artifact generation.
*   **Outputs:** A dictionary containing the run status and paths to all generated artifacts.
*   **Transformation:** It orchestrates the entire transformation from raw data to final results.
*   **Role in Research Pipeline:** This callable represents the entire end-to-end research pipeline itself.

#### **Task 23: `run_robustness_analyses`**
*   **Inputs:** All raw data DataFrames, the master config, and a base output directory.
*   **Processes:** This is a "meta-orchestrator." It systematically runs the main `run_neos_pipeline` multiple times, each time with a slightly modified configuration or input dataset, to test the sensitivity of the results to key methodological choices. It then aggregates the results from all runs.
*   **Outputs:** A summary `pd.DataFrame` comparing the key evaluation metrics across all sensitivity runs.
*   **Transformation:** It orchestrates multiple transformations and aggregates their final outputs.
*   **Role in Research Pipeline:** This callable implements the **robustness analyses**, which check that the main findings do not depend on individual modeling choices such as configuration parameters or the composition of the input data.

<br><br>

### **Usage Example**

The following is a complete example of how to execute the end-to-end NEOS pipeline. It demonstrates how to prepare all necessary inputs, from the configuration file to high-fidelity synthetic data, and how to launch the top-level orchestrator, `run_complete_neos_study`. The example is structured as a self-contained script and assumes that all previously defined functions are available in the execution environment (e.g., a single Jupyter notebook or a monolithic script).

### **Implementation-Grade Example: Executing the NEOS Pipeline**

This example is divided into three main sections:
1.  **Setup and Configuration:** Loading the master configuration from a YAML file and setting up the environment.
2.  **High-Fidelity Data Synthesis:** Programmatically generating realistic, schema-compliant synthetic data for all required DataFrame inputs. In a real-world scenario, this step would be replaced by loading data from actual databases or files.
3.  **Pipeline Execution:** Calling the top-level orchestrator with the prepared data and configuration to run the entire study.

#### **1. Setup and Configuration**

Before running the pipeline, we must prepare the environment. This involves creating the master `config.yaml` file, setting the necessary API key as an environment variable, and loading the configuration into our Python script.

##### **Step 1.1: Create the `config.yaml` file**

First, save the complete configuration provided previously into a file named `config.yaml` in the same directory as your script or notebook. This file is the single source of truth for all parameters governing the pipeline's execution.

##### **Step 1.2: Set the Environment Variable for the API Key**

For security, the Anthropic API key must not be hardcoded. It should be set as an environment variable. In a terminal or at the beginning of your notebook, execute:

```bash
# Replace 'your_api_key_here' with your actual Anthropic API key
export ANTHROPIC_API_KEY='your_api_key_here'
```

##### **Step 1.3: Load Configuration and Prepare Environment in Python**

Now, in the Python script, we will load the YAML file and define the output directory.

```python
import os
from typing import Any, Dict, List, Tuple

import numpy as np
import pandas as pd
import pytz  # needed for the timezone-aware timestamps in create_synthetic_news_data
import yaml
from faker import Faker

# --- Load Master Configuration ---
# This block reads the YAML file and loads it into a Python dictionary.
# This dictionary will be passed to the main orchestrator.
CONFIG_PATH = 'config.yaml'
with open(CONFIG_PATH, 'r') as f:
    fused_master_input_specification = yaml.safe_load(f)

# --- Define Output Directory ---
# All artifacts from the pipeline runs will be saved here.
ROOT_OUTPUT_DIRECTORY = './neos_study_output'

# --- Define Path for Dummy Lexicon ---
# In a real run, this would point to the translated Barbaglia et al. lexicon.
# Here, we'll create a dummy version.
LEXICON_PATH = './dummy_lexicon_de.csv'

# Initialize Faker and seed both random sources for reproducibility.
fake = Faker()
Faker.seed(42)
np.random.seed(42)  # the synthetic-data helpers also draw from NumPy's global RNG
```

#### **2. High-Fidelity Data Synthesis**

The following functions generate realistic, schema-compliant synthetic data for each of the five DataFrames required by the pipeline. This is a crucial step for testing and demonstration.

```python
def create_synthetic_news_data(num_articles: int, config: Dict[str, Any]) -> pd.DataFrame:
    """Generates a high-fidelity synthetic news corpus DataFrame."""
    data = []
    outlets = {
        "NZZ": "Neue Zürcher Zeitung",
        "TA": "Tages-Anzeiger",
        "LT": "Le Temps"
    }
    sections = config['master_config']['relevance_model_params']['positive_class_sections'] + ['Sport', 'Kultur', 'International']
    
    for _ in range(num_articles):
        pub_time = fake.date_time_between(start_date='-25y', end_date='now', tzinfo=pytz.utc)
        data.append({
            "article_id": f"ART_{fake.uuid4()}",
            "outlet_id": np.random.choice(list(outlets.keys())),
            # The probabilities must sum to 1 and need one entry per item in publication_types_vocab.
            "publication_type": np.random.choice(config['master_config']['data_curation_params']['publication_types_vocab'], p=[0.4, 0.5, 0.025, 0.025, 0.025, 0.025, 0.0]),
            "publication_datetime_utc": pub_time,
            "publication_timezone": "Europe/Zurich",
            "language": np.random.choice(['de', 'fr']),
            "section_label": np.random.choice(sections + [None]*len(sections)), # 50% chance of being null
            "headline": fake.sentence(nb_words=8),
            "full_text": fake.paragraph(nb_sentences=15),
            "ingestion_timestamp_utc": pub_time + pd.Timedelta(minutes=np.random.randint(1, 60)),
            "last_modified_timestamp_utc": pub_time + pd.Timedelta(minutes=np.random.randint(60, 120)),
        })
    
    df = pd.DataFrame(data)
    # Ensure correct dtypes as expected by the validation functions.
    df['source_outlet'] = df['outlet_id'].map(outlets)
    df['publication_type'] = df['publication_type'].astype('category')
    df = df.astype({
        'article_id': 'string', 'outlet_id': 'string', 'source_outlet': 'string',
        'language': 'string', 'section_label': 'string', 'headline': 'string', 'full_text': 'string'
    })
    return df

def create_synthetic_macro_data(start_date: str, end_date: str) -> pd.DataFrame:
    """Generates a high-fidelity synthetic quarterly macro DataFrame."""
    quarters = pd.date_range(start=start_date, end=end_date, freq='QS-JAN')
    data = {
        "yoy_gdp_growth_sports_adj": np.random.normal(1.5, 2.0, len(quarters)),
        "manufacturing_pmi_m2": np.random.normal(52, 5, len(quarters)),
        "service_pmi_m2": np.random.normal(55, 6, len(quarters)),
        "kof_biz_situation_m2": np.random.normal(0.5, 1.5, len(quarters)),
        "seco_consumer_sentiment_q": np.random.normal(-5, 10, len(quarters)),
    }
    df = pd.DataFrame(data, index=quarters)
    
    # Simulate historical unavailability as per the paper's footnotes.
    df.loc[df.index < '2014-01-01', 'service_pmi_m2'] = np.nan
    df.loc[df.index < '2009-04-01', 'kof_biz_situation_m2'] = np.nan
    return df

def create_synthetic_monthly_indicators(start_date: str, end_date: str) -> pd.DataFrame:
    """Generates a high-fidelity synthetic monthly indicators DataFrame."""
    months = pd.date_range(start=start_date, end=end_date, freq='MS')
    data = {
        "manufacturing_pmi": np.random.normal(52, 5, len(months)),
        "service_pmi": np.random.normal(55, 6, len(months)),
        "kof_biz_situation": np.random.normal(0.5, 1.5, len(months)),
    }
    df = pd.DataFrame(data, index=months)
    
    # Simulate historical unavailability.
    df.loc[df.index < '2014-01-01', 'service_pmi'] = np.nan
    df.loc[df.index < '2009-04-01', 'kof_biz_situation'] = np.nan
    return df

def create_synthetic_metadata_tables() -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Generates the small, static metadata tables."""
    # evaluation_windows_df
    eval_data = [
        # pd.Timestamp no longer accepts a `freq` argument (removed in pandas 2.0).
        {'series_name': 'service_pmi', 'start_quarter': pd.Timestamp('2014-01-01'), 'end_quarter': pd.NaT},
        {'series_name': 'kof_biz_situation', 'start_quarter': pd.Timestamp('2009-04-01'), 'end_quarter': pd.NaT}
    ]
    evaluation_windows_df = pd.DataFrame(eval_data)

    # release_calendar_df (a small sample)
    cal_data = [
        {'series_name': 'manufacturing_pmi', 'period': pd.Timestamp('2024-01-01'), 'release_datetime_utc': pd.Timestamp('2024-01-31 08:00:00', tz='UTC'), 'notes': ''},
        {'series_name': 'seco_consumer_sentiment', 'period': pd.Timestamp('2024-01-01'), 'release_datetime_utc': pd.Timestamp('2024-02-15 09:00:00', tz='UTC'), 'notes': ''}
    ]
    release_calendar_df = pd.DataFrame(cal_data)
    
    return release_calendar_df, evaluation_windows_df

def create_dummy_lexicon(path: str):
    """Creates a small dummy lexicon file."""
    lex_data = [
        {'word_de': 'gut', 'sentiment': 'positive'},
        {'word_de': 'wachstum', 'sentiment': 'positive'},
        {'word_de': 'schlecht', 'sentiment': 'negative'},
        {'word_de': 'krise', 'sentiment': 'negative'},
    ]
    pd.DataFrame(lex_data).to_csv(path, index=False)

# --- Generate all synthetic data artifacts ---
print("--- Generating high-fidelity synthetic data for demonstration ---")
# A smaller number of articles for a runnable example. A full run would use millions.
raw_news_data_df = create_synthetic_news_data(1000, fused_master_input_specification)
raw_macro_data_df = create_synthetic_macro_data('1999-01-01', '2025-05-31')
monthly_indicator_data_df = create_synthetic_monthly_indicators('1999-01-01', '2025-05-31')
release_calendar_df, evaluation_windows_df = create_synthetic_metadata_tables()
create_dummy_lexicon(LEXICON_PATH)
print("Synthetic data generation complete.")
```

#### **3. Pipeline Execution**

With all inputs prepared (the configuration dictionary and the five DataFrames), we can now call the top-level orchestrator. This single function call will execute the entire study, including the baseline run and all robustness checks, leveraging the checkpointing system for efficiency.

```python
# --- Execute the Full Study ---
# This is the main entry point that runs the entire analysis.
# It will create a 'baseline_run' directory for the main results and a
# 'robustness_checks' directory for all the sensitivity analyses.
# The function will print detailed logs to the console and save them to a file.

if __name__ == '__main__':
    # This check is common in scripts to ensure the main execution block
    # only runs when the script is executed directly.
    
    final_results = run_complete_neos_study(
        # Pass all the prepared data and configuration.
        raw_news_data_df=raw_news_data_df,
        raw_macro_data_df=raw_macro_data_df,
        monthly_indicator_data_df=monthly_indicator_data_df,
        release_calendar_df=release_calendar_df,
        evaluation_windows_df=evaluation_windows_df,
        fused_master_input_specification=fused_master_input_specification,
        root_output_directory=ROOT_OUTPUT_DIRECTORY,
        lexicon_path=LEXICON_PATH
    )

    # --- Review Outputs ---
    # After the (potentially very long) run, you can inspect the results.
    print("\n" + "="*80)
    print(">>> TOP-LEVEL ORCHESTRATION COMPLETE <<<")
    print("="*80)
    
    # Print the status of the baseline run.
    baseline_status = final_results.get('baseline_run_results', {}).get('status', 'Unknown')
    print(f"\nBaseline Run Status: {baseline_status}")
    
    # Display the head of the robustness analysis summary.
    robustness_summary = final_results.get('robustness_summary_df')
    if robustness_summary is not None and not robustness_summary.empty:
        print("\n--- Head of Robustness Analysis Summary ---")
        print(robustness_summary.head(10))
    else:
        print("\nRobustness analysis did not produce a summary table.")
        
    print(f"\nAll artifacts, logs, and results are saved in: '{ROOT_OUTPUT_DIRECTORY}'")
```
<br>

# Task 1 — Validate the raw news corpus schema and data quality

# ==============================================================================
# Task 1: Validate the raw news corpus schema and data quality
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 1, Step 1:  Assert schema compliance and dtype correctness
# ------------------------------------------------------------------------------

def _validate_schema_and_dtypes(
    raw_news_data_df: pd.DataFrame
) -> List[str]:
    """
    Validates the schema, dtypes, and primary key integrity of the raw news DataFrame.

    This function performs critical low-level checks to ensure the input DataFrame
    conforms to the required structure for all downstream processing. It verifies:
    1. Presence of all required columns.
    2. Correctness of data types for each column, with special attention to
       timezone-aware UTC datetimes.
    3. Uniqueness of the 'article_id' primary key.

    Args:
        raw_news_data_df: The raw news corpus DataFrame to be validated.

    Returns:
        A list of string messages describing non-critical validation warnings.
        An empty list indicates this step passed without warnings.

    Raises:
        ValueError: If a critical validation check fails (e.g., missing
                    columns, incorrect dtypes, non-UTC timestamps, or
                    duplicate article_ids), which would prevent further
                    processing.
    """
    # Initialize a list to collect non-critical warnings.
    warnings = []

    # Define the target schema with required columns and their expected dtypes.
    # Using pandas' string dtype for text columns is best practice.
    expected_schema = {
        'article_id': 'string',
        'outlet_id': 'string', # May be stored as string or numeric; checked by a dedicated branch below
        'source_outlet': 'string',
        'publication_type': 'category',
        'publication_datetime_utc': 'datetime64[ns, UTC]',
        'language': 'string',
        'section_label': 'string',
        'full_text': 'string',
        'ingestion_timestamp_utc': 'datetime64[ns, UTC]',
        'last_modified_timestamp_utc': 'datetime64[ns, UTC]',
    }

    # --- Schema Completeness Check ---
    # Check for any missing columns from the expected schema.
    missing_columns = set(expected_schema.keys()) - set(raw_news_data_df.columns)
    # If any required columns are missing, this is a critical failure.
    if missing_columns:
        # Raise a ValueError with a descriptive message.
        raise ValueError(
            "Critical schema validation failed: Missing required columns: "
            f"{sorted(list(missing_columns))}"
        )

    # --- Dtype and Timezone Correctness Check ---
    # Iterate through the expected schema to validate each column's dtype.
    for col, expected_dtype in expected_schema.items():
        # Get the actual dtype of the column from the DataFrame.
        actual_dtype = raw_news_data_df[col].dtype

        # Handle the special case for datetime columns.
        if 'datetime' in expected_dtype:
            # Check if the column is a datetime type.
            if not pd.api.types.is_datetime64_any_dtype(actual_dtype):
                # If not, this is a critical failure.
                raise ValueError(
                    f"Critical dtype validation failed for column '{col}': "
                    f"Expected a datetime dtype, but found '{actual_dtype}'."
                )
            # Check if the datetime column is timezone-aware.
            if raw_news_data_df[col].dt.tz is None:
                # Naive datetimes are a critical failure as they are ambiguous.
                raise ValueError(
                    f"Critical timezone validation failed for column '{col}': "
                    "Column is timezone-naive. All datetimes must be "
                    "timezone-aware and in UTC."
                )
            # Check if the timezone is specifically UTC.
            if str(raw_news_data_df[col].dt.tz) != 'UTC':
                # Non-UTC timezones are a critical failure.
                raise ValueError(
                    f"Critical timezone validation failed for column '{col}': "
                    f"Expected timezone 'UTC', but found "
                    f"'{raw_news_data_df[col].dt.tz}'."
                )
        # Handle the special case for outlet_id, which can be int or string.
        elif col == 'outlet_id':
            # Check if the dtype is numeric or string-like.
            if not (pd.api.types.is_numeric_dtype(actual_dtype) or \
                    pd.api.types.is_string_dtype(actual_dtype)):
                raise ValueError(
                    f"Critical dtype validation failed for column '{col}': "
                    f"Expected numeric or string dtype, but found '{actual_dtype}'."
                )
        # For all other columns, perform a direct dtype comparison.
        elif str(actual_dtype) != expected_dtype:
            # A dtype mismatch is a critical failure.
            raise ValueError(
                f"Critical dtype validation failed for column '{col}': "
                f"Expected dtype '{expected_dtype}', but found '{actual_dtype}'."
            )

    # --- Primary Key Uniqueness Check ---
    # Check for duplicate values in the 'article_id' column.
    if raw_news_data_df['article_id'].duplicated().any():
        # Duplicate primary keys are a critical data integrity failure.
        raise ValueError(
            "Critical primary key validation failed: "
            "Column 'article_id' contains duplicate values."
        )

    # If all checks pass, return the list of any non-critical warnings.
    return warnings
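
A minimal sketch of how a raw extract might be coerced to the dtypes that `_validate_schema_and_dtypes` expects before validation. The helper name `coerce_news_dtypes` and the sample rows are hypothetical and not part of the original pipeline; only the column names and target dtypes are taken from `expected_schema` above.

```python
import pandas as pd

STRING_COLS = ['article_id', 'outlet_id', 'source_outlet',
               'language', 'section_label', 'full_text']
DATETIME_COLS = ['publication_datetime_utc', 'ingestion_timestamp_utc',
                 'last_modified_timestamp_utc']


def coerce_news_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: cast columns to the dtypes listed in `expected_schema`."""
    out = df.copy()
    for col in STRING_COLS:
        # The nullable pandas string dtype reports itself as 'string'.
        out[col] = out[col].astype('string')
    out['publication_type'] = out['publication_type'].astype('category')
    for col in DATETIME_COLS:
        # utc=True converts aware values to UTC and treats naive values as UTC.
        out[col] = pd.to_datetime(out[col], utc=True)
    return out


# Illustrative rows only; the values are invented for this sketch.
sample = pd.DataFrame({
    'article_id': ['a1', 'a2'],
    'outlet_id': [101, 102],
    'source_outlet': ['Outlet A', 'Outlet B'],
    'publication_type': ['print', 'online'],
    'publication_datetime_utc': ['2024-01-15 07:30:00', '2024-01-16 09:00:00'],
    'language': ['de', 'de'],
    'section_label': ['Wirtschaft', None],
    'full_text': ['Beispieltext eins.', 'Beispieltext zwei.'],
    'ingestion_timestamp_utc': ['2024-01-15 08:00:00', '2024-01-16 09:30:00'],
    'last_modified_timestamp_utc': ['2024-01-15 08:00:00', '2024-01-16 09:30:00'],
})
typed = coerce_news_dtypes(sample)
print(typed.dtypes)
```

Casting up front keeps the validator strict: it can then reject anything that is still mistyped instead of guessing at a conversion.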

# ------------------------------------------------------------------------------
# Task 1, Step 2: Validate controlled vocabularies and cross-field consistency
# ------------------------------------------------------------------------------

def _validate_vocabularies_and_consistency(
    raw_news_data_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates controlled vocabularies and cross-field consistency rules.

    Purpose:
    This function performs critical data quality checks on categorical and
    identifier columns. It ensures that values in key fields adhere to a
    predefined set of allowed options (controlled vocabulary) and that logical
    relationships between fields (like a one-to-one mapping) are maintained.
    This prevents unexpected or invalid data from propagating into the modeling
    pipeline. All vocabulary checks are performed in a case-insensitive manner
    for robustness.

    Inputs:
        raw_news_data_df (pd.DataFrame): The raw news corpus DataFrame.
        config (Dict[str, Any]): The 'data_curation_params' sub-dictionary from
                                 the master configuration, containing the
                                 validation rules and vocabularies.

    Processes:
    1.  **Publication Type Validation:**
        a. Retrieves the allowed `publication_types_vocab` from the config.
        b. Finds all unique, non-null values in the 'publication_type' column.
        c. Performs a case-insensitive comparison to identify any values not in
           the allowed vocabulary.
    2.  **Language Validation:**
        a. Retrieves the allowed `included_languages` from the config.
        b. Performs a case-insensitive check to ensure all languages in the
           'language' column are in the allowed set.
    3.  **Cross-Field Consistency:**
        a. Verifies that each 'outlet_id' maps to exactly one 'source_outlet'
           name by grouping by 'outlet_id' and counting the number of unique
           'source_outlet' names. The reverse direction (one name shared by
           several IDs) is not checked.

    Outputs:
        (List[str]): A list of string messages describing all validation warnings
                     found. An empty list indicates that all checks in this
                     function passed successfully.
    """
    # --- Input Validation ---
    # Ensure the input is a pandas DataFrame.
    if not isinstance(raw_news_data_df, pd.DataFrame):
        raise TypeError("Input `raw_news_data_df` must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input `config` must be a dictionary.")

    # Initialize a list to collect all validation warnings.
    warnings: List[str] = []

    # --- 1. Controlled Vocabulary Validation for 'publication_type' (Case-Insensitive) ---
    # Retrieve the allowed vocabulary from the config and convert to a lowercase set for efficient lookup.
    publication_vocab_lower = {
        v.lower() for v in config.get('publication_types_vocab', [])
    }
    # Get the unique, non-null, lowercase values present in the DataFrame column.
    actual_publication_types_lower = set(
        raw_news_data_df['publication_type'].dropna().str.lower().unique()
    )
    # Identify any values that are not in the allowed vocabulary using set difference.
    invalid_publication_types = actual_publication_types_lower - publication_vocab_lower
    # If any invalid values are found, create a detailed warning message.
    if invalid_publication_types:
        warnings.append(
            "Vocabulary Warning for 'publication_type': Found values not in "
            f"the controlled vocabulary: {sorted(list(invalid_publication_types))}"
        )

    # --- 2. Controlled Vocabulary Validation for 'language' (Case-Insensitive) ---
    # Retrieve the allowed languages and convert to a lowercase set.
    language_vocab_lower = {
        lang.lower() for lang in config.get('included_languages', [])
    }
    # Get the unique, non-null, lowercase languages present in the DataFrame.
    actual_languages_lower = set(
        raw_news_data_df['language'].dropna().str.lower().unique()
    )
    # Identify any languages that are not in the allowed set.
    invalid_languages = actual_languages_lower - language_vocab_lower
    # If invalid languages are found, create a detailed warning message.
    if invalid_languages:
        warnings.append(
            "Vocabulary Warning for 'language': Found values not in the "
            f"allowed set {sorted(list(language_vocab_lower))}: "
            f"{sorted(list(invalid_languages))}"
        )

    # --- 3. Cross-Field Consistency: One-to-One Mapping Validation ---
    # This check ensures data integrity for outlet identifiers.
    # Group by the canonical 'outlet_id' and count the number of unique human-readable 'source_outlet' names.
    outlet_mapping_counts = raw_news_data_df.groupby('outlet_id')['source_outlet'].nunique()
    # Filter for any 'outlet_id's that map to more than one 'source_outlet', indicating an inconsistency.
    mapping_violations = outlet_mapping_counts[outlet_mapping_counts > 1]
    # If any violations are found, generate a specific warning for each.
    if not mapping_violations.empty:
        # Iterate through each inconsistent 'outlet_id' to create a detailed report.
        for outlet_id in mapping_violations.index:
            # Retrieve all the conflicting names associated with this ID.
            # Drop nulls first: sorting a mix of strings and missing values raises a TypeError.
            conflicting_names = raw_news_data_df.loc[
                raw_news_data_df['outlet_id'] == outlet_id, 'source_outlet'
            ].dropna().unique()
            # Create a detailed, actionable warning message.
            warnings.append(
                "Consistency Warning: 'outlet_id' to 'source_outlet' mapping is "
                f"not one-to-one. ID '{outlet_id}' maps to multiple names: "
                f"{sorted(list(conflicting_names))}. Manual curation is required."
            )

    # Return the aggregated list of all warnings found during the validation.
    return warnings
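
The consistency check above groups by `outlet_id` only, so it detects an ID with several names but not a name shared by several IDs. A minimal sketch of a two-directional check follows. The helper `find_outlet_mapping_violations` and the toy rows are hypothetical illustrations, not part of the original code.

```python
import pandas as pd


def find_outlet_mapping_violations(df: pd.DataFrame) -> dict:
    """Hypothetical helper: report both kinds of one-to-one mapping violations."""
    # Work on the distinct, fully populated (id, name) pairs.
    pairs = df[['outlet_id', 'source_outlet']].dropna().drop_duplicates()
    names_per_id = pairs.groupby('outlet_id')['source_outlet'].nunique()
    ids_per_name = pairs.groupby('source_outlet')['outlet_id'].nunique()
    return {
        'ids_with_multiple_names': sorted(names_per_id[names_per_id > 1].index),
        'names_with_multiple_ids': sorted(ids_per_name[ids_per_name > 1].index),
    }


# Toy data: ID '1' has two spellings, and 'Outlet B' appears under two IDs.
articles = pd.DataFrame({
    'outlet_id': ['1', '1', '2', '3'],
    'source_outlet': ['Outlet A', 'Outlet A (online)', 'Outlet B', 'Outlet B'],
})
violations = find_outlet_mapping_violations(articles)
print(violations)
```

Whether a shared name is an error depends on the corpus (for example, print and online editions may legitimately carry separate IDs), which is why this sketch reports the two directions separately.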

# ------------------------------------------------------------------------------
# Task 1, Step 3: Quantify missingness and temporal integrity
# ------------------------------------------------------------------------------

def _validate_missingness_and_temporal_integrity(
    raw_news_data_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Quantifies missing data and validates temporal integrity rules.

    This function performs three checks:
    1. Quantifies the share (a ratio between 0 and 1) of non-null
       'section_label' values, grouped by outlet and month.
    2. Asserts that 'publication_datetime_utc' is not later than
       'ingestion_timestamp_utc'.
    3. Verifies that all publication datetimes fall within the study's
       configured date range.

    Args:
        raw_news_data_df: The raw news corpus DataFrame.
        config: The master configuration dictionary.

    Returns:
        A tuple containing:
        - A DataFrame with the monthly ratio (0 to 1) of non-null 'section_label'
          values per outlet.
        - A list of string messages describing temporal integrity warnings.
    """
    # Initialize a list to collect temporal integrity warnings.
    warnings = []

    # Extract the relevant curation parameters from the config.
    curation_params = config['data_curation_params']

    # --- Missingness Quantification for 'section_label' ---
    # Group by outlet and month, then take the mean of the non-null indicator.
    # This yields the fraction (0 to 1) of articles that carry a section label.
    missingness_report = raw_news_data_df.groupby(
        ['outlet_id', pd.Grouper(key='publication_datetime_utc', freq='MS')]
    )['section_label'].apply(lambda s: s.notna().mean()).reset_index()
    # Rename the column for clarity.
    missingness_report.rename(
        columns={'section_label': 'section_label_completeness_ratio'},
        inplace=True
    )

    # --- Temporal Integrity Check 1: Publication vs. Ingestion Time ---
    # Find all records where the publication time is after the ingestion time.
    future_dated_mask = (
        raw_news_data_df['publication_datetime_utc'] >
        raw_news_data_df['ingestion_timestamp_utc']
    )
    # If any such records exist, report them.
    if future_dated_mask.any():
        num_violations = future_dated_mask.sum()
        # Create a warning message summarizing the issue.
        warnings.append(
            f"Temporal Integrity Warning: Found {num_violations} articles where "
            "'publication_datetime_utc' is after 'ingestion_timestamp_utc'."
        )

    # --- Temporal Integrity Check 2: Adherence to Study Date Range ---
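    # Convert the start and end date strings from the config to timestamps.
    start_date = pd.to_datetime(curation_params['start_date'], utc=True)
    end_date = pd.to_datetime(curation_params['end_date'], utc=True)
    # A date-only 'end_date' parses to midnight UTC, so compare against the start
    # of the following day to keep articles published during the final day.
    end_exclusive = end_date.normalize() + pd.Timedelta(days=1)
    # Find records with publication dates outside the allowed range.
    out_of_range_mask = ~raw_news_data_df['publication_datetime_utc'].between(
        start_date, end_exclusive, inclusive='left'
    )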
    # If any out-of-range records exist, report them.
    if out_of_range_mask.any():
        num_violations = out_of_range_mask.sum()
        # Create a warning message summarizing the issue.
        warnings.append(
            f"Temporal Integrity Warning: Found {num_violations} articles with "
            "'publication_datetime_utc' outside the configured study range "
            f"of [{start_date.date()}, {end_date.date()}]."
        )

    # Return the missingness report and the list of warnings.
    return missingness_report, warnings
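
To make the completeness ratio concrete, here is a small worked example that uses the same `groupby` and `pd.Grouper` pattern as the function above. The four toy articles are invented for illustration.

```python
import pandas as pd

# Outlet A: two January articles (one labelled) and one labelled February article.
# Outlet B: a single unlabelled January article.
toy = pd.DataFrame({
    'outlet_id': ['A', 'A', 'A', 'B'],
    'publication_datetime_utc': pd.to_datetime(
        ['2024-01-05', '2024-01-20', '2024-02-03', '2024-01-10'], utc=True
    ),
    'section_label': pd.array(
        ['Wirtschaft', None, 'Wirtschaft', None], dtype='string'
    ),
})

report = (
    toy.groupby(
        ['outlet_id', pd.Grouper(key='publication_datetime_utc', freq='MS')]
    )['section_label']
    .apply(lambda s: s.notna().mean())
    .reset_index(name='section_label_completeness_ratio')
)
print(report)
```

Outlet A comes out at 0.5 for January and 1.0 for February, and outlet B at 0.0 for January, so a low ratio points directly at the outlet-month combinations where section-based filtering would be unreliable.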

# ------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# ------------------------------------------------------------------------------

def validate_raw_news_corpus(
    raw_news_data_df: pd.DataFrame,
    fused_master_input_specification: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the validation of the raw news corpus DataFrame.

    This function serves as a high-level entry point to execute all validation
    steps for the raw news data. It checks schema, dtypes, vocabularies,
    consistency, missingness, and temporal integrity. It aggregates all
    findings into a structured report.

    Args:
        raw_news_data_df: The raw news corpus as a pandas DataFrame.
        fused_master_input_specification: The master configuration dictionary
            containing all study parameters and validation rules.

    Returns:
        A dictionary containing the validation results:
        - 'status': 'Success' or 'Failure'.
        - 'warnings': A list of all non-critical warning messages.
        - 'errors': A list of critical error messages that halted execution.
        - 'missingness_report': A DataFrame detailing 'section_label'
          completeness.
    """
    # Initialize the results dictionary.
    validation_results = {
        "status": "Success",
        "warnings": [],
        "errors": [],
        "missingness_report": pd.DataFrame()
    }

    try:
        # --- Step 1: Validate Schema and Dtypes ---
        # Execute the schema and dtype validation.
        schema_warnings = _validate_schema_and_dtypes(raw_news_data_df)
        # Append any non-critical warnings to the main list.
        validation_results["warnings"].extend(schema_warnings)

        # --- Step 2: Validate Vocabularies and Consistency ---
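        # Execute the vocabulary and consistency checks. This step expects the
        # 'data_curation_params' sub-dictionary rather than the full master config.
        vocab_warnings = _validate_vocabularies_and_consistency(
            raw_news_data_df,
            fused_master_input_specification['master_config']['data_curation_params']
        )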
        # Append any warnings found.
        validation_results["warnings"].extend(vocab_warnings)

        # --- Step 3: Validate Missingness and Temporal Integrity ---
        # Execute the missingness and temporal integrity checks.
        missingness_df, temporal_warnings = _validate_missingness_and_temporal_integrity(
            raw_news_data_df, fused_master_input_specification['master_config']
        )
        # Store the missingness report.
        validation_results["missingness_report"] = missingness_df
        # Append any warnings found.
        validation_results["warnings"].extend(temporal_warnings)

    # Catch any critical ValueError raised during validation.
    except ValueError as e:
        # If a critical error occurs, update the status and log the error.
        validation_results["status"] = "Failure"
        validation_results["errors"].append(str(e))

    # Return the comprehensive validation report.
    return validation_results
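
A minimal sketch of how a caller might consume the report returned by `validate_raw_news_corpus`. The helper `summarize_validation_report` and the hand-built example report are hypothetical; only the dictionary keys follow the docstring above. Note that the orchestrator catches `ValueError` only, so a `TypeError` or `KeyError` caused by a malformed configuration still propagates to the caller.

```python
from typing import Any, Dict

import pandas as pd


def summarize_validation_report(report: Dict[str, Any]) -> str:
    """Hypothetical helper: render the validation report as a short text summary."""
    lines = [f"Status: {report['status']}"]
    for key in ('errors', 'warnings'):
        messages = report.get(key, [])
        lines.append(f"{key.capitalize()}: {len(messages)}")
        lines.extend(f"  - {msg}" for msg in messages)
    return "\n".join(lines)


# Hand-built report with the documented keys, for illustration only.
example_report = {
    'status': 'Failure',
    'warnings': [],
    'errors': [
        "Critical primary key validation failed: "
        "Column 'article_id' contains duplicate values."
    ],
    'missingness_report': pd.DataFrame(),
}
summary = summarize_validation_report(example_report)
print(summary)
```

In a pipeline, a caller would typically stop when `status` is `'Failure'` and log the warnings otherwise.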


# Task 2 — Validate macro and indicator dataframes

# ==============================================================================
# Task 2: Validate macro and indicator dataframes
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 2, Step 1: Validate raw_macro_data_df structure and quarterly alignment
# ------------------------------------------------------------------------------

def _validate_quarterly_macro_data(
    raw_macro_data_df: pd.DataFrame,
    monthly_indicator_data_df: pd.DataFrame
) -> List[str]:
    """
    Validates the structure, dtypes, and alignment of the quarterly macro DataFrame.

    This function performs rigorous checks on `raw_macro_data_df`:
    1. Asserts the index is a DatetimeIndex with quarter-start frequency ('QS-JAN').
    2. Verifies the presence and float64 dtype of all required columns.
    3. Cross-validates a sample of month-2 indicators (e.g., 'manufacturing_pmi_m2')
       against the authoritative monthly data in `monthly_indicator_data_df` to
       ensure the temporal alignment logic is correct.

    Args:
        raw_macro_data_df: The quarterly DataFrame of macro and indicator data.
        monthly_indicator_data_df: The monthly DataFrame serving as ground truth.

    Returns:
        A list of string messages describing validation warnings or errors. An
        empty list signifies a successful validation.
    """
    # Initialize a list to collect all validation issues.
    issues = []

    # --- Index and Schema Validation ---
    # Verify that the index is a DatetimeIndex.
    if not isinstance(raw_macro_data_df.index, pd.DatetimeIndex):
        issues.append(
            "Critical Error: `raw_macro_data_df` index is not a DatetimeIndex."
        )
        return issues # Halt further checks as they depend on a valid index.

    # Verify the index frequency is Quarter Start ('QS-JAN').
    if raw_macro_data_df.index.freqstr != 'QS-JAN':
        issues.append(
            "Critical Error: `raw_macro_data_df` index frequency is not "
            f"'QS-JAN'. Found '{raw_macro_data_df.index.freqstr}'."
        )

    # Define the expected schema for the quarterly data.
    expected_schema = {
        'yoy_gdp_growth_sports_adj': 'float64',
        'manufacturing_pmi_m2': 'float64',
        'service_pmi_m2': 'float64',
        'kof_biz_situation_m2': 'float64',
        'seco_consumer_sentiment_q': 'float64',
    }

    # Check for missing columns.
    missing_cols = set(expected_schema.keys()) - set(raw_macro_data_df.columns)
    if missing_cols:
        issues.append(f"Critical Error: Missing columns in `raw_macro_data_df`: {missing_cols}.")

    # Check dtypes for existing columns.
    for col, dtype in expected_schema.items():
        if col in raw_macro_data_df.columns and raw_macro_data_df[col].dtype != dtype:
            issues.append(
                f"Critical Error: Column '{col}' has incorrect dtype. "
                f"Expected '{dtype}', found '{raw_macro_data_df[col].dtype}'."
            )

    # If critical schema errors were found, return now.
    if issues:
        return issues

    # --- Cross-Validation of m=2 Alignment ---
    # Define the mapping from quarterly m=2 columns to their monthly source columns.
    m2_mapping = {
        'manufacturing_pmi_m2': 'manufacturing_pmi',
        'service_pmi_m2': 'service_pmi',
        'kof_biz_situation_m2': 'kof_biz_situation',
    }

    # Select a deterministic, reasonably sized sample of quarters to check.
    # Use min to handle cases with fewer than 10 quarters.
    sample_size = min(10, len(raw_macro_data_df))
    if sample_size > 0:
        sample_quarters = raw_macro_data_df.sample(n=sample_size, random_state=42).index

        # Iterate through the sampled quarters to perform the check.
        for quarter_start in sample_quarters:
            # The second month of a quarter starting at `t` is at `t + 1 month`.
            second_month_ts = quarter_start + pd.DateOffset(months=1)

            # Check each m=2 indicator.
            for q_col, m_col in m2_mapping.items():
                # Get the value from the quarterly DataFrame.
                quarterly_value = raw_macro_data_df.loc[quarter_start, q_col]

                # Get the expected value from the monthly DataFrame.
                try:
                    monthly_value = monthly_indicator_data_df.loc[second_month_ts, m_col]
                except KeyError:
                    # If the timestamp doesn't exist in the monthly data, it's an issue.
                    issues.append(
                        f"Alignment Error: For quarter '{quarter_start.date()}', the "
                        f"corresponding month '{second_month_ts.date()}' was not found "
                        f"in `monthly_indicator_data_df`."
                    )
                    continue

                # Compare the values, handling NaNs correctly.
                # Both must be NaN or both must be non-NaN and close.
                if pd.isna(quarterly_value) and pd.isna(monthly_value):
                    continue # This is a correct alignment.

                if pd.isna(quarterly_value) or pd.isna(monthly_value) or \
                   not np.isclose(quarterly_value, monthly_value, rtol=1e-5, atol=1e-8):
                    issues.append(
                        f"Alignment Error: Mismatch for '{q_col}' in quarter "
                        f"'{quarter_start.date()}'. Quarterly value is "
                        f"{quarterly_value}, but expected value from month "
                        f"'{second_month_ts.date()}' is {monthly_value}."
                    )

    return issues
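
The cross-validation above assumes that each `*_m2` column holds the value from the second month of the quarter, indexed at the quarter start. A minimal sketch of that convention follows; the helper `build_m2_columns` is hypothetical and is not necessarily how the original quarterly table was constructed.

```python
import numpy as np
import pandas as pd


def build_m2_columns(monthly_df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: second-month values re-indexed to the quarter start."""
    # February, May, August and November are the second months of calendar quarters.
    is_second_month = monthly_df.index.month % 3 == 2
    m2 = monthly_df.loc[is_second_month].copy()
    # Shift each timestamp back one month so it coincides with the quarter start.
    m2.index = m2.index - pd.DateOffset(months=1)
    return m2.add_suffix('_m2')


# Toy monthly series: January = 0.0, February = 1.0, ..., December = 11.0.
monthly = pd.DataFrame(
    {'manufacturing_pmi': np.arange(12, dtype='float64')},
    index=pd.date_range('2024-01-01', periods=12, freq='MS'),
)
quarterly_m2 = build_m2_columns(monthly)
print(quarterly_m2)
```

The validator performs the inverse lookup: for a quarter starting at `t`, it reads the monthly value at `t + 1 month` and compares it with the stored `*_m2` value.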

# ------------------------------------------------------------------------------
# Task 2, Step 2: Validate monthly_indicator_data_df and native-frequency alignment
# ------------------------------------------------------------------------------

def _validate_monthly_indicator_data(
    monthly_indicator_data_df: pd.DataFrame,
    evaluation_windows_df: pd.DataFrame
) -> List[str]:
    """
    Validates the structure and historical availability of the monthly indicator DataFrame.

    This function checks `monthly_indicator_data_df` for:
    1. A DatetimeIndex with month-start frequency ('MS').
    2. Presence and float64 dtype of required indicator columns.
    3. Historical NaN patterns, ensuring no data exists for series before their
       official start dates as defined in `evaluation_windows_df`. This prevents
       silent backfilling or interpolation.

    Args:
        monthly_indicator_data_df: The monthly DataFrame of raw indicators.
        evaluation_windows_df: DataFrame defining the valid evaluation windows.

    Returns:
        A list of string messages describing validation warnings or errors.
    """
    issues = []

    # --- Index and Schema Validation ---
    if not isinstance(monthly_indicator_data_df.index, pd.DatetimeIndex):
        issues.append(
            "Critical Error: `monthly_indicator_data_df` index is not a DatetimeIndex."
        )
        return issues

    if monthly_indicator_data_df.index.freqstr != 'MS':
        issues.append(
            "Critical Error: `monthly_indicator_data_df` index frequency is not "
            f"'MS'. Found '{monthly_indicator_data_df.index.freqstr}'."
        )

    expected_cols = ['manufacturing_pmi', 'service_pmi', 'kof_biz_situation']
    for col in expected_cols:
        if col not in monthly_indicator_data_df.columns:
            issues.append(f"Critical Error: Missing column '{col}' in `monthly_indicator_data_df`.")
        elif monthly_indicator_data_df[col].dtype != 'float64':
            issues.append(
                f"Critical Error: Column '{col}' has incorrect dtype. "
                f"Expected 'float64', found '{monthly_indicator_data_df[col].dtype}'."
            )
    if issues:
        return issues

    # --- Historical NaN Pattern Validation ---
    # Map the monthly column names to their identifiers in the evaluation windows table.
    series_map = {
        'service_pmi': 'service_pmi',
        'kof_biz_situation': 'kof_biz_situation'
    }

    for m_col, series_name in series_map.items():
        # Find the corresponding start quarter from the evaluation windows table.
        window_info = evaluation_windows_df[evaluation_windows_df['series_name'] == series_name]
        if window_info.empty:
            issues.append(f"Configuration Error: No evaluation window found for '{series_name}'.")
            continue

        start_quarter = window_info['start_quarter'].iloc[0]

        # Check for any non-NaN data *before* the official start date.
        pre_start_data = monthly_indicator_data_df.loc[:start_quarter - pd.DateOffset(days=1), m_col]
        if pre_start_data.notna().any():
            # The first non-NaN observation is the earliest offending data point.
            first_invalid_date = pre_start_data.first_valid_index()
            issues.append(
                f"Historical Data Error: Found non-NaN data for '{m_col}' before its "
                f"official start of {start_quarter.date()}. First invalid data point "
                f"at {first_invalid_date.date()}."
            )

    return issues
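
Both validators compare `index.freqstr` with an expected alias. An index rebuilt from a list of dates or read back from CSV carries no `freq` attribute, so `freqstr` is `None` even when the spacing is perfectly regular. The sketch below shows one frequency-agnostic alternative; the helper `has_regular_index` is a hypothetical addition, not part of the original validators.

```python
import pandas as pd


def has_regular_index(index: pd.DatetimeIndex, freq: str) -> bool:
    """Hypothetical helper: True if `index` equals a gap-free date range at `freq`."""
    if len(index) == 0:
        return False
    # Rebuild the ideal range between the first and last timestamps.
    expected = pd.date_range(index[0], index[-1], freq=freq)
    # Compare element-wise so that a missing `freq` attribute does not matter.
    return len(expected) == len(index) and bool((expected == index).all())


monthly_idx = pd.DatetimeIndex(['2024-01-01', '2024-02-01', '2024-03-01'])
gappy_idx = pd.DatetimeIndex(['2024-01-01', '2024-03-01'])
quarterly_idx = pd.DatetimeIndex(['2024-01-01', '2024-04-01', '2024-07-01'])

print(monthly_idx.freqstr)                          # None: no freq attached
print(has_regular_index(monthly_idx, 'MS'))         # True
print(has_regular_index(gappy_idx, 'MS'))           # False: February is missing
print(has_regular_index(quarterly_idx, 'QS-JAN'))   # True
```

A check of this kind could serve as a fallback when `freqstr` is `None`, at the cost of materializing the expected range.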

# ------------------------------------------------------------------------------
# Task 2, Step 3: Validate release_calendar_df and evaluation_windows_df
# ------------------------------------------------------------------------------

def _validate_metadata_tables(
    release_calendar_df: pd.DataFrame,
    evaluation_windows_df: pd.DataFrame
) -> List[str]:
    """
    Validates the schema and content of critical metadata tables.

    Purpose:
    This function performs a deep validation of the two primary metadata tables
    that govern temporal aspects of the study: the release calendar and the
    evaluation windows. It ensures their structure, content, and temporal
    conventions are perfectly aligned with the study's methodology, preventing
    subtle errors in forecasting and evaluation.

    Inputs:
        release_calendar_df (pd.DataFrame): DataFrame with indicator release dates.
        evaluation_windows_df (pd.DataFrame): DataFrame with indicator evaluation windows.

    Processes:
    1.  **`release_calendar_df` Validation:**
        a. Checks that 'series_name' values belong to a known vocabulary.
        b. Asserts that 'release_datetime_utc' is a timezone-aware UTC datetime.
        c. Asserts that the 'period' column contains valid datetimes
           and that each period's frequency (Month Start vs. Quarter Start)
           matches the known frequency of its corresponding series.
    2.  **`evaluation_windows_df` Validation:**
        a. Checks for the presence of required columns.
        b. Asserts that 'start_quarter' is a valid Quarter Start timestamp.
        c. Verifies that the specific start dates for 'service_pmi' and
           'kof_biz_situation' exactly match the paper's footnotes.

    Outputs:
        (List[str]): A list of string messages describing all validation warnings
                     or errors found. An empty list signifies success.
    """
    # Initialize a list to collect all validation issues.
    issues: List[str] = []

    # --- `release_calendar_df` Validation ---
    # Define the universe of expected series names for the release calendar.
    expected_series_vocab = {
        'manufacturing_pmi', 'service_pmi', 'kof_biz_situation',
        'seco_consumer_sentiment', 'yoy_gdp_growth_sports_adj'
    }
    # Check for any unexpected series names.
    if not set(release_calendar_df['series_name'].unique()).issubset(expected_series_vocab):
        invalid_series = set(release_calendar_df['series_name'].unique()) - expected_series_vocab
        issues.append(f"Config Error: Invalid series_name in `release_calendar_df`: {invalid_series}")

    # Check that the release timestamp is a timezone-aware UTC datetime.
    if not pd.api.types.is_datetime64_any_dtype(release_calendar_df['release_datetime_utc'].dtype) or \
       release_calendar_df['release_datetime_utc'].dt.tz is None or \
       str(release_calendar_df['release_datetime_utc'].dt.tz) != 'UTC':
        issues.append("Critical Error: `release_calendar_df.release_datetime_utc` must be a timezone-aware UTC datetime.")

    # --- `period` Column Frequency Validation ---
    # Define the expected frequency for each time series.
    series_freq_map = {
        'manufacturing_pmi': 'MS',
        'service_pmi': 'MS',
        'kof_biz_situation': 'MS',
        'seco_consumer_sentiment': 'QS-JAN',
        'yoy_gdp_growth_sports_adj': 'QS-JAN'
    }
    # Ensure the period column is a datetime object for validation.
    periods = pd.to_datetime(release_calendar_df['period'], errors='coerce')
    if periods.isna().any():
        issues.append("Critical Error: `release_calendar_df.period` contains values that could not be parsed as dates.")
    else:
        # Iterate through each row to check for frequency alignment.
        for index, row in release_calendar_df.iterrows():
            series_name = row['series_name']
            period_ts = periods.loc[index]
            expected_freq = series_freq_map.get(series_name)

            # Check for month start alignment for monthly series.
            if expected_freq == 'MS' and not period_ts.is_month_start:
                issues.append(
                    f"Frequency Error in `release_calendar_df` at index {index}: "
                    f"Series '{series_name}' requires a month-start period, but found {period_ts.date()}."
                )
            # Check for quarter start alignment for quarterly series.
            elif expected_freq == 'QS-JAN' and not period_ts.is_quarter_start:
                issues.append(
                    f"Frequency Error in `release_calendar_df` at index {index}: "
                    f"Series '{series_name}' requires a quarter-start period, but found {period_ts.date()}."
                )

    # --- `evaluation_windows_df` Validation ---
    # Check for the presence of essential columns.
    if 'series_name' not in evaluation_windows_df.columns or 'start_quarter' not in evaluation_windows_df.columns:
        issues.append("Critical Error: `evaluation_windows_df` is missing required columns ('series_name', 'start_quarter').")
        # Halt further checks on this df if columns are missing.
        return issues

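    # Check that every start_quarter value falls on a quarter start. A column of
    # per-indicator start dates has no regular frequency, so each value is
    # checked individually rather than through a Series-level frequency.
    start_quarters = pd.to_datetime(evaluation_windows_df['start_quarter'], errors='coerce')
    if start_quarters.isna().any() or not start_quarters.dt.is_quarter_start.all():
        issues.append("Critical Error: `evaluation_windows_df.start_quarter` must contain quarter-start ('QS-JAN') dates.")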

    # Check the specific, hardcoded start dates from the paper's Table 1 footnotes. This is a critical cross-check.
    # Plain timestamps are used because `pd.Timestamp` no longer accepts a `freq` argument (removed in pandas 2.0).
    expected_starts = {
        'service_pmi': pd.Timestamp('2014-01-01'),
        'kof_biz_situation': pd.Timestamp('2009-04-01')
    }
    for series, expected_start in expected_starts.items():
        # Select the row for the specific series.
        actual_start_series = evaluation_windows_df.loc[
            evaluation_windows_df['series_name'] == series, 'start_quarter'
        ]
        # Check if the series is missing from the configuration table.
        if actual_start_series.empty:
            issues.append(f"Config Error: Missing entry for '{series}' in `evaluation_windows_df`.")
        # Check if the start date matches the paper's methodology.
        elif actual_start_series.iloc[0] != expected_start:
            issues.append(
                f"Config Error: Incorrect start_quarter for '{series}'. "
                f"Expected {expected_start.date()}, found {actual_start_series.iloc[0].date()}."
            )

    return issues
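
# Illustrative sketch (not part of the replication pipeline): the row-wise
# alignment checks above rely on the pandas Timestamp properties
# `is_month_start` and `is_quarter_start`. The helper name
# `_example_period_alignment` and the sample dates are hypothetical; they only
# show how the two properties behave for monthly and quarterly period stamps.
import pandas as pd


def _example_period_alignment(date_str: str) -> dict:
    """Reports whether a date falls on a month start and on a quarter start."""
    ts = pd.Timestamp(date_str)
    return {
        "is_month_start": bool(ts.is_month_start),
        "is_quarter_start": bool(ts.is_quarter_start),
    }


# A February stamp is a month start but not a quarter start, so it would pass
# the 'MS' check and fail the 'QS-JAN' check.
_example_feb = _example_period_alignment("2014-02-01")
# An April stamp satisfies both conditions.
_example_apr = _example_period_alignment("2009-04-01")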

# ------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# ------------------------------------------------------------------------------

def validate_macro_data(
    raw_macro_data_df: pd.DataFrame,
    monthly_indicator_data_df: pd.DataFrame,
    release_calendar_df: pd.DataFrame,
    evaluation_windows_df: pd.DataFrame
) -> Dict[str, Any]:
    """
    Orchestrates the validation of all macro and indicator DataFrames.

    This function provides a single entry point to validate the suite of
    macroeconomic data tables, ensuring their structural integrity, temporal
    alignment, and consistency with the study's methodological constraints.

    Args:
        raw_macro_data_df: The primary quarterly data for forecasting.
        monthly_indicator_data_df: The source monthly indicator data.
        release_calendar_df: Metadata on indicator release timing.
        evaluation_windows_df: Metadata on valid evaluation periods.

    Returns:
        A dictionary containing the validation results:
        - 'status': 'Success' if no critical errors were found, else 'Failure'.
        - 'issues': A consolidated list of all warning and error messages.
    """
    all_issues = []

    # --- Step 1: Validate quarterly macro data ---
    q_issues = _validate_quarterly_macro_data(raw_macro_data_df, monthly_indicator_data_df)
    all_issues.extend(q_issues)

    # --- Step 2: Validate monthly indicator data ---
    m_issues = _validate_monthly_indicator_data(monthly_indicator_data_df, evaluation_windows_df)
    all_issues.extend(m_issues)

    # --- Step 3: Validate metadata tables ---
    meta_issues = _validate_metadata_tables(release_calendar_df, evaluation_windows_df)
    all_issues.extend(meta_issues)

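    # Determine final status. As documented above, only messages flagged as
    # errors are critical; warnings are reported but do not fail validation.
    status = "Failure" if any("Error" in issue for issue in all_issues) else "Success"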

    return {"status": status, "issues": all_issues}


In [None]:
# Task 3 — Validate master_config parameters and prompt templates

# ==============================================================================
# Task 3: Validate master_config parameters and prompt templates
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 3, Step 1: Validate data curation and timezone parameters
# ------------------------------------------------------------------------------

def _validate_curation_params(
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the 'data_curation_params' section of the master configuration.

    This function performs a battery of precise checks to ensure that the data
    curation and scoping parameters match the paper's methodology exactly.

    Args:
        config: The 'data_curation_params' sub-dictionary from the master config.

    Returns:
        A list of strings, where each string is a detailed description of a
        validation issue (error or warning). An empty list indicates success.
    """
    issues = []

    # Define ground truth values based on the paper's methodology.
    expected_values = {
        "start_date": "1999-01-01",
        "end_date": "2025-05-31",
        "included_languages": ["de", "fr"],
        "excluded_publication_types": ["tv_guide", "magazine", "radio", "TV"],
    }

    # Check each parameter against its expected value.
    for key, expected in expected_values.items():
        actual = config.get(key)
        # Use set comparison for lists to be order-agnostic.
        if isinstance(expected, list):
            if not isinstance(actual, list) or set(actual) != set(expected):
                issues.append(
                    f"Config Error in 'data_curation_params.{key}': "
                    f"Expected {sorted(expected)}, but found {actual}."
                )
        # Use direct comparison for other types.
        elif actual != expected:
            issues.append(
                f"Config Error in 'data_curation_params.{key}': "
                f"Expected '{expected}', but found '{actual}'."
            )

    # Validate the nested timezone policy.
    tz_policy = config.get('timezone_policy', {})
    if tz_policy.get('store_timezone_aware') is not True:
        issues.append(
            "Config Error in 'data_curation_params.timezone_policy.store_timezone_aware': "
            "Expected True."
        )
    if tz_policy.get('primary_tz') != 'UTC':
        issues.append(
            "Config Error in 'data_curation_params.timezone_policy.primary_tz': "
            "Expected 'UTC'."
        )

    # Issue a warning if the outlet allowlist is empty, as it's meant to be populated.
    if not config.get('outlet_allowlist'):
        issues.append(
            "Config Warning in 'data_curation_params.outlet_allowlist': "
            "List is empty. To replicate the paper's scope of 158 outlets, "
            "this list must be populated."
        )

    return issues

# ------------------------------------------------------------------------------
# Task 3, Step 2: Validate embedding and relevance model parameters
# ------------------------------------------------------------------------------

def _validate_embedding_relevance_params(
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the 'embedding_params' and 'relevance_model_params' sections.

    This function ensures that the parameters defining the feature engineering
    and relevance classification stages are correctly specified.

    Args:
        config: A dictionary containing 'embedding_params' and
                'relevance_model_params'.

    Returns:
        A list of strings describing validation issues.
    """
    issues = []

    # --- Embedding Parameters Validation ---
    emb_params = config.get('embedding_params', {})
    expected_emb = {
        "model_name": "jina-embeddings-v3",
        "embedding_dimension": 1024,
        "max_input_tokens": 8192,
        "inference_mode": "local",
    }
    for key, expected in expected_emb.items():
        if emb_params.get(key) != expected:
            issues.append(
                f"Config Error in 'embedding_params.{key}': "
                f"Expected '{expected}', found '{emb_params.get(key)}'."
            )
    if not emb_params.get('model_hash'):
        issues.append(
            "Config Warning in 'embedding_params.model_hash': Is empty. "
            "Must be filled before execution for reproducibility."
        )

    # --- Relevance Model Parameters Validation ---
    rel_params = config.get('relevance_model_params', {})
    expected_sections = {
        "Business", "Markets", "Economics", "Finance", "Wirtschaft", "Börse",
        "Ökonomie", "Finanz", "Économie", "Marchés", "Affaires", "Finance"
    }
    if set(rel_params.get('positive_class_sections', [])) != expected_sections:
        issues.append("Config Error: 'relevance_model_params.positive_class_sections' does not match the required multilingual set.")

    # Deeply validate the neural network architecture.
    nn_arch = rel_params.get('nn_architecture', {})
    if nn_arch.get('input_dim') != 1024:
        issues.append("Config Error: 'nn_architecture.input_dim' must be 1024.")

    expected_layers = [
        {"type": "Dense", "neurons": 256, "activation": "ReLU"},
        {"type": "Dropout", "rate": 0.3},
        {"type": "Dense", "neurons": 64, "activation": "ReLU"},
        {"type": "Dense", "neurons": 1, "activation": "Sigmoid"}
    ]
    actual_layers = nn_arch.get('layers', [])
    # List equality already covers a length mismatch, so one comparison suffices.
    if actual_layers != expected_layers:
        issues.append("Config Error: 'nn_architecture.layers' does not match the specified structure.")

    return issues

# ------------------------------------------------------------------------------
# Task 3, Step 3: Validate sentiment model, aggregation, and econometric parameters
# ------------------------------------------------------------------------------

def _validate_sentiment_econometric_params(
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the sentiment and econometric sections of the config.

    This function validates the core methodological choices for the sentiment
    model and the final econometric evaluation, ensuring alignment with the paper.
    The 'aggregation_params' section may be passed in but is not checked here.

    Args:
        config: A dictionary containing the relevant parameter sections.

    Returns:
        A list of strings describing validation issues.
    """
    issues = []

    # --- Sentiment Model Validation ---
    sent_params = config.get('sentiment_model_params', {})
    llm_config = sent_params.get('llm_config', {})
    if llm_config.get('model_identifier') != 'claude-3-5-sonnet-20240620':
        issues.append("Config Error: 'llm_config.model_identifier' is incorrect.")

    # Check for key phrases in the prompt to ensure its integrity.
    prompt_text = llm_config.get('appendix_prompt_financial_markets_en', '')
    required_phrases = [
        "400-500 words long", "NZZ, Tagesanzeiger, Handelszeitung",
        "embedding-friendly wording", "Clear polarization"
    ]
    for phrase in required_phrases:
        if phrase not in prompt_text:
            issues.append(f"Config Error: Key phrase '{phrase}' missing from LLM prompt.")

    # --- Classifier Config Validation ---
    class_config = sent_params.get('classifier_config', {})
    if class_config.get('model_type') != 'LogisticRegression' or \
       class_config.get('penalty') != 'l2' or \
       class_config.get('feature_scaling') != 'none':
        issues.append("Config Error: 'classifier_config' deviates from L2-regularized logistic regression on raw embeddings.")

    # --- Econometric Validation ---
    econ_params = config.get('econometric_validation_params', {})
    if econ_params.get('dependent_variable') != 'yoy_gdp_growth_sports_adj':
        issues.append("Config Error: 'dependent_variable' is incorrect.")
    if set(econ_params.get('forecast_horizons_quarters', [])) != {0, 1, 2}:
        issues.append("Config Error: 'forecast_horizons_quarters' must be [0, 1, 2].")

    info_policy = econ_params.get('information_set_policy', {}).get('use_month_m_for_quarter_t', {})
    if info_policy.get('baseline') != 2 or info_policy.get('early_neos') != 3:
        issues.append("Config Error: 'information_set_policy' for month selection is incorrect.")

    dm_test_config = econ_params.get('significance_test_config', {})
    if dm_test_config.get('test_name') != 'Diebold-Mariano' or \
       dm_test_config.get('modification') != 'HAC standard errors':
        issues.append("Config Error: 'significance_test_config' is incorrect.")
    if dm_test_config.get('bandwidth_q') is not None:
        issues.append(
            "Config Warning: 'bandwidth_q' is pre-set. It should be None to allow "
            "for rule-of-thumb calculation during the test."
        )

    eval_windows = econ_params.get('evaluation_windows_by_indicator', {})
    if eval_windows.get('service_pmi', {}).get('start_quarter') != '2014-01-01' or \
       eval_windows.get('kof_biz_situation', {}).get('start_quarter') != '2009-04-01':
        issues.append("Config Error: 'evaluation_windows_by_indicator' start dates do not match paper footnotes.")

    return issues

# ------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# ------------------------------------------------------------------------------

def validate_master_config(
    fused_master_input_specification: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the validation of the entire master configuration dictionary.

    This function serves as a single pre-flight check for all study parameters,
    ensuring that the configuration aligns perfectly with the paper's
    methodology before any computation begins. It aggregates issues from all
    parameter sections into a single, comprehensive report.

    Args:
        fused_master_input_specification: The master configuration dictionary
            containing all study parameters.

    Returns:
        A dictionary containing the validation results:
        - 'status': 'Success' if no errors were found, else 'Failure'.
        - 'issues': A consolidated list of all warning and error messages.
    """
    # Extract the master config dictionary to be validated.
    master_config = fused_master_input_specification.get('master_config', {})
    if not master_config:
        return {"status": "Failure", "issues": ["Critical Error: 'master_config' key not found in input."]}

    # --- Execute validation for each parameter section ---
    # Step 1: Curation and Timezone Parameters
    curation_issues = _validate_curation_params(
        master_config.get('data_curation_params', {})
    )

    # Step 2: Embedding and Relevance Model Parameters
    embedding_relevance_issues = _validate_embedding_relevance_params(
        {
            'embedding_params': master_config.get('embedding_params', {}),
            'relevance_model_params': master_config.get('relevance_model_params', {})
        }
    )

    # Step 3: Sentiment, Aggregation, and Econometric Parameters
    sentiment_econometric_issues = _validate_sentiment_econometric_params(
        {
            'sentiment_model_params': master_config.get('sentiment_model_params', {}),
            'aggregation_params': master_config.get('aggregation_params', {}),
            'econometric_validation_params': master_config.get('econometric_validation_params', {})
        }
    )

    # Consolidate all issues found across the different sections.
    all_issues = (
        curation_issues +
        embedding_relevance_issues +
        sentiment_econometric_issues
    )

    # Determine the final status. Any issue containing "Error" is critical.
    is_failure = any("Error" in issue for issue in all_issues)
    status = "Failure" if is_failure else "Success"

    # Return the final, structured validation report.
    return {"status": status, "issues": all_issues}
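
# Illustrative sketch (not part of the replication pipeline): the status rule
# above treats any message containing "Error" as critical, while messages
# labelled "Warning" are reported without failing validation. The helper name
# `_example_status_from_issues` and the sample messages are hypothetical.
from typing import List


def _example_status_from_issues(issues: List[str]) -> str:
    """Returns 'Failure' if any message contains 'Error', else 'Success'."""
    return "Failure" if any("Error" in issue for issue in issues) else "Success"


# Warnings alone leave the status at 'Success'.
_example_warning_only = _example_status_from_issues(
    ["Config Warning in 'embedding_params.model_hash': Is empty."]
)
# A single error message is enough to fail the pre-flight check.
_example_with_error = _example_status_from_issues(
    [
        "Config Warning in 'embedding_params.model_hash': Is empty.",
        "Config Error: 'dependent_variable' is incorrect.",
    ]
)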


In [None]:
# Task 4 — Cleanse the news corpus by enforcing scope, language, and publication-type filters

# ===========================================================================================
# Task 4: Cleanse the news corpus by enforcing scope, language, and publication-type filters
# ===========================================================================================

# ------------------------------------------------------------------------------
# Task 4, Step 1: Apply temporal, language, and publication-type filters
# ------------------------------------------------------------------------------

def _apply_scope_filters(
    df: pd.DataFrame,
    config: Dict[str, Any],
    audit_log: Dict[str, Any]
) -> pd.DataFrame:
    """
    Applies deterministic filters for time, language, and publication type.

    This function sequentially filters the input DataFrame based on the rules
    defined in the configuration, creating a complete audit trail of the process.

    Args:
        df: The input DataFrame to be filtered.
        config: The 'data_curation_params' sub-dictionary.
        audit_log: A dictionary to which filter counts will be added.

    Returns:
        A new DataFrame containing only the records that pass all filters.
    """
    # Record the initial number of articles for the audit trail.
    initial_count = len(df)
    audit_log['initial_article_count'] = initial_count

    # --- 1. Temporal Filter ---
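    # Convert date strings from config to timezone-aware UTC timestamps.
    start_date = pd.to_datetime(config['start_date'], utc=True)
    # A date-only end bound parses to midnight, so use the following midnight as
    # an exclusive bound to keep articles published during the final day.
    end_exclusive = pd.to_datetime(config['end_date'], utc=True) + pd.Timedelta(days=1)
    # Create a boolean mask for articles within the specified date range.
    temporal_mask = (
        (df['publication_datetime_utc'] >= start_date)
        & (df['publication_datetime_utc'] < end_exclusive)
    )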
    # Apply the filter.
    df = df[temporal_mask]
    # Log the result of this filter stage.
    count_after_temporal = len(df)
    audit_log['filter_temporal'] = {
        'retained': count_after_temporal,
        'removed': initial_count - count_after_temporal
    }

    # --- 2. Language Filter ---
    # Create a boolean mask for articles in the allowed languages.
    language_mask = df['language'].isin(config['included_languages'])
    # Apply the filter.
    df = df[language_mask]
    # Log the result.
    count_after_language = len(df)
    audit_log['filter_language'] = {
        'retained': count_after_language,
        'removed': count_after_temporal - count_after_language
    }

    # --- 3. Publication Type Filter ---
    # Create a boolean mask to EXCLUDE specified publication types.
    pub_type_mask = ~df['publication_type'].isin(config['excluded_publication_types'])
    # Apply the filter.
    df = df[pub_type_mask]
    # Log the result.
    count_after_pub_type = len(df)
    audit_log['filter_publication_type'] = {
        'retained': count_after_pub_type,
        'removed': count_after_language - count_after_pub_type
    }

    # --- 4. Outlet Allowlist Filter (Conditional) ---
    # This filter only runs if the allowlist in the config is populated.
    if config.get('outlet_allowlist'):
        # Create a boolean mask for articles from allowed outlets.
        outlet_mask = df['outlet_id'].isin(config['outlet_allowlist'])
        # Apply the filter.
        df = df[outlet_mask]
        # Log the result.
        count_after_outlet = len(df)
        audit_log['filter_outlet_allowlist'] = {
            'retained': count_after_outlet,
            'removed': count_after_pub_type - count_after_outlet
        }
    else:
        # If the list is empty, log that the filter was skipped.
        audit_log['filter_outlet_allowlist'] = {'status': 'skipped', 'retained': len(df)}

    return df
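
# Illustrative sketch (not part of the replication pipeline): a date-only
# bound such as '2025-05-31' parses to midnight UTC, so the way the upper
# bound is compared decides whether articles published later on that calendar
# day are kept. The helper name `_example_end_of_day_mask` and the sample
# timestamps are hypothetical.
import pandas as pd


def _example_end_of_day_mask(
    timestamps: pd.Series, start: str, end: str
) -> pd.Series:
    """Keeps timestamps from `start` through the whole calendar day `end`."""
    start_ts = pd.to_datetime(start, utc=True)
    # Use an exclusive bound at the following midnight to cover the full day.
    end_exclusive = pd.to_datetime(end, utc=True) + pd.Timedelta(days=1)
    return (timestamps >= start_ts) & (timestamps < end_exclusive)


_example_stamps = pd.Series(
    pd.to_datetime(
        ["2025-05-30 12:00", "2025-05-31 18:30", "2025-06-01 00:00"], utc=True
    )
)
# The evening article on the final day is retained; the next midnight is not.
_example_mask = _example_end_of_day_mask(
    _example_stamps, "1999-01-01", "2025-05-31"
)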

# ------------------------------------------------------------------------------
# Task 4, Step 2: Enforce non-null constraints and remove invalid records
# ------------------------------------------------------------------------------

def _enforce_non_null_constraints(
    df: pd.DataFrame,
    audit_log: Dict[str, Any]
) -> pd.DataFrame:
    """
    Removes records with null or empty critical fields.

    This function drops rows where 'publication_datetime_utc' is null, or where
    'full_text' is null or contains only whitespace.

    Args:
        df: The filtered DataFrame.
        audit_log: The audit log to be updated.

    Returns:
        A new DataFrame with invalid records removed.
    """
    # Record the count before this cleansing step.
    count_before = len(df)

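    # --- 1. Drop rows with null critical timestamps ---
    # Reassign rather than use `inplace=True` so the caller's DataFrame (often a
    # filtered slice) is not mutated and a new DataFrame is returned.
    df = df.dropna(subset=['publication_datetime_utc'])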

    # --- 2. Drop rows with null or whitespace-only text ---
    # Create a mask for rows where 'full_text' is null.
    null_text_mask = df['full_text'].isna()
    # Create a mask for rows where 'full_text' is just whitespace.
    whitespace_text_mask = df['full_text'].str.strip().eq('')
    # Combine the masks to identify all rows to be removed.
    invalid_text_mask = null_text_mask | whitespace_text_mask

    # Apply the filter to keep only valid rows.
    df = df[~invalid_text_mask]

    # Log the number of records removed.
    count_after = len(df)
    audit_log['cleanse_non_null'] = {
        'retained': count_after,
        'removed': count_before - count_after
    }

    return df

# ------------------------------------------------------------------------------
# Task 4, Step 3: Handle duplicates with deterministic policy
# ------------------------------------------------------------------------------

def _compute_fingerprint(
    text_series: pd.Series
) -> pd.Series:
    """
    Computes a deterministic SHA-256 fingerprint for each text entry in a Series.

    Purpose:
    This function is designed to create a unique and consistent identifier for
    unstructured text content. It first applies a strict normalization routine
    to eliminate superficial variations (e.g., case, whitespace) and then
    computes a cryptographic hash (SHA-256) of the normalized text. This
    fingerprint is essential for accurately identifying and removing duplicate
    articles from the corpus.

    Inputs:
        text_series (pd.Series): A pandas Series containing the raw, unstructured
                                 text data (e.g., the 'full_text' column of the
                                 news DataFrame). The Series is expected to have
                                 a string dtype.

    Processes:
    1.  Input Validation: Checks if the input is a pandas Series.
    2.  Normalization:
        a. Converts all text to lowercase to ensure case-insensitivity.
        b. Collapses all sequences of one or more whitespace characters
           (including tabs, newlines) into a single space.
        c. Strips any leading or trailing whitespace from the text.
    3.  Hashing:
        a. Iterates through each normalized text entry.
        b. If the entry is not null (pd.notna), it encodes the string to
           UTF-8 bytes.
        c. Computes the SHA-256 hash of the byte string.
        d. Returns the hexadecimal representation of the hash.
        e. If the entry is null, it propagates the null value (NaN/None).

    Outputs:
        (pd.Series): A new pandas Series of the same index as the input, where
                     each element is the computed SHA-256 hexadecimal digest
                     (string) corresponding to the text in the input Series, or
                     None if the input text was null.

    Error Handling:
        - Raises a TypeError if the input is not a pandas Series.
        - The underlying pandas string methods and the apply method handle
          element-wise operations gracefully, propagating nulls where appropriate.
    """
    # --- Input Validation ---
    # Ensure the input is a pandas Series to support vectorized string operations.
    if not isinstance(text_series, pd.Series):
        raise TypeError("Input `text_series` must be a pandas Series.")

    # --- Text Normalization ---
    # This sequence of vectorized operations creates a canonical string representation.
    # 1. Convert to lowercase for case-insensitivity.
    # 2. Replace any sequence of one or more whitespace characters with a single space.
    # 3. Remove any leading or trailing whitespace.
    normalized_text: pd.Series = (
        text_series.str.lower()
        .str.replace(r'\s+', ' ', regex=True)
        .str.strip()
    )

    # --- Hashing ---
    # Define a helper function for hashing to be used with .apply().
    def hash_text(text: Optional[str]) -> Optional[str]:
        """Encodes and hashes a single string, handling nulls."""
        # Check if the text is valid (not None, NaN, etc.).
        if pd.notna(text):
            # Encode the normalized string into a UTF-8 byte sequence.
            encoded_text = text.encode('utf-8')
            # Compute the SHA-256 hash of the byte sequence.
            hasher = hashlib.sha256(encoded_text)
            # Return the 64-character hexadecimal representation of the hash.
            return hasher.hexdigest()
        # If the input text is null, return None to propagate the null.
        return None

    # Apply the hashing function to each element in the normalized Series.
    # This produces the final Series of content fingerprints.
    fingerprint_series: pd.Series = normalized_text.apply(hash_text)

    return fingerprint_series
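
# Illustrative sketch (not part of the replication pipeline): the same
# normalize-then-hash idea applied to single strings with the standard
# library only. The helper name `_example_fingerprint` and the sample
# headlines are hypothetical.
import hashlib
import re


def _example_fingerprint(text: str) -> str:
    """Lowercases, collapses whitespace, strips, then hashes with SHA-256."""
    normalized = re.sub(r"\s+", " ", text.lower()).strip()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


# Differences in case and whitespace do not change the fingerprint.
_example_same = (
    _example_fingerprint("SNB  hebt\nLeitzins an ")
    == _example_fingerprint("snb hebt leitzins an")
)
# A change in wording produces a different fingerprint.
_example_differs = (
    _example_fingerprint("snb hebt leitzins an")
    != _example_fingerprint("snb senkt leitzins")
)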

def _handle_duplicates(
    df: pd.DataFrame,
    audit_log: Dict[str, Any]
) -> pd.DataFrame:
    """
    Removes duplicate articles based on a content fingerprint.

    It uses the 'dedupe_fingerprint' column if available, otherwise it computes
    one on the fly. Duplicates are resolved by keeping the earliest ingested article.

    Args:
        df: The DataFrame to be deduplicated.
        audit_log: The audit log to be updated.

    Returns:
        A new, deduplicated DataFrame.
    """
    # Record the count before deduplication.
    count_before = len(df)

    # Determine the column to use for deduplication.
    fingerprint_col = 'dedupe_fingerprint'
    if fingerprint_col not in df.columns:
        # If the fingerprint column doesn't exist, compute it on a new frame so
        # the caller's DataFrame is not modified.
        audit_log['deduplication_fingerprint_status'] = 'computed_on_the_fly'
        df = df.assign(**{fingerprint_col: _compute_fingerprint(df['full_text'])})
    else:
        audit_log['deduplication_fingerprint_status'] = 'used_existing_column'

    # Deduplicate:
    # 1. Sort by ingestion timestamp to ensure we keep the *first* record seen.
    #    A stable sort preserves the original row order among tied timestamps,
    #    which keeps the outcome deterministic.
    # 2. Drop duplicates based on the fingerprint, keeping the 'first' entry.
    df_deduplicated = df.sort_values(
        'ingestion_timestamp_utc', kind='mergesort'
    ).drop_duplicates(subset=[fingerprint_col], keep='first')

    # Log the number of duplicates removed.
    count_after = len(df_deduplicated)
    audit_log['cleanse_duplicates'] = {
        'retained': count_after,
        'removed': count_before - count_after
    }

    # If the fingerprint was computed on the fly, drop the temporary column.
    if audit_log['deduplication_fingerprint_status'] == 'computed_on_the_fly':
        df_deduplicated = df_deduplicated.drop(columns=[fingerprint_col])

    return df_deduplicated
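
# Illustrative sketch (not part of the replication pipeline): the
# sort-then-drop pattern on a toy frame. The helper name
# `_example_keep_earliest` and the column names 'article_id', 'fingerprint',
# and 'ingested' are hypothetical.
import pandas as pd


def _example_keep_earliest(
    df: pd.DataFrame, key: str, order_col: str
) -> pd.DataFrame:
    """Keeps, for each value of `key`, the row with the earliest `order_col`."""
    return df.sort_values(order_col, kind="mergesort").drop_duplicates(
        subset=[key], keep="first"
    )


_example_articles = pd.DataFrame(
    {
        "article_id": ["a", "b", "c"],
        "fingerprint": ["x1", "x1", "y2"],
        "ingested": pd.to_datetime(
            ["2020-01-02", "2020-01-01", "2020-01-03"], utc=True
        ),
    }
)
# Rows 'a' and 'b' share a fingerprint; 'b' was ingested first and is kept.
_example_kept = _example_keep_earliest(
    _example_articles, "fingerprint", "ingested"
)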

# ------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# ------------------------------------------------------------------------------

def cleanse_news_corpus(
    raw_news_data_df: pd.DataFrame,
    fused_master_input_specification: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrates the end-to-end cleansing of the raw news corpus.

    This function executes a sequence of filtering and cleansing steps to
    prepare the raw news data for analysis, strictly following the methodology
    defined in the configuration. It produces a cleansed DataFrame and a
    detailed audit log of all operations.

    Args:
        raw_news_data_df: The raw news corpus as a pandas DataFrame.
        fused_master_input_specification: The master configuration dictionary.

    Returns:
        A tuple containing:
        - A cleansed pandas DataFrame, ready for the embedding stage.
        - A dictionary serving as a detailed audit log of the cleansing process.
    """
    # Make a copy to avoid modifying the original DataFrame.
    df = raw_news_data_df.copy()

    # Initialize the audit log.
    audit_log = {}

    # Extract the relevant configuration section.
    curation_params = fused_master_input_specification['master_config']['data_curation_params']

    # --- Step 1: Apply scope filters ---
    df_filtered = _apply_scope_filters(df, curation_params, audit_log)

    # --- Step 2: Enforce non-null constraints ---
    df_non_null = _enforce_non_null_constraints(df_filtered, audit_log)

    # --- Step 3: Handle duplicates ---
    df_clean = _handle_duplicates(df_non_null, audit_log)

    # Add final count to the audit log.
    audit_log['final_article_count'] = len(df_clean)

    # Return the final cleansed DataFrame and the complete audit log.
    return df_clean, audit_log


In [None]:
# Task 5 — Prepare temporal metadata and produce corpus audit manifest

# ==============================================================================
# Task 5: Prepare temporal metadata and produce corpus audit manifest
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 5, Step 1: Standardize all timestamps to UTC and verify timezone awareness
# ------------------------------------------------------------------------------

def _assert_timestamp_integrity(
    df: pd.DataFrame
) -> None:
    """
    Asserts the integrity of all timestamp columns in the DataFrame.

    Purpose:
    This function serves as a final, critical guardrail after all cleansing
    operations. It verifies that all specified timestamp columns are of a
    datetime dtype and are explicitly localized to UTC. Any deviation from this
    standard would compromise all subsequent time-based operations and is
    treated as a fatal error.

    Inputs:
        df (pd.DataFrame): The cleansed news corpus DataFrame.

    Processes:
    1.  Defines a list of timestamp columns to be checked.
    2.  Iterates through each column.
    3.  Asserts that the column's dtype is a pandas datetime type.
    4.  Asserts that the column's timezone attribute (`.dt.tz`) is not None.
    5.  Asserts that the timezone is unequivocally 'UTC'.

    Outputs:
        None: The function returns None if all assertions pass.

    Error Handling:
        Raises ValueError: If any timestamp column fails any of the integrity
                           checks, indicating a critical flaw in the upstream
                           data processing pipeline.
    """
    # Define the list of columns that must be timezone-aware UTC datetimes.
    timestamp_columns = [
        'publication_datetime_utc',
        'ingestion_timestamp_utc',
        'last_modified_timestamp_utc'
    ]

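    # Iterate through each required timestamp column for validation.
    for col in timestamp_columns:
        # Check 0: Ensure the column exists, so that a missing column surfaces
        # as the documented ValueError rather than a KeyError.
        if col not in df.columns:
            raise ValueError(
                f"Timestamp Integrity Error: Column '{col}' is missing."
            )

        # Check 1: Ensure the column has a datetime-like dtype.
        if not pd.api.types.is_datetime64_any_dtype(df[col].dtype):
            raise ValueError(
                f"Timestamp Integrity Error: Column '{col}' is not a datetime "
                f"dtype. Found '{df[col].dtype}'."
            )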

        # Check 2: Ensure the datetime column is timezone-aware.
        if df[col].dt.tz is None:
            raise ValueError(
                f"Timestamp Integrity Error: Column '{col}' is timezone-naive. "
                "All timestamps must be timezone-aware."
            )

        # Check 3: Ensure the timezone is specifically UTC.
        if str(df[col].dt.tz) != 'UTC':
            raise ValueError(
                f"Timestamp Integrity Error: Column '{col}' is not in UTC. "
                f"Found timezone '{df[col].dt.tz}'."
            )
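
# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): one way to produce
# columns that pass the integrity checks above. The helper name
# `_example_to_utc` and the default source timezone are assumptions made for
# this example only; ambiguous or non-existent local times around DST changes
# would need explicit handling.
# ------------------------------------------------------------------------------

import pandas as pd


def _example_to_utc(values, source_tz: str = "Europe/Zurich") -> pd.Series:
    """Parses timestamps, localizes naive values to `source_tz`, converts to UTC."""
    parsed = pd.Series(pd.to_datetime(values))
    # Naive timestamps carry no timezone, so they are localized first.
    if parsed.dt.tz is None:
        parsed = parsed.dt.tz_localize(source_tz)
    # Convert to UTC so that `.dt.tz` reports 'UTC'.
    return parsed.dt.tz_convert("UTC")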

# ------------------------------------------------------------------------------
# Task 5, Step 2: Compute temporal partitions for early-release windows
# ------------------------------------------------------------------------------

def _add_temporal_partitions(
    df: pd.DataFrame
) -> pd.DataFrame:
    """
    Enriches the DataFrame with temporal features for aggregation.

    Purpose:
    This function adds several derived columns based on the authoritative
    'publication_datetime_utc' timestamp. These new columns are essential for
    performing efficient monthly aggregations and for constructing the
    early-release variants of the NEOS indicator. The use of vectorized
    pandas operations ensures high performance.

    Inputs:
        df (pd.DataFrame): The cleansed and timestamp-verified DataFrame.

    Processes:
    1.  Creates 'year_month' (e.g., '2023-04') for monthly grouping.
    2.  Creates 'day_of_month' (integer 1-31) for intra-month filtering.
    3.  Creates boolean flags for articles published within the first 7, 14,
        and 21 days of the month, which are required for the early-release
        NEOS variants.

    Outputs:
        (pd.DataFrame): A new DataFrame with the added temporal columns. The
                        original DataFrame is not modified.
    """
    # Create a copy to avoid modifying the original DataFrame in place.
    df_enriched = df.copy()

    # --- Create 'year_month' column for efficient monthly grouping ---
    # The format 'YYYY-MM' is standard and sorts correctly as a string.
    df_enriched['year_month'] = df_enriched['publication_datetime_utc'].dt.strftime('%Y-%m')

    # --- Create 'day_of_month' column for intra-month windowing ---
    # Extracts the day as an integer from the timestamp.
    df_enriched['day_of_month'] = df_enriched['publication_datetime_utc'].dt.day

    # --- Create boolean flags for early-release windows ---
    # These flags enable fast filtering for indicator construction later.
    # Flag for articles published on or before the 7th day of the month.
    df_enriched['is_in_first_7_days'] = df_enriched['day_of_month'] <= 7
    # Flag for articles published on or before the 14th day of the month.
    df_enriched['is_in_first_14_days'] = df_enriched['day_of_month'] <= 14
    # Flag for articles published on or before the 21st day of the month.
    df_enriched['is_in_first_21_days'] = df_enriched['day_of_month'] <= 21

    return df_enriched
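
# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): how the window flags
# added above could drive an early-release monthly aggregate. The helper name
# and the 'score' column are hypothetical; the actual NEOS aggregation is
# defined in a later task and may differ.
# ------------------------------------------------------------------------------

import pandas as pd


def _example_early_release_mean(
    df: pd.DataFrame,
    flag_col: str = "is_in_first_7_days",
    value_col: str = "score"
) -> pd.Series:
    """Averages `value_col` per 'year_month' over rows where `flag_col` is True."""
    # Keep only articles published inside the early-release window.
    in_window = df.loc[df[flag_col]]
    # Aggregate the remaining articles by month.
    return in_window.groupby("year_month")[value_col].mean()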

# ------------------------------------------------------------------------------
# Task 5, Step 3: Generate corpus audit manifest
# ------------------------------------------------------------------------------

def _generate_corpus_audit_manifest(
    df_clean: pd.DataFrame,
    df_raw: pd.DataFrame
) -> Dict[str, Any]:
    """
    Generates a comprehensive audit manifest of the cleansed corpus.

    Purpose:
    This function produces a structured dictionary containing key summary
    statistics and distributions of the final, cleansed corpus. This manifest
    is a critical artifact for reproducibility, data quality assessment, and
    understanding the characteristics of the data entering the modeling pipeline.

    Inputs:
        df_clean (pd.DataFrame): The final, cleansed, and temporally-enriched DataFrame.
        df_raw (pd.DataFrame): The original, pre-filter DataFrame for comparison.

    Processes:
    1.  Calculates total article counts before and after cleansing.
    2.  Computes breakdowns of the clean corpus by language and outlet.
    3.  Generates time-series DataFrames of monthly article counts, both total
        and by language.
    4.  Calculates 'section_label' completeness ratios per outlet and over time.

    Outputs:
        (Dict[str, Any]): A dictionary where keys are descriptive names of
                          audits and values are pandas DataFrames or scalars
                          containing the summary statistics.
    """
    # Initialize the manifest dictionary.
    manifest = {}

    # --- Overall Counts ---
    # Record the number of articles before and after the entire cleansing process.
    manifest['total_articles_pre_filter'] = len(df_raw)
    manifest['total_articles_post_filter'] = len(df_clean)

    # --- Corpus Composition Breakdowns ---
    # Breakdown by language.
    manifest['articles_by_language'] = df_clean['language'].value_counts().to_frame('count')
    # Breakdown by outlet.
    manifest['articles_by_outlet'] = df_clean['outlet_id'].value_counts().to_frame('count')

    # --- Temporal Distributions ---
    # Total monthly article counts over time.
    manifest['monthly_article_counts_total'] = (
        df_clean.groupby('year_month')['article_id']
        .count()
        .to_frame('count')
        .sort_index()
    )
    # Monthly article counts broken down by language.
    manifest['monthly_article_counts_by_language'] = (
        df_clean.groupby(['year_month', 'language'])['article_id']
        .count()
        .unstack()
        .sort_index()
        .fillna(0)
        .astype(int)
    )

    # --- Data Quality Metrics ---
    # 'section_label' completeness ratio for each outlet.
    manifest['section_label_completeness_by_outlet'] = (
        df_clean.groupby('outlet_id')['section_label']
        .apply(lambda s: s.notna().mean())
        .to_frame('completeness_ratio')
        .sort_values('completeness_ratio', ascending=False)
    )
    # 'section_label' completeness ratio over time.
    manifest['section_label_completeness_monthly'] = (
        df_clean.groupby('year_month')['section_label']
        .apply(lambda s: s.notna().mean())
        .to_frame('completeness_ratio')
        .sort_index()
    )

    return manifest

# ------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# ------------------------------------------------------------------------------

def prepare_corpus_for_embedding(
    clean_df: pd.DataFrame,
    raw_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrates the final preparation of the corpus before embedding.

    This function serves as the final step in the data preparation pipeline. It
    first performs a critical, final validation of timestamp integrity. It then
    enriches the data with temporal features required for downstream analysis
    and finally produces a comprehensive audit manifest documenting the state
    of the prepared corpus.

    Args:
        clean_df (pd.DataFrame): The cleansed news corpus DataFrame from Task 4.
        raw_df (pd.DataFrame): The original, raw news corpus DataFrame, required
                               for comparative statistics in the audit manifest.

    Returns:
        A tuple containing:
        - (pd.DataFrame): The fully prepared DataFrame, enriched with temporal
          partitions and ready for the embedding process.
        - (Dict[str, Any]): The corpus audit manifest, a dictionary containing
          key summary statistics as DataFrames and scalars.

    Raises:
        ValueError: If the timestamp integrity check fails.
    """
    # --- Step 1: Final Timestamp Integrity Assertion ---
    # This is a critical guardrail; if it fails, the pipeline stops.
    _assert_timestamp_integrity(clean_df)

    # --- Step 2: Add Temporal Partitions ---
    # Enrich the DataFrame with necessary date/time components.
    df_enriched = _add_temporal_partitions(clean_df)

    # --- Step 3: Generate the Corpus Audit Manifest ---
    # Create the final summary document of the prepared data.
    audit_manifest = _generate_corpus_audit_manifest(df_enriched, raw_df)

    # Return the enriched data and its corresponding audit manifest.
    return df_enriched, audit_manifest


In [None]:
# Task 6 — Generate document embeddings with jina-embeddings-v3 and compute token statistics

# ==========================================================================================
# Task 6: Generate document embeddings with jina-embeddings-v3 and compute token statistics
# ==========================================================================================

# ------------------------------------------------------------------------------
# Task 6, Steps 1 & 2: Tokenize, Embed, and Store with Provenance
# ------------------------------------------------------------------------------

def _run_embedding_inference(
    df: pd.DataFrame,
    config: Dict[str, Any],
    output_dir: str
) -> Tuple[pd.DataFrame, str, str, Dict[str, Any]]:
    """
    Runs the end-to-end document embedding pipeline.

    Purpose:
    This function is the computational core of the feature engineering stage. It
    loads the specified Transformer model, processes the text corpus in batches
    to manage memory, generates document embeddings, and computes token-level
    statistics. The outputs are saved to disk in an efficient format, and a
    detailed provenance log is created.

    Inputs:
        df (pd.DataFrame): The prepared DataFrame containing the 'full_text' and
                           'article_id' columns.
        config (Dict[str, Any]): The 'embedding_params' sub-dictionary from the
                                 master configuration.
        output_dir (str): Path to the directory where artifacts will be saved.

    Processes:
    1.  Sets up the device (GPU if available, else CPU).
    2.  Loads the SentenceTransformer model specified in the config.
    3.  Creates a detailed provenance dictionary with model and tokenizer info.
    4.  Initializes an HDF5 file for out-of-core storage of embeddings.
    5.  Iterates through the input DataFrame in batches:
        a. Extracts a batch of texts.
        b. Uses the model's tokenizer to get token counts and applies the
           truncation policy.
        c. Uses the model to encode the texts into 1024-dim vectors.
        d. Appends the computed embeddings to the HDF5 file.
        e. Collects token statistics and article IDs for the crosswalk table.
    6.  Saves the crosswalk table (article_id -> embedding_row_index) and
        updates the input DataFrame with token statistics.

    Outputs:
        A tuple containing:
        - (pd.DataFrame): The input DataFrame augmented with 'token_count' and
          'truncation_flag' columns.
        - (str): The file path to the saved HDF5 embeddings file.
        - (str): The file path to the saved crosswalk table (CSV).
        - (Dict[str, Any]): A provenance dictionary with model details.
    """
    # --- 1. Setup and Provenance ---
    # Determine the computation device (prefer CUDA GPU if available).
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Using device: {device}")

    # Extract model parameters from the configuration.
    model_name = config['model_name']
    batch_size = config.get('batch_size', 32) # Default to 32 if not specified
    max_tokens = config['max_input_tokens']

    # Load the SentenceTransformer model. This will also download it if not cached.
    # The model is moved to the selected device.
    model = SentenceTransformer(model_name, device=device)

    # The tokenizer is an attribute of the loaded model.
    tokenizer = model.tokenizer

    # Create a provenance log for reproducibility.
    provenance = {
        'model_name': model_name,
        'model_class': model.__class__.__name__,
        'tokenizer_class': tokenizer.__class__.__name__,
        'max_input_tokens': max_tokens,
        'batch_size': batch_size,
        'embedding_dimension': model.get_sentence_embedding_dimension(),
    }

    # --- 2. Initialize Output Artifacts ---
    # Define file paths for the outputs.
    os.makedirs(output_dir, exist_ok=True)
    embeddings_path = os.path.join(output_dir, 'document_embeddings.h5')
    crosswalk_path = os.path.join(output_dir, 'embeddings_crosswalk.csv')

    # Initialize the HDF5 file for storing embeddings.
    # We use a resizable dataset to append batches.
    with h5py.File(embeddings_path, 'w') as f:
        f.create_dataset(
            'embeddings',
            shape=(0, provenance['embedding_dimension']),
            maxshape=(None, provenance['embedding_dimension']),
            dtype='float32',
            chunks=True # Enable chunking for efficient partial I/O
        )

    # --- 3. Batch Processing Loop ---
    # Prepare lists to store metadata collected during the loop.
    all_texts = df['full_text'].tolist()
    all_article_ids = df['article_id'].tolist()
    token_stats = []

    # Process the corpus in batches to manage memory.
    for i in tqdm(range(0, len(all_texts), batch_size), desc="Embedding Articles"):
        # Extract the current batch of texts.
        batch_texts = all_texts[i:i + batch_size]

        # --- Step 1 (logic): Tokenization and Statistics ---
        # Tokenize the batch to get token counts before potential truncation.
        # We do not truncate here, just count the raw tokens. Calling the
        # tokenizer directly replaces the deprecated `batch_encode_plus`.
        tokenized_batch = tokenizer(
            batch_texts,
            add_special_tokens=True,
            return_tensors=None,  # Return lists of token IDs
            truncation=False  # Do not truncate for the count
        )

        # Calculate token counts and truncation flags for the batch.
        for j, input_ids in enumerate(tokenized_batch['input_ids']):
            count = len(input_ids)
            token_stats.append({
                'article_id': all_article_ids[i+j],
                'token_count': count,
                'truncation_flag': count > max_tokens
            })

        # --- Step 2 (logic): Embedding Inference ---
        # Encode the batch. The model's internal tokenizer will handle truncation
        # based on its configured max_seq_length, which we align with our limit.
        model.max_seq_length = max_tokens
        batch_embeddings = model.encode(
            batch_texts,
            batch_size=len(batch_texts), # Inner batch size
            show_progress_bar=False, # Disable inner progress bar
            convert_to_numpy=True,
            normalize_embeddings=False # Keep raw embeddings
        )

        # --- 4. Store Batch Results ---
        # Append the computed embeddings to the HDF5 file.
        with h5py.File(embeddings_path, 'a') as f:
            dset = f['embeddings']
            # Resize the dataset to accommodate the new batch.
            dset.resize(dset.shape[0] + len(batch_embeddings), axis=0)
            # Write the new batch of embeddings to the end of the dataset.
            dset[-len(batch_embeddings):] = batch_embeddings.astype('float32')

    # --- 5. Finalize and Save Metadata ---
    # Convert the collected token statistics into a DataFrame.
    token_stats_df = pd.DataFrame(token_stats)

    # Merge the token statistics back into the original DataFrame.
    df_augmented = df.merge(token_stats_df, on='article_id', how='left')

    # Create and save the crosswalk table.
    # The row index in the HDF5 file corresponds to the original DataFrame order.
    crosswalk_df = pd.DataFrame({
        'article_id': all_article_ids,
        'embedding_row_index': range(len(all_article_ids))
    })
    crosswalk_df.to_csv(crosswalk_path, index=False)

    return df_augmented, embeddings_path, crosswalk_path, provenance
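
# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): using the crosswalk
# to fetch embedding rows for a set of article IDs. The helper name is
# hypothetical. `embeddings` may be any array-like that supports indexing with
# an increasing integer array; a NumPy array is used here, and an open h5py
# dataset is assumed to behave the same way as long as the IDs are unique.
# ------------------------------------------------------------------------------

import numpy as np
import pandas as pd


def _example_lookup_embeddings(article_ids, crosswalk_df: pd.DataFrame, embeddings) -> np.ndarray:
    """Returns the embedding rows for `article_ids`, in the order requested."""
    # Map each article ID to its row index in the embeddings store.
    index_by_id = crosswalk_df.set_index("article_id")["embedding_row_index"]
    rows = index_by_id.loc[list(article_ids)].to_numpy()
    # Read rows in increasing index order, then restore the requested order.
    order = np.argsort(rows, kind="stable")
    fetched = np.asarray(embeddings[rows[order]])
    result = np.empty_like(fetched)
    result[order] = fetched
    return result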

# ------------------------------------------------------------------------------
# Task 6, Step 3: Validate embedding quality and detect drift
# ------------------------------------------------------------------------------

def _validate_embedding_quality(
    embeddings_path: str,
    crosswalk_path: str,
    df_augmented: pd.DataFrame
) -> Dict[str, Any]:
    """
    Performs quality validation and temporal drift analysis on embeddings.

    Purpose:
    This function provides a post-hoc analysis of the generated embeddings. It
    computes summary statistics to check for numerical issues and analyzes the
    temporal evolution of the average embedding to detect potential data or
    model drift over time.

    Inputs:
        embeddings_path (str): Path to the HDF5 file containing embeddings.
        crosswalk_path (str): Path to the CSV crosswalk table.
        df_augmented (pd.DataFrame): DataFrame with temporal metadata.

    Processes:
    1.  Computes global summary statistics (mean, std, min, max) by reading
        the HDF5 file in chunks to remain memory-efficient.
    2.  Checks for any NaN or infinity values in the embeddings.
    3.  Merges metadata with the crosswalk to link embeddings to months.
    4.  Calculates the mean embedding vector for each month.
    5.  Computes the cosine similarity between each monthly mean and the global
        mean to create a drift metric time series.

    Outputs:
        (Dict[str, Any]): A dictionary containing the diagnostic report,
                          including summary statistics and a DataFrame of the
                          temporal drift analysis.
    """
    # Initialize the diagnostic report.
    diagnostics = {}

    # --- 1. Summary Statistics (Memory-Efficient) ---
    # Use an iterative approach to calculate stats without loading all data.
    n_rows, n_dims = 0, 0
    sum_vec, sum_sq_vec = None, None
    min_val, max_val = np.inf, -np.inf
    has_nan_inf = False

    with h5py.File(embeddings_path, 'r') as f:
        dset = f['embeddings']
        n_rows, n_dims = dset.shape
        if sum_vec is None:
            sum_vec = np.zeros(n_dims, dtype=np.float64)
            sum_sq_vec = np.zeros(n_dims, dtype=np.float64)

        # Read the dataset in chunks.
        chunk_size = 10000
        for i in tqdm(range(0, n_rows, chunk_size), desc="Validating Embeddings"):
            chunk = dset[i:i + chunk_size]
            if np.any(~np.isfinite(chunk)):
                has_nan_inf = True

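            # Accumulate in float64 to limit rounding error in the running sums.
            chunk64 = chunk.astype(np.float64)
            sum_vec += np.sum(chunk64, axis=0)
            sum_sq_vec += np.sum(chunk64**2, axis=0)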
            min_val = min(min_val, np.min(chunk))
            max_val = max(max_val, np.max(chunk))

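    # Calculate final statistics. The variance is clipped at zero because the
    # sum-of-squares formula can go slightly negative from rounding error.
    global_mean = sum_vec / n_rows
    global_std = np.sqrt(np.maximum(sum_sq_vec / n_rows - global_mean**2, 0.0))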

    diagnostics['summary_stats'] = {
        'num_embeddings': n_rows,
        'embedding_dim': n_dims,
        'global_mean_norm': np.linalg.norm(global_mean),
        'global_std_mean': np.mean(global_std),
        'min_value': min_val,
        'max_value': max_val,
        'contains_nan_or_inf': has_nan_inf
    }

    # --- 2. Temporal Drift Analysis ---
    # Load crosswalk and merge with temporal metadata.
    crosswalk_df = pd.read_csv(crosswalk_path)
    metadata_df = df_augmented[['article_id', 'year_month']].merge(
        crosswalk_df, on='article_id'
    )

    monthly_means = {}
    with h5py.File(embeddings_path, 'r') as f:
        dset = f['embeddings']
        # Group by month and get the corresponding embedding indices.
        for month, group in tqdm(metadata_df.groupby('year_month'), desc="Analyzing Drift"):
            indices = group['embedding_row_index'].values
            # Retrieve embeddings for the month and compute the mean.
            # h5py fancy indexing requires indices in increasing order and
            # can be slow for very long index lists.
            monthly_embeddings = dset[sorted(indices)]
            monthly_means[month] = np.mean(monthly_embeddings, axis=0)

    # Calculate cosine similarity between each monthly mean and the global mean.
    drift_data = []
    global_norm = np.linalg.norm(global_mean)
    for month, month_mean in monthly_means.items():
        # Cast to float64 so both vectors share a dtype, then compute the
        # similarity itself (not 1 - similarity, which would be a distance).
        month_vec = month_mean.astype(np.float64)
        similarity = float(
            np.dot(month_vec, global_mean) / (np.linalg.norm(month_vec) * global_norm)
        )
        drift_data.append({'year_month': month, 'cosine_similarity_to_global': similarity})

    diagnostics['temporal_drift'] = pd.DataFrame(drift_data).sort_values('year_month').set_index('year_month')

    return diagnostics
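
# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): the chunked
# mean / standard deviation computation used above, isolated on plain NumPy
# arrays. The helper name is hypothetical. The sum-of-squares formula is
# simple but can lose precision when the mean is large relative to the spread;
# Welford's algorithm is a more stable alternative.
# ------------------------------------------------------------------------------

import numpy as np


def _example_streaming_mean_std(chunks):
    """Computes per-dimension mean and population std from 2-D chunks."""
    n_rows = 0
    sum_vec = None
    sum_sq_vec = None
    for chunk in chunks:
        # Accumulate in float64 regardless of the stored dtype.
        chunk64 = np.asarray(chunk, dtype=np.float64)
        if sum_vec is None:
            sum_vec = np.zeros(chunk64.shape[1])
            sum_sq_vec = np.zeros(chunk64.shape[1])
        n_rows += chunk64.shape[0]
        sum_vec += chunk64.sum(axis=0)
        sum_sq_vec += (chunk64 ** 2).sum(axis=0)
    if n_rows == 0:
        raise ValueError("No rows were provided.")
    mean = sum_vec / n_rows
    # Clip at zero to guard against small negative values from rounding.
    variance = np.maximum(sum_sq_vec / n_rows - mean ** 2, 0.0)
    return mean, np.sqrt(variance)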

# ------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# ------------------------------------------------------------------------------

def generate_document_embeddings(
    prepared_df: pd.DataFrame,
    fused_master_input_specification: Dict[str, Any],
    output_directory: str
) -> Tuple[pd.DataFrame, str, str, Dict[str, Any]]:
    """
    Orchestrates the document embedding and validation pipeline.

    This function manages the end-to-end process of converting a prepared text
    corpus into document embeddings. It handles the computationally intensive
    inference step, saves the artifacts to disk, and performs a subsequent
    quality validation.

    Args:
        prepared_df (pd.DataFrame): The fully prepared DataFrame from Task 5.
        fused_master_input_specification (Dict[str, Any]): The master config.
        output_directory (str): The directory to save output artifacts.

    Returns:
        A tuple containing:
        - (pd.DataFrame): The DataFrame augmented with token statistics.
        - (str): Path to the HDF5 file containing the embeddings.
        - (str): Path to the CSV file for the article-to-embedding crosswalk.
        - (Dict[str, Any]): A dictionary containing the full diagnostic report.
    """
    # Extract the relevant configuration section.
    embedding_config = fused_master_input_specification['master_config']['embedding_params']

    # Suppress the tokenizer's sequence-length warning for the duration of this
    # call only. Truncation is handled explicitly, so the warning is not needed.
    # The context manager restores the caller's warning filters on exit, even
    # if an exception is raised.
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore",
            message="Token indices sequence length is longer than the specified maximum sequence length"
        )

        # --- Steps 1 & 2: Run Inference and Store Artifacts ---
        # This single function handles tokenization, embedding, and storage.
        df_augmented, embeddings_path, crosswalk_path, provenance = _run_embedding_inference(
            df=prepared_df,
            config=embedding_config,
            output_dir=output_directory
        )

        # --- Step 3: Validate Embedding Quality ---
        # Perform post-hoc validation on the generated artifacts.
        diagnostics = _validate_embedding_quality(
            embeddings_path=embeddings_path,
            crosswalk_path=crosswalk_path,
            df_augmented=df_augmented
        )

    # Add provenance information to the final diagnostic report.
    diagnostics['provenance'] = provenance

    return df_augmented, embeddings_path, crosswalk_path, diagnostics


In [None]:
# Task 7 — Construct weak labels for relevance classification and prepare train/validation split

# ==============================================================================================
# Task 7: Construct weak labels for relevance classification and prepare train/validation split
# ==============================================================================================

# ------------------------------------------------------------------------------
# Task 7, Step 1: Map section_label to binary weak labels
# ------------------------------------------------------------------------------

def _create_weak_labels(
    df_augmented: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Creates a DataFrame with weak binary labels based on 'section_label'.

    Purpose:
    This function implements the weak supervision strategy from the paper. It
    filters for articles that have a non-null 'section_label' and assigns a
    binary label (1 for economics-related, 0 for other) based on whether the
    section name appears in a predefined list of positive keywords.

    Inputs:
        df_augmented (pd.DataFrame): The DataFrame containing the full corpus
                                     metadata, including 'section_label'.
        config (Dict[str, Any]): The 'relevance_model_params' sub-dictionary
                                 from the master config.

    Processes:
    1.  Filters the input DataFrame to retain only rows with a non-null
        'section_label'.
    2.  Normalizes the 'section_label' column (lowercase, strip whitespace) for
        robust matching.
    3.  Creates a lowercase set of positive section keywords from the config for
        efficient lookup.
    4.  Assigns a new column 'y_econ' with a value of 1 if the normalized
        section label is in the positive set, and 0 otherwise.

    Outputs:
        (pd.DataFrame): A new DataFrame containing only the labelable articles,
                        with the addition of the 'y_econ' binary label column.
    """
    # --- 1. Filter for labelable articles ---
    # Select only the rows where a section label is present.
    labeled_df = df_augmented[df_augmented['section_label'].notna()].copy()

    # --- 2. Prepare positive class vocabulary ---
    # Get the list of positive sections from the config.
    positive_sections: List[str] = config['positive_class_sections']
    # Convert to a lowercase, whitespace-stripped set so the keywords are
    # normalized the same way as the section labels below.
    positive_sections_set: Set[str] = {s.lower().strip() for s in positive_sections}

    # --- 3. Assign binary labels ---
    # Normalize the 'section_label' column for matching.
    normalized_labels = labeled_df['section_label'].str.lower().str.strip()
    # Create the 'y_econ' column. The result of .isin() is a boolean Series,
    # which is cast to integer (True -> 1, False -> 0).
    labeled_df['y_econ'] = normalized_labels.isin(positive_sections_set).astype(int)

    return labeled_df
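
# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): the weak-labeling
# rule above applied to a handful of section names. The helper name and the
# example section names are hypothetical; the real positive sections come from
# 'positive_class_sections' in the configuration.
# ------------------------------------------------------------------------------

import pandas as pd


def _example_weak_labels(section_labels, positive_sections):
    """Returns 1 where the normalized section label is a positive section, else 0."""
    # Normalize both sides identically: lowercase and strip whitespace.
    positives = {s.lower().strip() for s in positive_sections}
    normalized = pd.Series(section_labels, dtype="object").str.lower().str.strip()
    return normalized.isin(positives).astype(int).tolist()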

# ------------------------------------------------------------------------------
# Task 7, Step 2: Identify outlets with reliable section coverage and restrict training set
# ------------------------------------------------------------------------------

def _filter_by_outlet_coverage(
    labeled_df: pd.DataFrame,
    full_df: pd.DataFrame,
    coverage_threshold: float = 0.5
) -> pd.DataFrame:
    """
    Filters the labeled dataset to include only articles from reliable outlets.

    Purpose:
    Some outlets may provide 'section_label' metadata sporadically or not at all.
    Training on data from such outlets could introduce noise. This function
    identifies outlets that provide section labels for a sufficiently high
    proportion of their articles (defined by `coverage_threshold`) and restricts
    the training dataset to only these "reliable" sources.

    Inputs:
        labeled_df (pd.DataFrame): The DataFrame of articles with weak labels.
        full_df (pd.DataFrame): The full, cleansed corpus DataFrame, used to
                                calculate accurate coverage ratios.
        coverage_threshold (float): The minimum ratio of labeled to total
                                    articles for an outlet to be considered reliable.

    Processes:
    1.  Calculates the total number of articles for each 'outlet_id' in the full corpus.
    2.  Calculates the number of labeled articles for each 'outlet_id'.
    3.  Computes the coverage ratio for each outlet.
    4.  Identifies the set of 'outlet_id's that meet the coverage threshold.
    5.  Filters the input `labeled_df` to keep only articles from these reliable outlets.

    Outputs:
        (pd.DataFrame): A filtered DataFrame containing high-quality labeled data.
    """
    # --- 1. Calculate total articles per outlet from the full corpus ---
    total_counts = full_df['outlet_id'].value_counts()

    # --- 2. Calculate labeled articles per outlet ---
    labeled_counts = labeled_df['outlet_id'].value_counts()

    # --- 3. Compute coverage ratio ---
    # Combine the two series into a DataFrame for calculation.
    coverage_df = pd.DataFrame({'total': total_counts, 'labeled': labeled_counts}).fillna(0)
    # The ratio is the number of labeled articles divided by the total.
    coverage_df['coverage_ratio'] = coverage_df['labeled'] / coverage_df['total']

    # --- 4. Identify reliable outlets ---
    # Find the outlets where the coverage ratio meets the threshold.
    reliable_outlets = coverage_df[coverage_df['coverage_ratio'] >= coverage_threshold].index.tolist()

    # --- 5. Filter the labeled dataset ---
    # Keep only the articles from the identified reliable outlets. The copy
    # avoids SettingWithCopyWarning if columns are assigned downstream.
    high_quality_labeled_df = labeled_df[labeled_df['outlet_id'].isin(reliable_outlets)].copy()

    return high_quality_labeled_df
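
# ------------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): the coverage ratio
# computed directly from the full corpus as the share of non-null
# 'section_label' values per outlet. The helper name is hypothetical. This
# should match the labeled / total ratio above when the labeled set is exactly
# the rows with a non-null section label.
# ------------------------------------------------------------------------------

import pandas as pd


def _example_reliable_outlets(full_df: pd.DataFrame, coverage_threshold: float = 0.5):
    """Returns the sorted outlet IDs whose section-label coverage meets the threshold."""
    coverage = full_df.groupby("outlet_id")["section_label"].apply(
        lambda s: s.notna().mean()
    )
    return sorted(coverage[coverage >= coverage_threshold].index)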

# ------------------------------------------------------------------------------
# Task 7, Step 3: Perform temporal train/validation split to avoid leakage
# ------------------------------------------------------------------------------

def _perform_temporal_split(
    df: pd.DataFrame,
    validation_split_ratio: float
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Splits a time-indexed DataFrame into training and validation sets temporally.

    Purpose:
    To prevent look-ahead bias, a random split is inappropriate for time-series
    data. This function strictly adheres to a temporal splitting methodology:
    it sorts the data by time and allocates the earliest data for training and
    the most recent data for validation. This simulates a real-world scenario
    where a model is trained on the past and evaluated on the future.

    Inputs:
        df (pd.DataFrame): The DataFrame to be split, containing a
                           'publication_datetime_utc' column.
        validation_split_ratio (float): The proportion of the data to allocate
                                        to the validation set (e.g., 0.2 for 20%).

    Processes:
    1.  Sorts the entire DataFrame chronologically by 'publication_datetime_utc'.
    2.  Calculates the integer index at which to split the data.
    3.  Uses `.iloc` to slice the DataFrame into training and validation sets.
    4.  Performs a critical assertion to guarantee no time overlap between the two sets.
    5.  Logs the size and class distribution of each set for transparency.

    Outputs:
        A tuple containing:
        - (pd.DataFrame): The training set (earlier data).
        - (pd.DataFrame): The validation set (later data).
    """
    # --- 1. Temporal Sorting ---
    # This is the most critical step to ensure a valid temporal split.
    df_sorted = df.sort_values('publication_datetime_utc').reset_index(drop=True)

    # --- 2. Calculate Split Point ---
    # Determine the index that separates the training and validation data.
    split_index = int(len(df_sorted) * (1 - validation_split_ratio))

    # --- 3. Slice the DataFrame ---
    # The training set contains all data up to the split point.
    train_df = df_sorted.iloc[:split_index]
    # The validation set contains all data from the split point onwards.
    val_df = df_sorted.iloc[split_index:]

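    # --- 4. Verification ---
    # Guard against implementation errors: every training timestamp must be no
    # later than every validation timestamp (ties at the boundary are allowed).
    if not train_df.empty and not val_df.empty:
        assert train_df['publication_datetime_utc'].max() <= val_df['publication_datetime_utc'].min(), \
            "Temporal split failed: training data extends past the start of validation data."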

    # --- 5. Logging ---
    # Provide transparent reporting on the outcome of the split.
    print("Temporal split resulted in:")
    print(f"  - Training set size: {len(train_df)}")
    print(f"  - Validation set size: {len(val_df)}")
    if 'y_econ' in train_df.columns:
        print(f"  - Training set class distribution:\n{train_df['y_econ'].value_counts(normalize=True)}")
        print(f"  - Validation set class distribution:\n{val_df['y_econ'].value_counts(normalize=True)}")

    return train_df, val_df

# ------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# ------------------------------------------------------------------------------

def prepare_relevance_training_data(
    df_augmented: pd.DataFrame,
    crosswalk_path: str,
    embeddings_path: str,
    fused_master_input_specification: Dict[str, Any]
) -> Tuple[np.ndarray, pd.Series, np.ndarray, pd.Series]:
    """
    Orchestrates the full pipeline for creating relevance model training data.

    Purpose:
    This function manages the entire process of preparing data for the relevance
    classifier. It translates noisy metadata into weak labels, selects a
    high-quality subset of articles from reliable sources, performs a
    temporally-sound train/validation split to prevent look-ahead bias, and
    retrieves the corresponding document embeddings from storage. A critical
    feature of this implementation is the robust alignment of features (X) and
    labels (y) to prevent data shuffling errors.

    Inputs:
        df_augmented (pd.DataFrame): The fully prepared corpus DataFrame from Task 5/6.
        crosswalk_path (str): Path to the CSV crosswalk table which maps
                              'article_id' to 'embedding_row_index'.
        embeddings_path (str): Path to the HDF5 file containing all embeddings.
        fused_master_input_specification (Dict[str, Any]): The master config.

    Processes:
    1.  Calls a helper to create weak binary labels from 'section_label' metadata.
    2.  Calls a helper to filter the labeled data, keeping only articles from
        outlets with high metadata coverage.
    3.  Calls a helper to perform a strict temporal split into training and
        validation sets.
    4.  Retrieves the HDF5 row indices for the articles in each set.
    5.  Reads the corresponding embedding vectors from the HDF5 file using
        sorted indices (h5py fancy indexing requires increasing order).
    6.  Performs a robust re-indexing of the labels to guarantee
        perfect alignment with the sorted order of the retrieved embeddings.
    7.  Returns the final, perfectly aligned training and validation matrices/vectors.

    Outputs:
        A tuple containing the prepared data in a format ready for model training:
        - (np.ndarray): X_train, the matrix of training embeddings.
        - (pd.Series): y_train, the series of training labels, perfectly aligned with X_train.
        - (np.ndarray): X_val, the matrix of validation embeddings.
        - (pd.Series): y_val, the series of validation labels, perfectly aligned with X_val.
    """
    # --- Input Validation ---
    if not isinstance(df_augmented, pd.DataFrame):
        raise TypeError("`df_augmented` must be a pandas DataFrame.")
    # Further input validation would check for required columns.

    # --- Extract Configuration ---
    # Get the parameters for the relevance model and its training procedure.
    relevance_config = fused_master_input_specification['master_config']['relevance_model_params']
    training_config = relevance_config['training_params']

    # --- Step 1: Create weak labels from section metadata ---
    # This creates a new DataFrame containing only articles with section labels.
    labeled_df = _create_weak_labels(df_augmented, relevance_config)

    # --- Step 2: Filter for articles from reliable outlets ---
    # This refines the labeled set to a high-quality subset for training.
    # Note: the coverage threshold (0.5) is hard-coded here, not read from the config.
    high_quality_df = _filter_by_outlet_coverage(labeled_df, df_augmented, 0.5)

    # --- Step 3: Perform the temporal train/validation split ---
    # This splits the data into past (train) and future (validation) sets.
    train_meta_df, val_meta_df = _perform_temporal_split(
        high_quality_df,
        training_config['validation_split']
    )

    # --- Step 4: Retrieve Embeddings and Align Labels ---
    # Load the crosswalk to map article_ids to HDF5 row indices.
    crosswalk_df = pd.read_csv(crosswalk_path)

    # Merge with metadata to get indices for our train/val sets.
    train_merged = train_meta_df.merge(crosswalk_df, on='article_id')
    val_merged = val_meta_df.merge(crosswalk_df, on='article_id')

    # Get the raw HDF5 row indices for each set.
    train_indices = train_merged['embedding_row_index'].values
    val_indices = val_merged['embedding_row_index'].values

    # Sort the indices: h5py fancy indexing requires coordinates in increasing order.
    sorted_train_indices = np.sort(train_indices)
    sorted_val_indices = np.sort(val_indices)

    # Open the HDF5 file to read the embedding data.
    with h5py.File(embeddings_path, 'r') as f:
        embeddings_dset = f['embeddings']
        # Retrieve the feature matrices (X) using the sorted indices.
        # This defines the final order of the samples in our training data.
        X_train = embeddings_dset[sorted_train_indices]
        X_val = embeddings_dset[sorted_val_indices]

    # Now, reorder the labels (y) to match the exact order of the feature matrices.
    # First, set the index of the metadata to the HDF5 row index.
    train_reindexed = train_merged.set_index('embedding_row_index')
    val_reindexed = val_merged.set_index('embedding_row_index')

    # Then, use `.loc` with the *sorted* indices to select and reorder the labels
    # in a single row/column lookup (avoids chained indexing).
    y_train = train_reindexed.loc[sorted_train_indices, 'y_econ']
    y_val = val_reindexed.loc[sorted_val_indices, 'y_econ']

    # --- Final Assertion ---
    # This safeguard confirms that the number of samples in features and labels match.
    assert len(X_train) == len(y_train), "Mismatch in training set feature/label count."
    assert len(X_val) == len(y_val), "Mismatch in validation set feature/label count."

    print("Successfully prepared and aligned data for relevance model training.")

    return X_train, y_train, X_val, y_val
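
The two ideas Task 7 relies on, the chronological split and the re-alignment of labels to the sorted embedding rows, can be sketched without pandas or h5py. The toy records, the in-memory `embeddings` list standing in for the HDF5 dataset, and the helper names `temporal_split` and `aligned_xy` are illustrative assumptions, not part of the pipeline.

```python
from datetime import date

# Hypothetical toy records: (article_id, publication date, weak label, embedding row).
records = [
    ("a", date(2020, 1, 5), 1, 4),
    ("b", date(2020, 1, 1), 0, 5),
    ("c", date(2020, 1, 3), 1, 2),
    ("d", date(2020, 1, 2), 0, 0),
    ("e", date(2020, 1, 6), 1, 1),
    ("f", date(2020, 1, 4), 0, 3),
]

# Stand-in for the HDF5 dataset: row i is the vector [i, i, i].
embeddings = [[float(i)] * 3 for i in range(6)]


def temporal_split(rows, validation_split_ratio):
    """Sort by publication date, then cut once: the past trains, the future validates."""
    ordered = sorted(rows, key=lambda r: r[1])
    split_index = int(len(ordered) * (1 - validation_split_ratio))
    return ordered[:split_index], ordered[split_index:]


def aligned_xy(rows):
    """Read features in increasing row order and reorder labels to match."""
    label_by_row = {row_idx: label for _, _, label, row_idx in rows}
    sorted_rows = sorted(label_by_row)           # the order in which features are read
    X = [embeddings[i] for i in sorted_rows]     # features follow the sorted rows
    y = [label_by_row[i] for i in sorted_rows]   # labels are looked up by the same key
    return X, y


train_rows, val_rows = temporal_split(records, 0.5)
X_train, y_train = aligned_xy(train_rows)
X_val, y_val = aligned_xy(val_rows)

print("train labels:", y_train)
print("val labels:  ", y_val)
```

Because labels are looked up by embedding row rather than carried along positionally, sorting the row indices for the read cannot silently shuffle features against labels.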


In [None]:
# Task 8 — Train the MLP relevance classifier with early stopping and balanced class weighting

# =============================================================================================
# Task 8: Train the MLP relevance classifier with early stopping and balanced class weighting
# =============================================================================================

# ------------------------------------------------------------------------------
# Task 8, Step 1: Specify the MLP architecture and initialize
# ------------------------------------------------------------------------------

def _build_mlp_model(
    config: Dict[str, Any],
    seed: int
) -> tf.keras.Model:
    """
    Builds a Keras Sequential model based on a declarative configuration.

    Purpose:
    This function translates the `nn_architecture` specification from the master
    configuration into a concrete `tf.keras.Model` instance. It ensures
    reproducibility by setting a global random seed before model construction,
    which governs weight initialization and dropout behavior.

    Inputs:
        config (Dict[str, Any]): The 'relevance_model_params' sub-dictionary,
                                 containing the 'nn_architecture' key.
        seed (int): The global random seed for reproducibility.

    Processes:
    1.  Sets the TensorFlow global random seed.
    2.  Initializes a `tf.keras.Sequential` model.
    3.  Dynamically adds an Input layer based on 'input_dim' from the config.
    4.  Iterates through the list of layer specifications in the config.
    5.  For each specification, it adds the corresponding Keras layer (Dense or
        Dropout) with the specified parameters.

    Outputs:
        (tf.keras.Model): The constructed, but not yet compiled, Keras model.

    Error Handling:
        Raises ValueError: If the configuration specifies an unsupported layer type.
    """
    # --- 1. Set Seed for Reproducibility ---
    # This ensures that weight initialization and dropout are deterministic.
    tf.random.set_seed(seed)

    # --- 2. Initialize Sequential Model ---
    # The model will be a simple stack of layers.
    model = tf.keras.Sequential()

    # Extract the architecture specification from the config.
    nn_arch = config['nn_architecture']

    # --- 3. Add Input Layer ---
    # Define the shape of the input vectors; the embedding dimensionality is
    # read from `input_dim` in the config.
    model.add(tf.keras.layers.Input(shape=(nn_arch['input_dim'],)))

    # --- 4. Dynamically Add Hidden and Output Layers ---
    # Map string identifiers from the config to actual Keras layer classes.
    layer_mapping = {
        'Dense': tf.keras.layers.Dense,
        'Dropout': tf.keras.layers.Dropout
    }

    # Iterate through the layer specifications in the config.
    for layer_spec in nn_arch['layers']:
        layer_type_str = layer_spec['type']
        # Find the corresponding Keras layer class.
        layer_class = layer_mapping.get(layer_type_str)

        if layer_class is None:
            raise ValueError(f"Unsupported layer type in config: '{layer_type_str}'")

        # Create a dictionary of parameters for the layer constructor.
        params = layer_spec.copy()
        del params['type'] # Remove the type key as it's not a layer argument.

        # Add the instantiated layer to the model.
        model.add(layer_class(**params))

    return model

# ------------------------------------------------------------------------------
# Task 8, Step 2: Configure training with balanced class weighting and temporal validation
# ------------------------------------------------------------------------------

def _configure_training_procedure(
    y_train: pd.Series,
    config: Dict[str, Any]
) -> Tuple[tf.keras.optimizers.Optimizer, str, Dict[int, float], List[tf.keras.callbacks.Callback]]:
    """
    Configures all components required for the model training process.

    Purpose:
    This function prepares the optimizer, loss function, class weights, and
    callbacks needed for `model.fit()`. It implements the 'balanced' class
    weighting strategy to counteract label imbalance and sets up early stopping
    to prevent overfitting and select the best model based on the out-of-time
    validation set.

    Inputs:
        y_train (pd.Series): The training labels, used to calculate class weights.
        config (Dict[str, Any]): The 'training_params' sub-dictionary.

    Processes:
    1.  Instantiates the Adam optimizer with the configured learning rate.
    2.  Calculates class weights to give more importance to the minority class.
    3.  Configures the EarlyStopping callback to monitor validation AUC and
        restore the best weights upon completion.

    Outputs:
        A tuple containing:
        - (tf.keras.optimizers.Optimizer): The configured Adam optimizer.
        - (str): The name of the loss function ('BinaryCrossentropy').
        - (Dict[int, float]): The dictionary of class weights.
        - (List[tf.keras.callbacks.Callback]): A list of callbacks to use during training.
    """
    # --- 1. Configure Optimizer and Loss ---
    # Instantiate the Adam optimizer with the specified learning rate.
    optimizer = tf.keras.optimizers.Adam(learning_rate=config['learning_rate'])
    # The loss function is binary cross-entropy for binary classification.
    loss = config['loss_function']

    # --- 2. Calculate Class Weights ---
    # This counteracts the effect of imbalanced labels in the training data.
    class_counts = y_train.value_counts()
    total_samples = len(y_train)

    # Formula for balanced weights: weight = (total_samples / (n_classes * count_for_class))
    weight_for_0 = (total_samples / (2 * class_counts.get(0, 1)))
    weight_for_1 = (total_samples / (2 * class_counts.get(1, 1)))
    class_weight = {0: weight_for_0, 1: weight_for_1}
    print(f"Computed class weights: {class_weight}")

    # --- 3. Configure Callbacks ---
    # Early stopping prevents overfitting and saves the best version of the model.
    early_stopping_callback = tf.keras.callbacks.EarlyStopping(
        monitor=config['early_stopping_metric'], # e.g., 'val_auc'
        patience=config['early_stopping_patience'], # e.g., 5 epochs
        mode='max', # We want to maximize the area under the curve.
        restore_best_weights=True, # CRITICAL: ensures the final model has the best weights.
        verbose=1
    )
    callbacks = [early_stopping_callback]

    return optimizer, loss, class_weight, callbacks

# ------------------------------------------------------------------------------
# Task 8, Step 3: Train, validate, and persist the model
# ------------------------------------------------------------------------------

def _train_and_persist_model(
    model: tf.keras.Model,
    X_train: np.ndarray,
    y_train: pd.Series,
    X_val: np.ndarray,
    y_val: pd.Series,
    class_weight: Dict[int, float],
    callbacks: List[tf.keras.callbacks.Callback],
    model_path: str,
    epochs: int = 100,
    batch_size: int = 64
) -> Tuple[tf.keras.callbacks.History, Dict[str, Any]]:
    """
    Executes the model training, evaluates the final model, and saves it to disk.

    Purpose:
    This function encapsulates the core `fit`, `evaluate`, and `save` lifecycle.
    It returns a detailed history of the training process and a summary of the
    final model's performance on the held-out validation set.

    Inputs:
        model (tf.keras.Model): The compiled Keras model.
        X_train, y_train: The training features and labels.
        X_val, y_val: The validation features and labels.
        class_weight (Dict[int, float]): The computed class weights.
        callbacks (List): Keras callbacks, including early stopping.
        model_path (str): The file path where the trained model will be saved.
        epochs (int): The maximum number of training epochs.
        batch_size (int): The number of samples per gradient update.

    Outputs:
        A tuple containing:
        - (tf.keras.callbacks.History): The training history object.
        - (Dict[str, Any]): A dictionary of final validation metrics.
    """
    # --- 1. Train the Model ---
    # The `fit` method executes the training loop.
    history = model.fit(
        X_train,
        y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        class_weight=class_weight,
        callbacks=callbacks,
        verbose=2 # Print one line per epoch.
    )

    # --- 2. Evaluate the Final Model ---
    # Evaluate the model on the validation set (using the best weights restored by the callback).
    print("\nEvaluating final model on the validation set...")
    final_metrics = model.evaluate(X_val, y_val, return_dict=True)

    # Generate a more detailed classification report and confusion matrix.
    # Use `>=` so the 0.5 cut-off matches the thresholding rule applied in Task 9.
    y_pred_proba = model.predict(X_val).flatten()
    y_pred_class = (y_pred_proba >= 0.5).astype(int)

    report = classification_report(y_val, y_pred_class, output_dict=True)
    conf_matrix = confusion_matrix(y_val, y_pred_class)

    final_metrics['classification_report'] = report
    final_metrics['confusion_matrix'] = conf_matrix.tolist() # Convert to list for serialization

    print(f"Final Validation Metrics: {final_metrics}")

    # --- 3. Persist the Model ---
    # Save the entire model (architecture, weights, optimizer state).
    model.save(model_path)
    print(f"Trained model saved to: {model_path}")

    return history, final_metrics

# ------------------------------------------------------------------------------
# Task 8, Orchestrator Function
# ------------------------------------------------------------------------------

def train_relevance_classifier(
    X_train: np.ndarray,
    y_train: pd.Series,
    X_val: np.ndarray,
    y_val: pd.Series,
    fused_master_input_specification: Dict[str, Any],
    output_path: str
) -> Tuple[str, Dict[str, Any]]:
    """
    Orchestrates the end-to-end training of the MLP relevance classifier.

    This function manages the entire model training workflow: building the
    architecture from the config, configuring the training procedure with best
    practices, executing the training loop, and saving the final artifact.

    Args:
        X_train, y_train: Training features and labels from Task 7.
        X_val, y_val: Validation features and labels from Task 7.
        fused_master_input_specification (Dict[str, Any]): The master config.
        output_path (str): The file path to save the final trained model.

    Returns:
        A tuple containing:
        - (str): The path to the saved model.
        - (Dict[str, Any]): A dictionary containing the training history and
          final validation metrics.
    """
    # Extract relevant configuration sections.
    relevance_config = fused_master_input_specification['master_config']['relevance_model_params']
    training_config = relevance_config['training_params']
    seed = fused_master_input_specification['master_config']['reproducibility']['random_seeds']['global_seed']

    # --- Step 1: Build the model architecture ---
    model = _build_mlp_model(relevance_config, seed)
    model.summary()

    # --- Step 2: Configure the training procedure ---
    optimizer, loss, class_weight, callbacks = _configure_training_procedure(
        y_train, training_config
    )

    # Compile the model, making it ready for training.
    model.compile(
        optimizer=optimizer,
        loss=loss,
        metrics=[tf.keras.metrics.AUC(name='auc'), 'accuracy']
    )

    # --- Step 3: Train, evaluate, and persist the model ---
    history, final_metrics = _train_and_persist_model(
        model=model,
        X_train=X_train,
        y_train=y_train,
        X_val=X_val,
        y_val=y_val,
        class_weight=class_weight,
        callbacks=callbacks,
        model_path=output_path
    )

    # --- 4. Package Results ---
    # Create a comprehensive results dictionary for logging and analysis.
    results = {
        "model_path": output_path,
        "validation_metrics": final_metrics,
        "training_history": history.history
    }

    return output_path, results
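
A small worked example may help make the class-weight formula and the early-stopping rule in Task 8 concrete. The label counts, the validation AUC history, and the helpers `balanced_class_weights` and `early_stopping_epoch` are hypothetical; the second helper only imitates the idea of `patience` with best-weight restoration and is not Keras's implementation. Unlike the pipeline code, which hard-codes two classes, the sketch infers the number of classes from the labels.

```python
from collections import Counter


def balanced_class_weights(labels):
    """weight_c = total_samples / (n_classes * count_c)."""
    counts = Counter(labels)
    total = len(labels)
    n_classes = len(counts)
    return {cls: total / (n_classes * n) for cls, n in counts.items()}


def early_stopping_epoch(val_scores, patience):
    """Return (epoch at which training stops, epoch whose weights are kept)."""
    best_score, best_epoch, wait = float("-inf"), -1, 0
    for epoch, score in enumerate(val_scores):
        if score > best_score:
            # Improvement: remember this epoch and reset the patience counter.
            best_score, best_epoch, wait = score, epoch, 0
        else:
            wait += 1
            if wait >= patience:
                return epoch, best_epoch
    # Ran out of epochs before patience was exhausted.
    return len(val_scores) - 1, best_epoch


# Hypothetical imbalanced labels: 80 non-economic (0), 20 economic (1).
labels = [0] * 80 + [1] * 20
weights = balanced_class_weights(labels)
print(weights)

# Each class contributes the same total weight to the loss.
contribution = {cls: weights[cls] * labels.count(cls) for cls in weights}
print(contribution)

# Hypothetical validation AUC per epoch, with a patience of 2 epochs.
val_auc = [0.70, 0.75, 0.74, 0.76, 0.75, 0.75, 0.74]
stopped_at, kept = early_stopping_epoch(val_auc, patience=2)
print(f"stopped at epoch {stopped_at}, kept weights from epoch {kept}")
```

With an 80/20 split the minority class receives four times the weight of the majority class, so both classes carry equal total weight in the loss.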


In [None]:
# Task 9 — Apply the relevance classifier to the full corpus and select economics-related articles

# ================================================================================================
# Task 9: Apply the relevance classifier to the full corpus and select economics-related articles
# ================================================================================================

# ------------------------------------------------------------------------------
# Task 9, Step 1: Score all articles with the trained relevance classifier
# ------------------------------------------------------------------------------

def _score_all_articles(
    model_path: str,
    embeddings_path: str,
    inference_batch_size: int = 1024
) -> np.ndarray:
    """
    Scores all articles in the corpus using the trained relevance classifier.

    Purpose:
    This function performs large-scale inference, applying the trained MLP
    classifier to every document embedding in the corpus. To handle potentially
    millions of articles without exhausting memory, it reads the embeddings from
    the HDF5 file in batches, runs prediction on each batch, and aggregates the
    resulting probabilities.

    Inputs:
        model_path (str): The file path to the saved Keras model from Task 8.
        embeddings_path (str): The path to the HDF5 file containing the full
                               embedding matrix.
        inference_batch_size (int): The number of embeddings to process in a
                                    single prediction batch.

    Processes:
    1.  Loads the pre-trained Keras model.
    2.  Opens the HDF5 embeddings file for reading.
    3.  Iterates through the embedding dataset in batches of size
        `inference_batch_size`.
    4.  For each batch, calls `model.predict()` to get the relevance probabilities.
    5.  Collects the probabilities from all batches.
    6.  Concatenates the batch results into a single NumPy array.

    Outputs:
        (np.ndarray): A 1D NumPy array of float values (probabilities), where
                      the i-th element is the relevance score for the i-th
                      article in the embedding file.
    """
    # --- 1. Load the trained model ---
    print(f"Loading relevance classifier from: {model_path}")
    model = tf.keras.models.load_model(model_path)

    # --- 2. Perform Batch Inference ---
    # This list will store the probability arrays from each batch.
    all_probabilities = []

    # Open the HDF5 file for reading.
    with h5py.File(embeddings_path, 'r') as f:
        embeddings_dset = f['embeddings']
        num_articles = embeddings_dset.shape[0]

        # Iterate through the dataset in batches.
        for i in tqdm(range(0, num_articles, inference_batch_size), desc="Scoring article relevance"):
            # Read a batch of embeddings from the file.
            batch_embeddings = embeddings_dset[i:i + inference_batch_size]

            # Run prediction on the batch. `verbose=0` suppresses the per-call
            # Keras progress bar so it does not clobber the tqdm output.
            batch_probs = model.predict(batch_embeddings, batch_size=inference_batch_size, verbose=0)

            # Flatten the output from (batch_size, 1) to (batch_size,) and append.
            all_probabilities.append(batch_probs.flatten())

    # --- 3. Aggregate Results ---
    # Concatenate the list of batch arrays into a single, large NumPy array.
    full_probability_vector = np.concatenate(all_probabilities)

    return full_probability_vector

# ------------------------------------------------------------------------------
# Task 9, Step 2: Threshold probabilities to define the relevant subset R
# ------------------------------------------------------------------------------

def _identify_relevant_indices(
    probabilities: np.ndarray,
    threshold: float
) -> np.ndarray:
    """
    Identifies the indices of relevant articles based on a probability threshold.

    Purpose:
    This function applies the classification rule to the predicted probabilities,
    translating the continuous scores into a binary decision (relevant vs. not
    relevant). It returns the indices of the articles that pass the relevance
    filter.

    Inputs:
        probabilities (np.ndarray): The 1D array of relevance scores for all articles.
        threshold (float): The classification threshold (e.g., 0.5). Articles with
                           a score >= this value are considered relevant.

    Processes:
    1.  Performs a vectorized boolean comparison to create a mask of relevant articles.
    2.  Uses `np.where()` to extract the integer indices of the `True` values in the mask.

    Outputs:
        (np.ndarray): A 1D NumPy array of integer indices corresponding to the
                      rows of the relevant articles in the original embedding file.
    """
    # --- 1. Create a boolean mask ---
    # This is a highly efficient vectorized operation.
    is_relevant_mask = probabilities >= threshold

    # --- 2. Extract indices ---
    # np.where() returns a tuple of arrays; we need the first element.
    relevant_indices = np.where(is_relevant_mask)[0]

    print(f"Identified {len(relevant_indices)} relevant articles "
          f"out of {len(probabilities)} total ({(len(relevant_indices)/len(probabilities)):.2%}).")

    return relevant_indices

# ------------------------------------------------------------------------------
# Task 9, Step 3: Persist relevance scores and audit the filtered corpus
# ------------------------------------------------------------------------------

def _persist_scores_and_audit(
    df_augmented: pd.DataFrame,
    crosswalk_df: pd.DataFrame,
    all_probabilities: np.ndarray,
    relevant_indices: np.ndarray,
    output_dir: str
) -> Tuple[pd.DataFrame, str, Dict[str, Any]]:
    """
    Saves all scores, creates the filtered DataFrame, and generates an audit report.

    Purpose:
    This function finalizes the filtering process by creating and saving the key
    output artifacts: a file containing the relevance score for every article
    (for full transparency), the final filtered DataFrame of relevant articles,
    and a summary audit of the filtered corpus.

    Inputs:
        df_augmented (pd.DataFrame): The full metadata DataFrame.
        crosswalk_df (pd.DataFrame): The table mapping article_id to embedding index.
        all_probabilities (np.ndarray): The relevance scores for all articles.
        relevant_indices (np.ndarray): The indices of the relevant articles.
        output_dir (str): Directory to save the output files.

    Outputs:
        A tuple containing:
        - (pd.DataFrame): The filtered DataFrame (`df_relevant`) containing metadata
          for only the relevant articles.
        - (str): The path to the saved file of all relevance scores.
        - (Dict[str, Any]): An audit dictionary summarizing the filtered corpus.
    """
    # --- 1. Persist All Relevance Scores ---
    # Create a DataFrame to hold the scores, indexed by the embedding row index.
    scores_df = pd.DataFrame(
        {'p_econ': all_probabilities},
        index=pd.Index(range(len(all_probabilities)), name='embedding_row_index')
    )
    # Merge with the crosswalk to link scores to article_ids.
    full_scores_df = crosswalk_df.merge(scores_df, on='embedding_row_index')

    # Save to a performant file format like Feather or Parquet.
    scores_path = os.path.join(output_dir, 'relevance_scores.feather')
    full_scores_df[['article_id', 'p_econ']].to_feather(scores_path)
    print(f"All relevance scores saved to: {scores_path}")

    # --- 2. Create the Filtered DataFrame of Relevant Articles ---
    # `relevant_indices` are embedding row indices, so look them up via the
    # 'embedding_row_index' column rather than the crosswalk's default positional
    # index (the two only coincide if the CSV is sorted and gap-free).
    relevant_article_ids = (
        crosswalk_df.set_index('embedding_row_index')
        .loc[relevant_indices, 'article_id']
        .values
    )

    # Filter the main metadata DataFrame to keep only these articles.
    df_relevant = df_augmented[df_augmented['article_id'].isin(relevant_article_ids)].copy()

    # --- 3. Generate Audit Report ---
    # Create a summary of the final, relevant corpus.
    audit_report = {
        'total_relevant_articles': len(df_relevant),
        'relevant_articles_by_language': df_relevant['language'].value_counts().to_dict(),
        'monthly_relevant_article_counts': df_relevant['year_month'].value_counts().sort_index().to_dict()
    }

    print("--- Relevance Filter Audit ---")
    print(f"Total relevant articles: {audit_report['total_relevant_articles']}")
    print(f"Breakdown by language: {audit_report['relevant_articles_by_language']}")

    return df_relevant, scores_path, audit_report

# ------------------------------------------------------------------------------
# Task 9, Orchestrator Function
# ------------------------------------------------------------------------------

def filter_corpus_by_relevance(
    df_augmented: pd.DataFrame,
    model_path: str,
    embeddings_path: str,
    crosswalk_path: str,
    fused_master_input_specification: Dict[str, Any],
    output_directory: str
) -> Tuple[pd.DataFrame, str, Dict[str, Any]]:
    """
    Orchestrates the application of the relevance classifier to the full corpus.

    This function manages the end-to-end workflow of scoring all articles for
    relevance, applying a threshold to filter them, and saving the resulting
    artifacts and audit reports.

    Args:
        df_augmented (pd.DataFrame): The full, cleansed, and augmented corpus metadata.
        model_path (str): Path to the trained relevance classifier model.
        embeddings_path (str): Path to the HDF5 file of all embeddings.
        crosswalk_path (str): Path to the CSV crosswalk table.
        fused_master_input_specification (Dict[str, Any]): The master config.
        output_directory (str): Directory to save output artifacts.

    Returns:
        A tuple containing:
        - (pd.DataFrame): A new DataFrame containing metadata for only the
          articles classified as relevant.
        - (str): The path to the file containing the relevance scores for all articles.
        - (Dict[str, Any]): A final audit report on the filtered corpus.
    """
    # Extract the classification threshold from the configuration.
    threshold = fused_master_input_specification['master_config']['relevance_model_params']['classification_threshold']

    # --- Step 1: Score all articles using the trained model ---
    all_probabilities = _score_all_articles(model_path, embeddings_path)

    # --- Step 2: Identify indices of relevant articles ---
    relevant_indices = _identify_relevant_indices(all_probabilities, threshold)

    # --- Step 3: Persist scores, create filtered DataFrame, and audit ---
    crosswalk_df = pd.read_csv(crosswalk_path)
    df_relevant, scores_path, audit_report = _persist_scores_and_audit(
        df_augmented=df_augmented,
        crosswalk_df=crosswalk_df,
        all_probabilities=all_probabilities,
        relevant_indices=relevant_indices,
        output_dir=output_directory
    )

    return df_relevant, scores_path, audit_report
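
The Task 9 flow (score in batches, threshold, map embedding rows back to article IDs) can be illustrated with plain Python lists. Everything here is a stand-in: `toy_predict` replaces the Keras model, `embeddings` replaces the HDF5 dataset, and the `crosswalk` pairs are invented. The crosswalk is deliberately unsorted to show why the lookup should be keyed on the embedding row rather than on position.

```python
def iter_batches(n_rows, batch_size):
    """Yield (start, stop) slices that cover range(n_rows)."""
    for start in range(0, n_rows, batch_size):
        yield start, min(start + batch_size, n_rows)


def score_in_batches(rows, predict, batch_size):
    """Apply `predict` to one slice at a time and concatenate the outputs."""
    probabilities = []
    for start, stop in iter_batches(len(rows), batch_size):
        probabilities.extend(predict(rows[start:stop]))
    return probabilities


def relevant_row_indices(probabilities, threshold):
    """Row indices whose score is >= threshold."""
    return [i for i, p in enumerate(probabilities) if p >= threshold]


def toy_predict(batch):
    """Hypothetical model: the 'probability' is just the first feature."""
    return [row[0] for row in batch]


# Hypothetical one-dimensional embeddings, one row per article.
embeddings = [[0.91], [0.12], [0.50], [0.49], [0.77]]

# Hypothetical crosswalk (article_id, embedding_row_index), stored out of order.
crosswalk = [("art-C", 2), ("art-A", 0), ("art-E", 4), ("art-B", 1), ("art-D", 3)]

probs = score_in_batches(embeddings, toy_predict, batch_size=2)
rows = relevant_row_indices(probs, threshold=0.5)

# Key the lookup on the embedding row, not on the crosswalk's list position.
article_by_row = {row: article_id for article_id, row in crosswalk}
relevant_ids = [article_by_row[r] for r in rows]

print(rows)
print(relevant_ids)
```

A positional lookup into the unsorted `crosswalk` (`crosswalk[0]`, `crosswalk[2]`, `crosswalk[4]`) would have returned `art-C`, `art-E`, and `art-D` instead, which is the failure mode the keyed lookup avoids.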


In [None]:
# Task 10 — Generate 256 synthetic articles via Claude 3.5 Sonnet using exact Appendix prompts

# =============================================================================================
# Task 10: Generate 256 synthetic articles via Claude 3.5 Sonnet using exact Appendix prompts
# =============================================================================================

# ------------------------------------------------------------------------------
# Task 10, Step 1: Configure LLM API and fix all generation parameters
# ------------------------------------------------------------------------------

def _call_llm_api(
    client: Anthropic,
    prompt: str,
    llm_config: Dict[str, Any],
    max_retries: int = 3,
    initial_backoff: float = 2.0
) -> str:
    """
    Calls the Anthropic API with a given prompt and handles transient errors.

    Purpose:
    This function encapsulates a single, robust call to the LLM API. It constructs
    the request payload from the configuration, sends the request, and implements
    a retry mechanism with exponential backoff to gracefully handle common
    transient issues like rate limiting or network errors.

    Inputs:
        client (Anthropic): An initialized Anthropic API client.
        prompt (str): The full text of the prompt to send to the LLM.
        llm_config (Dict[str, Any]): The 'llm_config' sub-dictionary containing
                                     model identifier and generation parameters.
        max_retries (int): The maximum number of times to retry a failed API call.
        initial_backoff (float): The initial delay (in seconds) for the retry mechanism.

    Outputs:
        (str): The text content of the LLM's response.

    Error Handling:
        Raises RuntimeError: If the API call fails after all retry attempts.
    """
    # Extract generation parameters from the configuration.
    generation_params = llm_config['generation_params']

    # Build the request payload for the Messages API.
    api_kwargs = {
        "model": llm_config['model_identifier'],
        "max_tokens": generation_params['max_tokens'],
        "temperature": generation_params['temperature'],
        "messages": [{"role": "user", "content": prompt}]
    }
    # The Messages API does not document a `seed` parameter, and the SDK's
    # `messages.create` raises a TypeError on unknown keyword arguments.
    # A configured seed is therefore reported and skipped rather than forwarded.
    if 'seed' in generation_params:
        print("Warning: 'seed' is set in generation_params but is not forwarded to the API.")

    # --- Retry Logic ---
    # This loop makes the generation process resilient to transient network/API issues.
    for attempt in range(max_retries):
        try:
            # Make the API call to the messages endpoint.
            response = client.messages.create(**api_kwargs)
            # The response content is in the first block of the content list.
            return response.content[0].text
        except APIError as e:
            # On the final attempt, give up and surface the failure as a RuntimeError.
            if attempt == max_retries - 1:
                raise RuntimeError(f"API call failed after {max_retries} attempts.") from e
            # Otherwise, log the error and wait; the delay doubles with each attempt.
            backoff = initial_backoff * (2 ** attempt)
            print(f"API Error on attempt {attempt + 1}/{max_retries}: {e}. Retrying in {backoff:.1f}s...")
            time.sleep(backoff)

    # This line should be unreachable but is included for logical completeness.
    raise RuntimeError("Exited retry loop unexpectedly.")
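
# Illustrative sketch (not part of the pipeline): the wait times implied by the
# retry loop in `_call_llm_api`. The helper name `_sketch_backoff_schedule` is
# hypothetical; it assumes a sleep of initial_backoff * 2**attempt after every
# failed attempt except the last, which raises instead of sleeping.
from typing import List


def _sketch_backoff_schedule(max_retries: int = 3, initial_backoff: float = 2.0) -> List[float]:
    """Return the delays (in seconds) slept between consecutive attempts."""
    # No sleep follows the final attempt, hence max_retries - 1 delays.
    return [initial_backoff * (2 ** attempt) for attempt in range(max(max_retries - 1, 0))]


# With the defaults, a call that keeps failing waits 2 s, then 4 s, before giving up:
# _sketch_backoff_schedule(3, 2.0) -> [2.0, 4.0]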

# ---------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 2: Generate synthetic articles using the Appendix financial-markets prompt and domain-parameterized variants
# ---------------------------------------------------------------------------------------------------------------------------

def _execute_generation_plan(
    client: Anthropic,
    config: Dict[str, Any],
    output_path: str
) -> pd.DataFrame:
    """
    Generates the full synthetic dataset according to a predefined plan.

    Purpose:
    This function orchestrates the entire generation process. It determines how
    many articles are needed, constructs the precise prompts for each, calls the
    LLM via a helper function, and saves the results incrementally to ensure
    resumability and prevent data loss.

    Inputs:
        client (Anthropic): The initialized Anthropic API client.
        config (Dict[str, Any]): The 'sentiment_model_params' sub-dictionary.
        output_path (str): The path to the CSV file where results are saved.

    Processes:
    1.  Defines the generation plan (number of articles per domain/sentiment).
    2.  Checks if a partial results file exists at `output_path` to resume.
    3.  Iterates through the remaining generation tasks.
    4.  For each task, constructs the appropriate prompt using templates.
    5.  Calls the robust `_call_llm_api` function to get the generated text.
    6.  Strips the response text; each API call returns exactly one article.
    7.  Appends the new article to the results list and saves to disk every
        five articles and once more at the end of the plan.

    Outputs:
        (pd.DataFrame): A DataFrame containing the complete set of 256 synthetic articles.
    """
    # Extract relevant configuration details.
    synth_config = config['synthetic_dataset_config']
    llm_config = config['llm_config']

    # --- 1. Define the Generation Plan ---
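    # Calculate how many articles are needed for each combination of domain and sentiment.
    # Note: integer division means the plan totals fewer than 'num_articles_total'
    # articles if that total is not divisible by 2 * the number of domains.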
    num_per_sentiment = synth_config['num_articles_total'] // 2
    num_domains = len(synth_config['domain_coverage'])
    num_per_combo = num_per_sentiment // num_domains

    # Create a list of all generation tasks to be completed.
    generation_plan = []
    for domain in synth_config['domain_coverage']:
        for sentiment in ['positive', 'negative']:
            generation_plan.extend([(domain, sentiment)] * num_per_combo)

    # --- 2. Handle Resumability ---
    # Check if a partial file exists to resume from a previous run.
    if os.path.exists(output_path):
        results_df = pd.read_csv(output_path)
        print(f"Resuming generation. Found {len(results_df)} existing articles.")
    else:
        results_df = pd.DataFrame()

    # Filter the plan to only include tasks that have not yet been completed.
    if not results_df.empty:
        completed_counts = results_df.groupby(['domain', 'sentiment_label']).size()

        remaining_plan = []
        for domain, sentiment in generation_plan:
            # Convert sentiment to label (1 for positive, 0 for negative)
            label = 1 if sentiment == 'positive' else 0
            completed = completed_counts.get((domain, label), 0)
            if completed > 0:
                completed_counts.loc[(domain, label)] -= 1
            else:
                remaining_plan.append((domain, sentiment))
        generation_plan = remaining_plan

    # --- 3. Iterative Generation Loop ---
    generated_records = results_df.to_dict('records')

    # The reference prompts ask for 3 positive and 3 negative articles per call.
    # To keep labels and counts under direct control, each call is instead wrapped
    # so that it requests a single article with a fixed sentiment.
    for i, (domain, sentiment) in enumerate(tqdm(generation_plan, desc="Generating Synthetic Articles")):
        # Construct the prompt for the current task.
        if domain == "financial_markets":
            prompt = llm_config['appendix_prompt_financial_markets_en']
        else:
            template = llm_config['topic_parameterized_prompt_template_en']
            prompt = template.format(AREA=domain)

        # Modify prompt to ask for a single article for simplicity and control.
        single_article_prompt = (
            f"Please generate one single business article with a clearly '{sentiment}' outlook "
            f"on the topic of '{domain}'.\n"
            f"Follow all the style and content requirements from this reference prompt:\n\n---\n\n{prompt}"
        )

        # Call the API to generate the text.
        generated_text = _call_llm_api(client, single_article_prompt, llm_config)

        # Store the result with its metadata.
        generated_records.append({
            'synthetic_article_id': str(uuid.uuid4()),
            'domain': domain,
            'sentiment_label': 1 if sentiment == 'positive' else 0,
            'full_text': generated_text.strip(),
            'prompt_template': 'appendix' if domain == 'financial_markets' else 'parameterized',
        })

        # Periodically save results to disk to prevent data loss.
        if (i + 1) % 5 == 0 or (i + 1) == len(generation_plan):
            pd.DataFrame(generated_records).to_csv(output_path, index=False)

    return pd.DataFrame(generated_records)
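
# Illustrative sketch (not part of the pipeline): the plan arithmetic and resume
# filter used above, on plain Python data. The helper names and the two example
# domains are hypothetical; only the counting logic mirrors `_execute_generation_plan`.
from collections import Counter
from typing import Dict, List, Tuple


def _sketch_build_plan(domains: List[str], num_articles_total: int) -> List[Tuple[str, str]]:
    """Build one (domain, sentiment) task per article, split evenly across combinations."""
    num_per_combo = (num_articles_total // 2) // len(domains)
    plan: List[Tuple[str, str]] = []
    for domain in domains:
        for sentiment in ("positive", "negative"):
            plan.extend([(domain, sentiment)] * num_per_combo)
    return plan


def _sketch_remaining(
    plan: List[Tuple[str, str]],
    completed: Dict[Tuple[str, str], int]
) -> List[Tuple[str, str]]:
    """Drop one planned task for each article already generated for its combination."""
    budget = Counter(completed)
    remaining = []
    for task in plan:
        if budget[task] > 0:
            budget[task] -= 1
        else:
            remaining.append(task)
    return remaining


# Example: 8 articles over 2 domains gives 2 per (domain, sentiment) combination.
# If one positive "energy" article already exists, 7 tasks remain:
# plan = _sketch_build_plan(["energy", "trade"], 8)
# len(_sketch_remaining(plan, {("energy", "positive"): 1})) -> 7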

# ------------------------------------------------------------------------------
# Task 10, Step 3: Validate and curate synthetic outputs
# ------------------------------------------------------------------------------

def _validate_synthetic_corpus(
    df: pd.DataFrame,
    word_count_range: Tuple[int, int] = (350, 550)
) -> List[str]:
    """
    Validates the generated synthetic corpus for quality and uniqueness.

    Purpose:
    This function serves as a quality assurance check on the LLM's output. It
    verifies that the generated articles meet the length requirements and that
    there are no exact duplicates in the dataset.

    Inputs:
        df (pd.DataFrame): The DataFrame of generated synthetic articles.
        word_count_range (Tuple[int, int]): The acceptable min/max word count.

    Outputs:
        (List[str]): A list of validation issue descriptions. An empty list
                     indicates the corpus passed all checks.
    """
    issues = []

    # --- 1. Word Count Validation ---
    # Count whitespace-delimited words per article without mutating the caller's DataFrame.
    word_counts = df['full_text'].str.split().str.len()
    # Find articles outside the acceptable range (missing texts also fail this check).
    out_of_range = df.assign(word_count=word_counts)[
        ~word_counts.between(word_count_range[0], word_count_range[1])
    ]
    if not out_of_range.empty:
        for _, row in out_of_range.iterrows():
            issues.append(
                f"Validation Warning: Article '{row['synthetic_article_id']}' has "
                f"word count of {row['word_count']}, outside range {word_count_range}."
            )

    # --- 2. Duplicate Validation ---
    # Replicate the Task 4 fingerprinting logic inline (lowercase, collapse
    # whitespace, strip, SHA-256) so that this function has no external dependency.
    fingerprints = df['full_text'].str.lower().str.replace(r'\s+', ' ', regex=True).str.strip().apply(
        lambda x: hashlib.sha256(x.encode('utf-8')).hexdigest() if pd.notna(x) else None
    )

    if fingerprints.duplicated().any():
        num_duplicates = fingerprints.duplicated().sum()
        issues.append(f"Validation Error: Found {num_duplicates} duplicate articles based on content fingerprint.")

    return issues
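
# Illustrative sketch (not part of the pipeline): the content fingerprint used
# for duplicate detection, applied to single strings. The helper name is
# hypothetical; the normalization (lowercase, collapse whitespace, strip) and
# SHA-256 hashing follow the pandas expression in `_validate_synthetic_corpus`.
import hashlib
import re


def _sketch_fingerprint(text: str) -> str:
    """Hash a case- and whitespace-normalized version of the text."""
    normalized = re.sub(r"\s+", " ", text.lower()).strip()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


# Texts that differ only in case or spacing share a fingerprint, so they count
# as duplicates; any change in wording produces a different one:
# _sketch_fingerprint("Markets  rally\n") == _sketch_fingerprint("markets rally")  -> True
# _sketch_fingerprint("Markets rally") == _sketch_fingerprint("Markets fall")     -> False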

# ------------------------------------------------------------------------------
# Task 10, Orchestrator Function
# ------------------------------------------------------------------------------

def generate_synthetic_articles(
    fused_master_input_specification: Dict[str, Any],
    output_path: str
) -> str:
    """
    Orchestrates the end-to-end generation of the synthetic article dataset.

    This function manages the entire workflow: setting up the API client,
    executing a resumable generation plan, and validating the final output for
    quality and uniqueness.

    Args:
        fused_master_input_specification (Dict[str, Any]): The master config.
        output_path (str): The file path to save the final synthetic corpus CSV.

    Returns:
        (str): The path to the validated synthetic corpus file.

    Raises:
        ValueError: If the final generated corpus fails validation checks.
        EnvironmentError: If the ANTHROPIC_API_KEY environment variable is not set.
    """
    # --- 1. Configure API Client ---
    # Retrieve API key from environment variables for security.
    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        # A missing credential is an environment problem, not an import failure.
        raise EnvironmentError("ANTHROPIC_API_KEY environment variable not set.")
    client = Anthropic(api_key=api_key)

    # Extract the relevant configuration section.
    config = fused_master_input_specification['master_config']['sentiment_model_params']

    # --- 2. Execute Generation Plan ---
    # This function handles the core generation loop and is resumable.
    synthetic_df = _execute_generation_plan(client, config, output_path)

    # --- 3. Validate Final Corpus ---
    # Perform final quality checks on the complete dataset.
    validation_issues = _validate_synthetic_corpus(synthetic_df)

    if validation_issues:
        # If issues are found, print them and raise an error.
        print("--- Synthetic Corpus Validation Failed ---")
        for issue in validation_issues:
            print(issue)
        raise ValueError(
            "Generated synthetic corpus failed validation. Please inspect the "
            f"output file at '{output_path}', correct the issues, and re-run."
        )

    print(f"Synthetic corpus successfully generated and validated. Saved to: {output_path}")

    return output_path


In [None]:
# Task 11 — Embed the synthetic articles and perform UMAP diagnostic visualization

# ================================================================================
# Task 11: Embed the synthetic articles and perform UMAP diagnostic visualization
# ================================================================================

# ------------------------------------------------------------------------------
# Task 11, Step 1: Tokenize and embed synthetic texts with the same model
# ------------------------------------------------------------------------------

def _embed_synthetic_articles(
    synthetic_df: pd.DataFrame,
    model_name: str
) -> np.ndarray:
    """
    Generates embeddings for the synthetic articles using the specified model.

    Purpose:
    This function is responsible for converting the text of the 256 synthetic
    articles into high-dimensional vectors. It is CRITICAL that this function
    uses the exact same embedding model ('jina-embeddings-v3') as was used for
    the main corpus to ensure that both real and synthetic articles reside in the
    same semantic vector space.

    Inputs:
        synthetic_df (pd.DataFrame): The DataFrame containing the generated
                                     synthetic articles, including a 'full_text' column.
        model_name (str): The identifier of the SentenceTransformer model to use.

    Processes:
    1.  Determines the optimal computation device (GPU/CPU).
    2.  Loads the specified SentenceTransformer model.
    3.  Extracts the list of texts from the DataFrame.
    4.  Calls the model's `encode` method to generate embeddings for all texts
        in a single, efficient batch.

    Outputs:
        (np.ndarray): A NumPy array of shape (256, 1024) containing the
                      document embeddings for the synthetic articles. The order
                      of rows corresponds to the order of articles in the input DataFrame.
    """
    # --- 1. Setup Device and Model ---
    # Ensure consistency by using the same device logic as the main embedding task.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Loading embedding model '{model_name}' onto device '{device}'.")

    # Load the specified SentenceTransformer model.
    model = SentenceTransformer(model_name, device=device)

    # --- 2. Prepare Texts ---
    # Extract the 'full_text' column into a list for the encode method.
    texts_to_embed = synthetic_df['full_text'].tolist()

    # --- 3. Generate Embeddings ---
    # Since the dataset is small (256 articles), we can encode it in one go.
    print("Generating embeddings for synthetic articles...")
    synthetic_embeddings = model.encode(
        texts_to_embed,
        batch_size=len(texts_to_embed), # Process all in one batch
        show_progress_bar=True,
        convert_to_numpy=True,
        normalize_embeddings=False
    )

    return synthetic_embeddings

# ------------------------------------------------------------------------------
# Task 11, Step 2: Validate embedding consistency with real article embeddings
# ------------------------------------------------------------------------------

def _validate_embedding_consistency(
    synthetic_embeddings: np.ndarray,
    real_embedding_stats: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Performs a sanity check on the quality and distribution of synthetic embeddings.

    Purpose:
    This function provides a quick diagnostic to catch gross errors in the
    embedding process. It checks for numerical integrity (no NaNs/infs) and
    compares summary statistics of the synthetic embeddings against those of the
    real corpus embeddings (if provided) to flag major distributional discrepancies.

    Inputs:
        synthetic_embeddings (np.ndarray): The (256, 1024) array of embeddings.
        real_embedding_stats (Optional[Dict]): A dictionary of summary stats from
                                               the real corpus (from Task 6 diagnostics).

    Outputs:
        (Dict[str, Any]): A dictionary containing the summary statistics of the
                          synthetic embeddings.

    Error Handling:
        Raises ValueError: If any non-finite values (NaN or infinity) are found.
    """
    # --- 1. Numerical Integrity Check ---
    # This is a critical check before any further computation.
    if not np.all(np.isfinite(synthetic_embeddings)):
        raise ValueError("Synthetic embeddings contain non-finite values (NaN or infinity).")

    # --- 2. Compute Summary Statistics ---
    mean_vec = np.mean(synthetic_embeddings, axis=0)
    std_vec = np.std(synthetic_embeddings, axis=0)

    summary_stats = {
        'num_embeddings': synthetic_embeddings.shape[0],
        'embedding_dim': synthetic_embeddings.shape[1],
        'global_mean_norm': np.linalg.norm(mean_vec),
        'global_std_mean': np.mean(std_vec),
        'min_value': np.min(synthetic_embeddings),
        'max_value': np.max(synthetic_embeddings),
    }

    # --- 3. Comparative Reporting (if stats are provided) ---
    if real_embedding_stats:
        print("\n--- Embedding Consistency Check ---")
        print(f"{'Metric':<20} | {'Synthetic':<15} | {'Real Corpus':<15}")
        print("-" * 55)
        for key in ['global_mean_norm', 'global_std_mean', 'min_value', 'max_value']:
            synth_val = summary_stats.get(key, 0)
            real_val = real_embedding_stats.get(key, 0)
            print(f"{key:<20} | {synth_val:<15.4f} | {real_val:<15.4f}")
        print("-" * 55)

    return summary_stats

# ---------------------------------------------------------------------------------------
# Task 11, Step 3: (Optional diagnostic) Perform UMAP projection for Chart 2 replication
# ---------------------------------------------------------------------------------------

def _create_umap_visualization(
    synthetic_embeddings: np.ndarray,
    synthetic_df: pd.DataFrame,
    umap_params: Dict[str, Any],
    output_path: str
) -> None:
    """
    Creates and saves an annotated UMAP visualization of the synthetic embeddings.

    Purpose:
    This function replicates Chart 2 from the paper. It uses the UMAP algorithm
    to project the high-dimensional embeddings into a 2D space, allowing for
    visual inspection of the separation between positive and negative sentiment
    clusters. Crucially, it annotates the plot with text snippets from
    representative articles to provide qualitative context for the clusters,
    offering tangible evidence that the model is capturing semantic sentiment.

    Inputs:
        synthetic_embeddings (np.ndarray): The (256, 1024) embedding matrix.
        synthetic_df (pd.DataFrame): The DataFrame of synthetic articles, containing
                                     'sentiment_label' and 'full_text'.
        umap_params (Dict[str, Any]): Parameters for the UMAP reducer, including
                                      'random_state' for reproducibility.
        output_path (str): The file path to save the generated plot (e.g., '.png').

    Processes:
    1.  Initializes the UMAP reducer with a fixed random state for reproducibility.
    2.  Fits the reducer to the embeddings and transforms them into a 2D space.
    3.  Creates a scatter plot of the 2D points, colored by sentiment.
    4.  Selects one positive and one negative example article.
    5.  Adds styled text annotations with snippets from these
        articles, pointing to their corresponding locations in the plot.
    6.  Saves the final, publication-quality figure to disk.
    """
    # --- Input Validation ---
    if not isinstance(synthetic_embeddings, np.ndarray) or synthetic_embeddings.ndim != 2:
        raise TypeError("`synthetic_embeddings` must be a 2D NumPy array.")
    if not isinstance(synthetic_df, pd.DataFrame):
        raise TypeError("`synthetic_df` must be a pandas DataFrame.")
    if len(synthetic_embeddings) != len(synthetic_df):
        raise ValueError("Mismatch between number of embeddings and number of articles.")

    print("Performing UMAP projection for diagnostic visualization...")
    # --- 1. Initialize and Fit UMAP ---
    # Using the random_state is critical for a reproducible plot.
    reducer = umap.UMAP(
        n_neighbors=umap_params.get('n_neighbors', 15),
        min_dist=umap_params.get('min_dist', 0.1),
        n_components=2,
        metric='cosine', # Cosine distance is often effective for text embeddings.
        random_state=umap_params['random_state']
    )

    # Fit the reducer to the data and transform it to 2D.
    embedding_2d = reducer.fit_transform(synthetic_embeddings)

    # --- 2. Create the Plotting DataFrame ---
    # Combine 2D coordinates with labels for easy plotting.
    plot_df = pd.DataFrame(embedding_2d, columns=['UMAP_1', 'UMAP_2'])
    # Assign labels by position so a non-default index on `synthetic_df` cannot misalign them.
    plot_df['Sentiment'] = (
        synthetic_df['sentiment_label']
        .map({0: 'Negative Outlook', 1: 'Positive Outlook'})
        .to_numpy()
    )

    # --- 3. Create the Scatter Plot ---
    # Set up the plot aesthetics for a professional look.
    plt.style.use('seaborn-v0_8-whitegrid')
    fig, ax = plt.subplots(figsize=(12, 10))

    # Create the main scatter plot.
    sns.scatterplot(
        data=plot_df,
        x='UMAP_1',
        y='UMAP_2',
        hue='Sentiment',
        palette={'Negative Outlook': '#d62728', 'Positive Outlook': '#2ca02c'}, # Red/Green
        ax=ax,
        s=50,
        alpha=0.8,
        edgecolor='k',
        linewidth=0.5
    )

    # --- 4. Add Annotations ---
    # Annotate one example per class with a text snippet, as in Chart 2.

    # Deterministically select the first positive and first negative example by position.
    labels_array = synthetic_df['sentiment_label'].to_numpy()
    pos_idx = int(np.flatnonzero(labels_array == 1)[0])
    neg_idx = int(np.flatnonzero(labels_array == 0)[0])
    pos_example = synthetic_df.iloc[pos_idx]
    neg_example = synthetic_df.iloc[neg_idx]

    # Get their corresponding 2D coordinates (positional, independent of the DataFrame index).
    pos_coords = embedding_2d[pos_idx]
    neg_coords = embedding_2d[neg_idx]

    # Create text snippets.
    pos_snippet = ' '.join(pos_example['full_text'].split()[:20]) + '...'
    neg_snippet = ' '.join(neg_example['full_text'].split()[:20]) + '...'

    # Define styles for the annotation boxes.
    bbox_props = dict(boxstyle="round,pad=0.5", fc="ivory", ec="gray", lw=1, alpha=0.9)
    arrow_props = dict(arrowstyle="->", connectionstyle="arc3,rad=0.1", ec="black")

    # Add the annotation for the positive example.
    ax.annotate(
        pos_snippet,
        xy=pos_coords,
        xytext=(pos_coords[0] - 3, pos_coords[1] + 2), # Manual offset for placement
        bbox=bbox_props,
        arrowprops=arrow_props,
        fontsize=9,
        wrap=True,
        ha='center'
    )

    # Add the annotation for the negative example.
    ax.annotate(
        neg_snippet,
        xy=neg_coords,
        xytext=(neg_coords[0] + 3, neg_coords[1] - 2), # Manual offset for placement
        bbox=bbox_props,
        arrowprops=arrow_props,
        fontsize=9,
        wrap=True,
        ha='center'
    )

    # --- 5. Finalize and Save ---
    # Set plot titles and labels for clarity.
    ax.set_title("UMAP Projection of Synthetic Articles' Embeddings", fontsize=16, fontweight='bold')
    ax.set_xlabel("UMAP Dimension 1", fontsize=12)
    ax.set_ylabel("UMAP Dimension 2", fontsize=12)
    ax.legend(title='Economic Outlook', title_fontsize='13', fontsize='11')

    # Save the figure to the specified path with high resolution.
    fig.savefig(output_path, dpi=300, bbox_inches='tight')
    print(f"UMAP visualization with annotations saved to: {output_path}")
    # Close the figure to free up memory.
    plt.close(fig)

# ------------------------------------------------------------------------------
# Task 11, Orchestrator Function
# ------------------------------------------------------------------------------

def process_synthetic_embeddings(
    synthetic_corpus_path: str,
    fused_master_input_specification: Dict[str, Any],
    output_directory: str,
    real_embedding_diagnostics: Optional[Dict[str, Any]] = None
) -> Tuple[np.ndarray, pd.Series, str]:
    """
    Orchestrates the embedding and visualization of the synthetic corpus.

    This function manages the workflow of loading the synthetic articles,
    generating their embeddings using the same model as the main corpus,
    validating their quality, and producing a diagnostic UMAP visualization.

    Args:
        synthetic_corpus_path (str): Path to the CSV of synthetic articles.
        fused_master_input_specification (Dict[str, Any]): The master config.
        output_directory (str): Directory to save the UMAP plot.
        real_embedding_diagnostics (Optional[Dict]): The diagnostic report from
            Task 6, used for comparative validation.

    Returns:
        A tuple containing:
        - (np.ndarray): The (256, 1024) matrix of synthetic embeddings (Z).
        - (pd.Series): The corresponding sentiment labels (y).
        - (str): The path to the saved UMAP visualization plot.
    """
    # --- 1. Load Data ---
    # Load the previously generated synthetic articles.
    synthetic_df = pd.read_csv(synthetic_corpus_path)

    # Extract configuration parameters.
    embedding_config = fused_master_input_specification['master_config']['embedding_params']
    umap_config = fused_master_input_specification['master_config']['umap_diagnostic_params']

    # --- Step 1: Generate Embeddings ---
    # Ensure the same model is used as for the main corpus.
    synthetic_embeddings = _embed_synthetic_articles(
        synthetic_df,
        embedding_config['model_name']
    )

    # --- Step 2: Validate Embedding Quality ---
    # Perform a sanity check on the generated embeddings.
    _validate_embedding_consistency(
        synthetic_embeddings,
        real_embedding_diagnostics.get('summary_stats') if real_embedding_diagnostics else None
    )

    # --- Step 3: Create UMAP Visualization ---
    # Define the output path for the plot.
    os.makedirs(output_directory, exist_ok=True)
    umap_plot_path = os.path.join(output_directory, 'chart2_umap_synthetic_embeddings.png')

    # Generate and save the plot. The helper expects the full DataFrame
    # (it needs both 'sentiment_label' and 'full_text' for the annotations).
    _create_umap_visualization(
        synthetic_embeddings=synthetic_embeddings,
        synthetic_df=synthetic_df,
        umap_params=umap_config,
        output_path=umap_plot_path
    )

    # --- 4. Prepare and Return Final Outputs ---
    # The labels corresponding to the embeddings.
    synthetic_labels = synthetic_df['sentiment_label']

    return synthetic_embeddings, synthetic_labels, umap_plot_path


In [None]:
# Task 12 — Train L2-regularized logistic regression on synthetic embeddings via cross-validation

# ===============================================================================================
# Task 12: Train L2-regularized logistic regression on synthetic embeddings via cross-validation
# ===============================================================================================

# -------------------------------------------------------------------------------------
# Task 12, Step 2 (Combined with Step 1): Perform cross-validation to select optimal C
# -------------------------------------------------------------------------------------

def _find_optimal_regularization(
    X: np.ndarray,
    y: pd.Series,
    config: Dict[str, Any],
    seed: int
) -> Tuple[float, Dict[str, Any]]:
    """
    Performs stratified K-fold cross-validation to find the optimal C.

    Purpose:
    In a high-dimensional setting (p > n), regularization is essential to prevent
    overfitting. This function systematically searches for the best regularization
    strength by evaluating model performance across a grid of `C` values (where
    C is the inverse of the regularization strength lambda). It uses stratified
    K-fold cross-validation to ensure that each fold maintains the same class
    balance as the overall dataset, which is critical for reliable performance
    estimation on a small, balanced dataset.

    Inputs:
        X (np.ndarray): The (256, 1024) matrix of synthetic embeddings.
        y (pd.Series): The (256,) series of binary sentiment labels.
        config (Dict[str, Any]): The 'classifier_config' sub-dictionary.
        seed (int): The global random seed for reproducible shuffling in CV.

    Processes:
    1.  Initializes a LogisticRegression model with L2 penalty as specified.
    2.  Defines the hyperparameter grid for 'C' from the config.
    3.  Sets up a StratifiedKFold cross-validator for robust evaluation.
    4.  Initializes and fits GridSearchCV to perform the exhaustive search.
    5.  Extracts the best hyperparameter and the full cross-validation results.

    Outputs:
        A tuple containing:
        - (float): The optimal value for the hyperparameter C.
        - (Dict[str, Any]): A dictionary containing the detailed cross-validation
          results from GridSearchCV.
    """
    # --- 1. Initialize the Model (Step 1 logic) ---
    # The model is L2-regularized (Ridge) logistic regression.
    # The solver 'lbfgs' is efficient and supports L2 penalty.
    # The random_state ensures deterministic behavior if the solver uses randomness.
    log_reg = LogisticRegression(
        penalty=config['penalty'],
        solver=config['solver'],
        random_state=seed,
        max_iter=1000 # Increase max_iter for convergence on high-dim data
    )

    # --- 2. Define the Cross-Validation Strategy ---
    # Stratified K-Fold is essential for maintaining class balance in each fold.
    # Shuffling with a random_state ensures the fold splits are reproducible.
    cv_strategy = StratifiedKFold(
        n_splits=config['cross_validation_folds'],
        shuffle=True,
        random_state=seed
    )

    # --- 3. Set up and Run Grid Search ---
    # GridSearchCV automates the process of training and evaluating the model
    # for each hyperparameter combination across all CV folds.
    print("Starting GridSearchCV to find optimal regularization strength 'C'...")
    grid_search = GridSearchCV(
        estimator=log_reg,
        param_grid=config['hyperparameter_grid'],
        cv=cv_strategy,
        scoring='neg_log_loss', # Log-loss is a proper scoring rule for probabilistic models.
        verbose=1,
        n_jobs=-1 # Use all available CPU cores.
    )

    # Fit the grid search to the synthetic data.
    grid_search.fit(X, y)

    # --- 4. Extract and Report Results ---
    # The best hyperparameter found during the search.
    optimal_c = grid_search.best_params_['C']
    print(f"GridSearchCV complete. Optimal C = {optimal_c}")
    print(f"Best cross-validation score (neg_log_loss): {grid_search.best_score_:.4f}")

    return optimal_c, grid_search.cv_results_
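
# Illustrative sketch (not part of the pipeline): why stratification keeps every
# fold balanced. This is a simplified stand-in, not scikit-learn's StratifiedKFold
# algorithm: it shuffles the indices of each class with a seeded RNG and deals
# them round-robin into folds. The helper name is hypothetical.
import random
from typing import Dict, List, Sequence


def _sketch_stratified_folds(labels: Sequence[int], n_splits: int, seed: int) -> List[List[int]]:
    """Assign sample indices to n_splits folds, balancing each class across folds."""
    rng = random.Random(seed)
    by_class: Dict[int, List[int]] = {}
    for idx, label in enumerate(labels):
        by_class.setdefault(label, []).append(idx)
    folds: List[List[int]] = [[] for _ in range(n_splits)]
    for label in sorted(by_class):
        indices = by_class[label]
        rng.shuffle(indices)
        # Dealing round-robin keeps per-class fold sizes within one of each other.
        for position, idx in enumerate(indices):
            folds[position % n_splits].append(idx)
    return folds


# With 128 positive and 128 negative labels and 5 folds (the fold count here is an
# example value, not taken from the config), every fold holds 25 or 26 articles
# of each class.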

# ------------------------------------------------------------------------------------------
# Task 12, Step 3: Fit the final model on the full synthetic dataset and persist parameters
# ------------------------------------------------------------------------------------------

def _fit_and_persist_final_model(
    X: np.ndarray,
    y: pd.Series,
    optimal_c: float,
    config: Dict[str, Any],
    seed: int,
    output_path: str
) -> LogisticRegression:
    """
    Fits the final logistic regression model and saves it to disk.

    Purpose:
    After identifying the optimal hyperparameter via cross-validation, this
    function trains a new model on the *entire* synthetic dataset. This approach
    leverages all available data to obtain the best possible estimates for the
    model's coefficients. The final, trained model object is then serialized
    to disk for later use in the inference pipeline.

    Inputs:
        X (np.ndarray): The full (256, 1024) matrix of synthetic embeddings.
        y (pd.Series): The full (256,) series of binary sentiment labels.
        optimal_c (float): The best value for C found via cross-validation.
        config (Dict[str, Any]): The 'classifier_config' sub-dictionary.
        seed (int): The global random seed for reproducibility.
        output_path (str): The file path to save the trained model.

    Outputs:
        (LogisticRegression): The trained scikit-learn model object.
    """
    # --- 1. Initialize the Final Model ---
    # Instantiate the model with the exact same parameters as before, but now
    # with the optimal 'C' value determined by the grid search.
    final_model = LogisticRegression(
        penalty=config['penalty'],
        solver=config['solver'],
        C=optimal_c,
        random_state=seed,
        max_iter=1000
    )

    # --- 2. Fit on the Full Dataset ---
    # Train the model on all 256 synthetic samples.
    print(f"Fitting final model on the full synthetic dataset with C={optimal_c}...")
    final_model.fit(X, y)
    print("Final model fitting complete.")

    # --- 3. Persist the Model ---
    # Save the trained model object to the specified path using joblib, the
    # persistence route described in the scikit-learn documentation. The file
    # should be loaded with the same scikit-learn version that produced it.
    joblib.dump(final_model, output_path)
    print(f"Final sentiment classifier model saved to: {output_path}")

    return final_model

# ------------------------------------------------------------------------------
# Task 12, Orchestrator Function
# ------------------------------------------------------------------------------

def train_sentiment_classifier(
    synthetic_embeddings: np.ndarray,
    synthetic_labels: pd.Series,
    fused_master_input_specification: Dict[str, Any],
    output_path: str
) -> Tuple[str, Dict[str, Any]]:
    """
    Orchestrates the training of the L2-regularized sentiment classifier.

    This function manages the complete workflow for training the sentiment model:
    1. It performs a cross-validated grid search to find the optimal
       regularization strength.
    2. It then trains a final model on the entire synthetic dataset using this
       optimal hyperparameter.
    3. It saves the final trained model to disk for downstream inference tasks.

    Args:
        synthetic_embeddings (np.ndarray): The (256, 1024) matrix of embeddings.
        synthetic_labels (pd.Series): The (256,) series of sentiment labels.
        fused_master_input_specification (Dict[str, Any]): The master config.
        output_path (str): The file path to save the final trained model.

    Returns:
        A tuple containing:
        - (str): The path to the saved model file.
        - (Dict[str, Any]): A results dictionary containing the optimal 'C' and
          the detailed cross-validation results.
    """
    # --- Input Validation ---
    if not isinstance(synthetic_embeddings, np.ndarray) or synthetic_embeddings.shape != (256, 1024):
        raise ValueError("`synthetic_embeddings` must be a NumPy array of shape (256, 1024).")
    if not isinstance(synthetic_labels, pd.Series) or len(synthetic_labels) != 256:
        raise ValueError("`synthetic_labels` must be a pandas Series of length 256.")

    # --- Extract Configuration ---
    classifier_config = fused_master_input_specification['master_config']['sentiment_model_params']['classifier_config']
    seed = fused_master_input_specification['master_config']['reproducibility']['random_seeds']['global_seed']

    # --- Step 1 & 2: Find Optimal Regularization via CV ---
    optimal_c, cv_results = _find_optimal_regularization(
        X=synthetic_embeddings,
        y=synthetic_labels,
        config=classifier_config,
        seed=seed
    )

    # --- Step 3: Fit and Persist the Final Model ---
    _fit_and_persist_final_model(
        X=synthetic_embeddings,
        y=synthetic_labels,
        optimal_c=optimal_c,
        config=classifier_config,
        seed=seed,
        output_path=output_path
    )

    # --- 4. Package and Return Results ---
    # Create a comprehensive results dictionary for logging and analysis.
    results = {
        "model_path": output_path,
        "optimal_c": optimal_c,
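        # `cv_results_` mixes NumPy (masked) arrays with a plain list under
        # 'params', so only values exposing `.tolist()` are converted.
        "cv_results": {k: (v.tolist() if hasattr(v, "tolist") else v) for k, v in cv_results.items()}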
    }

    return output_path, results
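
The grid search in Task 12 ranks candidate values of `C` by `neg_log_loss`. As a rough, dependency-free illustration of what that score measures, the sketch below computes the negative mean binary cross-entropy by hand. The function name `neg_log_loss`, the clipping constant `eps`, and the toy labels and probabilities are assumptions made for this example; they are not taken from the pipeline or from scikit-learn's internals.

```python
import math
from typing import Sequence


def neg_log_loss(y_true: Sequence[int], p_pos: Sequence[float], eps: float = 1e-15) -> float:
    """Negative mean binary cross-entropy; higher (closer to 0) is better."""
    if len(y_true) != len(p_pos) or len(y_true) == 0:
        raise ValueError("y_true and p_pos must be non-empty and of equal length.")
    total = 0.0
    for y, p in zip(y_true, p_pos):
        # Clip to avoid log(0); eps is an illustrative choice.
        p = min(max(p, eps), 1.0 - eps)
        total += y * math.log(p) + (1 - y) * math.log(1.0 - p)
    return total / len(y_true)


# Toy data (hypothetical): two positive and two negative examples.
labels = [1, 0, 1, 0]
confident_right = neg_log_loss(labels, [0.9, 0.1, 0.9, 0.1])
hedged = neg_log_loss(labels, [0.6, 0.4, 0.6, 0.4])
confident_wrong = neg_log_loss(labels, [0.1, 0.9, 0.1, 0.9])

print(confident_right, hedged, confident_wrong)
```

Confident and correct probabilities score closest to zero, hedged ones score lower, and confident mistakes are penalised most heavily. That ordering is why the metric suits a classifier whose probabilities, rather than hard labels, are averaged downstream.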


In [None]:
# Task 13 — Score all relevant real articles with the trained sentiment model

# ==============================================================================
# Task 13: Score all relevant real articles with the trained sentiment model
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 13, Steps 1 & 2: Load model, retrieve embeddings, and compute probabilities
# ------------------------------------------------------------------------------

def _run_sentiment_inference_in_batches(
    model: LogisticRegression,
    embeddings_path: str,
    relevant_indices: np.ndarray,
    inference_batch_size: int = 16384
) -> np.ndarray:
    """
    Scores relevant article embeddings in batches using the trained sentiment model.

    Purpose:
    This function applies the trained logistic regression model to the embeddings
    of all economics-relevant articles. To handle a potentially large number of
    articles (millions) without exhausting system memory, it reads the required
    embeddings from the HDF5 file and performs prediction in manageable batches.

    Inputs:
        model (LogisticRegression): The loaded, trained scikit-learn model object.
        embeddings_path (str): The path to the HDF5 file containing the full
                               embedding matrix for the entire corpus.
        relevant_indices (np.ndarray): A sorted NumPy array of integer row indices
                                       specifying which embeddings to score.
        inference_batch_size (int): The number of embeddings to process in a
                                    single prediction batch.

    Processes:
    1.  Opens the HDF5 embeddings file.
    2.  Iterates through the `relevant_indices` array in chunks of size
        `inference_batch_size`.
    3.  For each chunk of indices, it reads the corresponding embedding vectors
        from the HDF5 file. h5py requires the index list for such a
        non-contiguous read to be in increasing order, hence the sorted input.
    4.  Calls `model.predict_proba()` on the batch of embeddings to get the
        sentiment probabilities for both classes.
    5.  Selects the probability for the positive class (class 1).
    6.  Collects the probabilities from all batches.
    7.  Concatenates the batch results into a single, final NumPy array.

    Outputs:
        (np.ndarray): A 1D NumPy array of sentiment probabilities (`p_pos`),
                      ordered to correspond to the `relevant_indices` array.
    """
    # This list will store the probability arrays from each processed batch.
    all_probabilities: List[np.ndarray] = []

    # Open the HDF5 file for reading.
    with h5py.File(embeddings_path, 'r') as f:
        embeddings_dset = f['embeddings']

        # Iterate through the sorted list of relevant indices in batches.
        for i in tqdm(range(0, len(relevant_indices), inference_batch_size), desc="Scoring article sentiment"):
            # Get the indices for the current batch.
            batch_indices = relevant_indices[i:i + inference_batch_size]

            # Read the corresponding embedding vectors for this batch.
            # h5py fancy indexing requires the index list to be in increasing
            # order, which is why `relevant_indices` must arrive sorted.
            batch_embeddings = embeddings_dset[batch_indices]

            # Use predict_proba to get the probability for each class.
            # Equation: p_i = 1 / (1 + exp(-(w^T x_i + b)))
            batch_probs_all_classes = model.predict_proba(batch_embeddings)

            # We only need the probability of the positive class (class 1).
            # The columns are ordered [class_0_prob, class_1_prob].
            batch_probs_positive_class = batch_probs_all_classes[:, 1]

            # Append the results for this batch to our list.
            all_probabilities.append(batch_probs_positive_class)

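    # Concatenate the list of batch arrays into a single, large NumPy array.
    # np.concatenate raises on an empty list, so handle the no-articles case explicitly.
    if not all_probabilities:
        return np.empty(0, dtype=float)
    full_probability_vector = np.concatenate(all_probabilities)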

    return full_probability_vector

# ------------------------------------------------------------------------------
# Task 13, Step 3: Validate score distribution and persist for downstream aggregation
# ------------------------------------------------------------------------------

def _assemble_and_persist_scored_articles(
    df_relevant: pd.DataFrame,
    probabilities: np.ndarray,
    output_path: str
) -> pd.DataFrame:
    """
    Assembles the final scored articles DataFrame, validates it, and saves it.

    Purpose:
    This function combines the metadata of the relevant articles with their newly
    computed sentiment scores. It performs critical validation checks on the scores
    and persists the final, enriched DataFrame to a performant file format. This
    artifact is the direct input for the final indicator aggregation stage.

    Inputs:
        df_relevant (pd.DataFrame): The DataFrame containing metadata for only
                                    the relevant articles.
        probabilities (np.ndarray): The 1D array of sentiment scores, ordered to
                                    match the rows in `df_relevant`.
        output_path (str): The file path to save the final scored articles data.

    Processes:
    1.  Creates a new DataFrame from `df_relevant` to avoid modifying the original.
    2.  Assigns the `probabilities` array to a new column, 'p_pos'.
    3.  Validates that all scores are non-null and fall within the [0, 1] range.
    4.  Prints descriptive statistics of the score distribution as a diagnostic.
    5.  Saves the validated DataFrame to the specified path using the Feather format.

    Outputs:
        (pd.DataFrame): The final, validated DataFrame of scored articles.
    """
    # --- 1. Assemble the DataFrame ---
    # Create a copy to ensure the original df_relevant is not modified.
    scored_articles_df = df_relevant.copy()
    # Assign the computed probabilities. The order is guaranteed by the orchestrator.
    scored_articles_df['p_pos'] = probabilities

    # --- 2. Validate the Scores ---
    # Check for any null probabilities, which would indicate an error.
    if scored_articles_df['p_pos'].isna().any():
        raise ValueError("Sentiment scores contain unexpected null values.")
    # Check that all probabilities are within the valid [0, 1] range.
    if not scored_articles_df['p_pos'].between(0, 1).all():
        raise ValueError("Sentiment scores contain values outside the valid [0, 1] range.")

    # --- 3. Diagnostic Reporting ---
    # Print summary statistics for the sentiment score distribution.
    print("\n--- Sentiment Score Distribution Summary ---")
    print(scored_articles_df['p_pos'].describe())
    print("----------------------------------------")

    # --- 4. Persist the DataFrame ---
    # Select key columns for the final artifact.
    columns_to_save = [
        'article_id', 'publication_datetime_utc', 'language', 'outlet_id', 'p_pos'
    ]
    # Save to Feather format for fast read/write operations.
    scored_articles_df[columns_to_save].to_feather(output_path)
    print(f"Scored articles DataFrame saved to: {output_path}")

    return scored_articles_df

# ------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# ------------------------------------------------------------------------------

def score_relevant_articles(
    df_relevant: pd.DataFrame,
    sentiment_model_path: str,
    embeddings_path: str,
    crosswalk_path: str,
    output_path: str
) -> str:
    """
    Orchestrates the sentiment scoring of all relevant articles.

    This function manages the end-to-end workflow of applying the trained
    sentiment classifier to the embeddings of all economics-relevant articles.
    It handles loading the model, retrieving the correct data in batches,
    running inference, and saving the final, validated output.

    Args:
        df_relevant (pd.DataFrame): DataFrame of metadata for relevant articles (from Task 9).
        sentiment_model_path (str): Path to the trained sentiment classifier model.
        embeddings_path (str): Path to the HDF5 file of all embeddings.
        crosswalk_path (str): Path to the CSV crosswalk table.
        output_path (str): File path to save the final scored articles data.

    Returns:
        (str): The path to the saved file of scored articles.
    """
    # --- Step 1: Load Model and Retrieve Embedding Indices ---
    # Load the trained scikit-learn model from disk.
    print(f"Loading sentiment classifier from: {sentiment_model_path}")
    model: LogisticRegression = joblib.load(sentiment_model_path)

    # Load the crosswalk and filter it to get the HDF5 row indices for our relevant articles.
    crosswalk_df = pd.read_csv(crosswalk_path)
    relevant_ids = set(df_relevant['article_id'])
    # Sort by HDF5 row index: h5py fancy indexing needs increasing indices, and
    # the sorted crosswalk defines the canonical article order used below.
    relevant_crosswalk = crosswalk_df[
        crosswalk_df['article_id'].isin(relevant_ids)
    ].sort_values('embedding_row_index')

    # IMPORTANT: Reorder df_relevant to match the sorted crosswalk so that row i of
    # the metadata corresponds to probability i returned by the batched inference.
    df_relevant_sorted = df_relevant.set_index('article_id').loc[relevant_crosswalk['article_id']].reset_index()
    relevant_indices_sorted = relevant_crosswalk['embedding_row_index'].values

    # --- Step 2: Compute Article-Level Sentiment Probabilities in Batches ---
    probabilities = _run_sentiment_inference_in_batches(
        model=model,
        embeddings_path=embeddings_path,
        relevant_indices=relevant_indices_sorted
    )

    # --- Step 3: Assemble, Validate, and Persist the Final DataFrame ---
    _assemble_and_persist_scored_articles(
        df_relevant=df_relevant_sorted,
        probabilities=probabilities,
        output_path=output_path
    )

    return output_path
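
The scoring step in Task 13 applies the logistic formula p_i = 1 / (1 + exp(-(w^T x_i + b))) to a sorted subset of embedding rows, one batch at a time. The standard-library sketch below mirrors that flow on toy data, to show that the batch size affects only memory use and not the scores or their order. The helper names (`positive_probability`, `iter_batches`, `score_rows`), the weights, and the rows are hypothetical; the real pipeline uses `model.predict_proba` on HDF5 slices.

```python
import math
from typing import Iterator, List, Sequence


def positive_probability(x: Sequence[float], w: Sequence[float], b: float) -> float:
    """Logistic probability of the positive class for one embedding vector."""
    z = sum(wi * xi for wi, xi in zip(w, x)) + b
    return 1.0 / (1.0 + math.exp(-z))


def iter_batches(indices: Sequence[int], batch_size: int) -> Iterator[Sequence[int]]:
    """Yield consecutive chunks of the (sorted) index list."""
    for start in range(0, len(indices), batch_size):
        yield indices[start:start + batch_size]


def score_rows(
    rows: Sequence[Sequence[float]],
    row_indices: Sequence[int],
    w: Sequence[float],
    b: float,
    batch_size: int,
) -> List[float]:
    """Score the selected rows batch by batch, preserving the index order."""
    scores: List[float] = []
    for batch in iter_batches(row_indices, batch_size):
        scores.extend(positive_probability(rows[i], w, b) for i in batch)
    return scores


# Toy stand-ins (hypothetical): five 2-d "embeddings", three of them relevant.
rows = [[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [2.0, 2.0], [-1.0, -1.0]]
relevant = [0, 2, 3]  # sorted row indices, as the pipeline expects
weights, bias = [1.0, -1.0], 0.0

scores = score_rows(rows, relevant, weights, bias, batch_size=2)
print(scores)
```

Because each batch is appended in index order, the resulting list lines up one-to-one with `relevant`. The orchestrator relies on the same property when it assigns the probabilities to the metadata rows.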


In [None]:
# Task 14 — Construct monthly NEOS and early-release variants by temporal aggregation

# ===================================================================================
# Task 14: Construct monthly NEOS and early-release variants by temporal aggregation
# ===================================================================================

# ------------------------------------------------------------------------------
# Task 14, Helper Function for Aggregation
# ------------------------------------------------------------------------------

def _aggregate_scores_to_monthly(
    df: pd.DataFrame,
    suffix: str
) -> pd.DataFrame:
    """
    Aggregates article-level scores to a monthly time series.

    Purpose:
    This is a core helper function that takes a DataFrame of scored articles
    and computes the monthly average sentiment score (the indicator value) and
    the total number of articles for that month. It handles months with no
    articles by producing NaNs, ensuring a continuous time series index.

    Inputs:
        df (pd.DataFrame): A DataFrame containing 'publication_datetime_utc' and 'p_pos'.
        suffix (str): A suffix to append to the output column names (e.g., '_baseline').

    Processes:
    1.  Sets 'publication_datetime_utc' as the index.
    2.  Groups the DataFrame by month-start frequency ('MS').
    3.  For each month, it calculates the mean of 'p_pos' and the count of articles.
    4.  Renames the columns with the provided suffix for clarity.

    Outputs:
        (pd.DataFrame): A monthly time series DataFrame with columns for the
                        indicator value and the article count.
    """
    # Return an empty DataFrame if the input is empty.
    if df.empty:
        return pd.DataFrame()

    # Set the timestamp as the index for time-based grouping.
    df_indexed = df.set_index('publication_datetime_utc')

    # Group by month-start frequency and apply aggregations.
    # 'mean' calculates the NEOS value.
    # 'size' calculates the number of articles in the group.
    monthly_agg = df_indexed.groupby(pd.Grouper(freq='MS')).agg(
        indicator_value=('p_pos', 'mean'),
        article_count=('p_pos', 'size')
    )

    # Rename columns to be descriptive using the provided suffix.
    monthly_agg.rename(columns={
        'indicator_value': f'NEOS{suffix}',
        'article_count': f'count{suffix}'
    }, inplace=True)

    return monthly_agg

# ------------------------------------------------------------------------------
# Task 14, Steps 1 & 2: Compute baseline and early-release NEOS variants
# ------------------------------------------------------------------------------

def _construct_monthly_indicators(
    scored_articles_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Constructs the baseline monthly NEOS and all early-release variants.

    Purpose:
    This function generates the main set of monthly time-series indicators. It
    calculates the full-month baseline NEOS and then iteratively calculates the
    timelier variants based on articles published within the first 7, 14, and
    21 days of each month.

    Inputs:
        scored_articles_df (pd.DataFrame): The DataFrame of all scored relevant articles.
        config (Dict[str, Any]): The 'aggregation_params' sub-dictionary.

    Processes:
    1.  Derives 'day_of_month' from the publication timestamp.
    2.  Calls the aggregation helper on the full dataset to get the baseline NEOS.
    3.  Loops through the specified early-release day cutoffs (e.g., [7, 14, 21]).
    4.  In each loop, it filters the articles for that window and calls the
        aggregation helper.
    5.  Joins all resulting time series into a single, comprehensive monthly DataFrame.

    Outputs:
        (pd.DataFrame): A DataFrame indexed by month, with columns for each
                        NEOS variant and its corresponding article count.
    """
    # --- 1. Prepare Data ---
    # The 'day_of_month' is needed for filtering early-release windows.
    df = scored_articles_df.copy()
    df['day_of_month'] = df['publication_datetime_utc'].dt.day

    # --- 2. Compute Baseline NEOS (Step 1) ---
    # This is the main indicator using all articles in a given month.
    # Equation: NEOS_m = (1/|I_m|) * sum_{i in I_m} p_i
    baseline_neos = _aggregate_scores_to_monthly(df, suffix='_baseline')

    # --- 3. Compute Early-Release Variants (Step 2) ---
    # This list will hold the DataFrames for each variant.
    all_indicators = [baseline_neos]
    early_release_days = config.get('early_release_variants_days', [7, 14, 21])

    # Loop through each specified cutoff day.
    for k in early_release_days:
        # Equation: NEOS_m^(k) = (1/|I_m^(<=k)|) * sum_{i in I_m^(<=k)} p_i
        # Filter the DataFrame to include only articles published on or before day k.
        df_filtered = df[df['day_of_month'] <= k]

        # Call the aggregation helper on the filtered subset.
        early_release_neos = _aggregate_scores_to_monthly(df_filtered, suffix=f'_{k}d')

        # Add the resulting time series to our list.
        all_indicators.append(early_release_neos)

    # --- 4. Combine All Indicators ---
    # Join all the monthly time series together on their common DatetimeIndex.
    # pd.concat aligns on the index with an outer join by default, so all months
    # from all series are kept.
    final_monthly_df = pd.concat(all_indicators, axis=1)

    # Fill NaN counts with 0 for clarity. NEOS values remain NaN for empty months.
    count_cols = [col for col in final_monthly_df.columns if 'count' in col]
    final_monthly_df[count_cols] = final_monthly_df[count_cols].fillna(0).astype(int)

    return final_monthly_df

# ------------------------------------------------------------------------------
# Task 14, Step 3: (Optional) Construct daily month-to-date NEOS
# ------------------------------------------------------------------------------

def _construct_daily_mtd_indicator(
    scored_articles_df: pd.DataFrame
) -> Optional[pd.DataFrame]:
    """
    Constructs a daily, month-to-date (MTD) version of the NEOS indicator.

    Purpose:
    This function generates the high-frequency diagnostic series required to
    replicate Chart 4. For each day, it calculates the average sentiment of all
    articles published so far within that month. This provides a real-time view
    of how the indicator evolves intra-month.

    Inputs:
        scored_articles_df (pd.DataFrame): The DataFrame of all scored relevant articles.

    Outputs:
        (Optional[pd.DataFrame]): A daily time series DataFrame with the MTD NEOS
                                  and cumulative article count, or None if the
                                  input is empty.
    """
    if scored_articles_df.empty:
        return None

    # Sort by time to ensure correct cumulative calculations.
    df = scored_articles_df.sort_values('publication_datetime_utc').copy()

    # Create a 'year_month' column for grouping.
    df['year_month'] = df['publication_datetime_utc'].dt.to_period('M')

    # --- Calculate Cumulative Values Within Each Month ---
    # Group by month and then calculate the cumulative sum of scores and a cumulative count.
    df['cumulative_score_sum'] = df.groupby('year_month')['p_pos'].cumsum()
    df['cumulative_article_count'] = df.groupby('year_month').cumcount() + 1

    # The MTD NEOS at the time of each article is the cumulative sum / cumulative count.
    df['NEOS_mtd_at_article_time'] = df['cumulative_score_sum'] / df['cumulative_article_count']

    # --- Convert to a Daily Series ---
    # Set the timestamp as the index.
    df_indexed = df.set_index('publication_datetime_utc')

    # Create a daily series by taking the *last* calculated MTD value for each day.
    daily_series = df_indexed.resample('D').last()

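    # Forward-fill within each calendar month to carry the last known MTD value over
    # days with no new articles. Filling is restricted to the month so that a
    # month's opening days never inherit the previous month's value; they stay NaN
    # until the first article of the month appears.
    mtd_cols = ['NEOS_mtd_at_article_time', 'cumulative_article_count']
    daily_series_ffilled = daily_series.groupby(daily_series.index.to_period('M'))[mtd_cols].ffill()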

    # Rename for clarity.
    daily_series_ffilled.rename(columns={
        'NEOS_mtd_at_article_time': 'NEOS_mtd',
        'cumulative_article_count': 'count_mtd'
    }, inplace=True)

    return daily_series_ffilled

# ------------------------------------------------------------------------------
# Task 14, Orchestrator Function
# ------------------------------------------------------------------------------

def construct_neos_indicators(
    scored_articles_path: str,
    fused_master_input_specification: Dict[str, Any]
) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
    """
    Orchestrates the temporal aggregation of article scores into NEOS indicators.

    This function loads the article-level sentiment scores and manages the
    process of creating all required time-series indicators: the baseline monthly
    NEOS, the early-release variants, and the optional daily month-to-date series
    for diagnostic purposes.

    Args:
        scored_articles_path (str): Path to the Feather file of scored articles.
        fused_master_input_specification (Dict[str, Any]): The master config.

    Returns:
        A tuple containing:
        - (pd.DataFrame): A DataFrame of all monthly NEOS indicators and counts.
        - (Optional[pd.DataFrame]): A DataFrame of the daily MTD indicator, or
          None if it was not computed.
    """
    # --- 1. Load Data ---
    # Load the output from the previous sentiment scoring task.
    print(f"Loading scored articles from: {scored_articles_path}")
    scored_articles_df = pd.read_feather(scored_articles_path)

    # Extract the relevant configuration section.
    agg_config = fused_master_input_specification['master_config']['aggregation_params']

    # --- 2. Construct Monthly Indicators (Steps 1 & 2) ---
    # This function handles both the baseline and all early-release variants.
    monthly_indicators_df = _construct_monthly_indicators(scored_articles_df, agg_config)

    # --- 3. Construct Daily Diagnostic Indicator (Step 3, Optional) ---
    # This is controlled by a flag in the config.
    daily_indicator_df = None
    if agg_config.get('month_to_date_series', False):
        print("Constructing daily month-to-date diagnostic series...")
        daily_indicator_df = _construct_daily_mtd_indicator(scored_articles_df)

    return monthly_indicators_df, daily_indicator_df
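
The aggregation in Task 14 follows NEOS_m = (1/|I_m|) * sum_{i in I_m} p_i, and the early-release variants restrict I_m to articles published on or before day k. A minimal standard-library sketch of both formulas is given below. The function `monthly_neos` and the four toy articles are assumptions for illustration. Unlike the pandas version, months without articles are simply absent from the result rather than NaN.

```python
from collections import defaultdict
from datetime import date
from statistics import fmean
from typing import Dict, Iterable, List, Optional, Tuple

Month = Tuple[int, int]  # (year, month)


def monthly_neos(
    articles: Iterable[Tuple[date, float]],
    max_day: Optional[int] = None,
) -> Dict[Month, float]:
    """Mean p_pos per calendar month, optionally using only days <= max_day."""
    buckets: Dict[Month, List[float]] = defaultdict(list)
    for published, p_pos in articles:
        if max_day is None or published.day <= max_day:
            buckets[(published.year, published.month)].append(p_pos)
    return {month: fmean(scores) for month, scores in buckets.items()}


# Toy scored articles (hypothetical): (publication date, p_pos).
articles = [
    (date(2024, 1, 3), 0.2),
    (date(2024, 1, 10), 0.4),
    (date(2024, 1, 25), 0.9),
    (date(2024, 2, 20), 0.6),
]

baseline = monthly_neos(articles)                # all articles in the month
first_week = monthly_neos(articles, max_day=7)   # analogue of NEOS_7d
two_weeks = monthly_neos(articles, max_day=14)   # analogue of NEOS_14d

print(baseline, first_week, two_weeks)
```

In this toy sample, the January value moves from 0.2 after one week to 0.3 after two weeks and 0.5 for the full month, which is the kind of intra-month drift the early-release variants are meant to expose.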


In [None]:
# Task 15 — Align monthly NEOS to quarterly predictors and prepare comparator indicators

# ======================================================================================
# Task 15: Align monthly NEOS to quarterly predictors and prepare comparator indicators
# ======================================================================================

# ------------------------------------------------------------------------------
# Task 15, Helper Function for Temporal Alignment
# ------------------------------------------------------------------------------

def _align_monthly_to_quarterly(
    monthly_df: pd.DataFrame,
    month_of_quarter: int
) -> pd.DataFrame:
    """
    Aligns a monthly time series to a quarterly frequency by selecting a specific month.

    Purpose:
    This function implements the core "information set" logic. Instead of
    averaging, it selects the data from a specific month within each quarter
    (e.g., the 2nd month) to represent the entire quarter. This simulates the
    data available to a forecaster at a specific point in time.

    Inputs:
        monthly_df (pd.DataFrame): A DataFrame with a monthly DatetimeIndex ('MS').
        month_of_quarter (int): The month to select from each quarter (1, 2, or 3).

    Processes:
    1.  Adds a 'quarter' column, mapping each month to its quarter's start date.
    2.  Adds a 'month_of_quarter' column (1, 2, or 3).
    3.  Filters the DataFrame to keep only the rows for the specified month of each quarter.
    4.  Sets the 'quarter' column as the new DatetimeIndex, resulting in a
        quarterly time series.

    Outputs:
        (pd.DataFrame): A new DataFrame with a quarterly DatetimeIndex ('QS-JAN').
    """
    # --- Input Validation ---
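    # Check the dates themselves rather than `index.freqstr`: the `freq` attribute
    # can be None after a concat or a file round-trip even when the data are monthly.
    if not isinstance(monthly_df.index, pd.DatetimeIndex) or not monthly_df.index.is_month_start.all():
        raise ValueError("Input `monthly_df` must have a DatetimeIndex of month-start dates.")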
    if month_of_quarter not in [1, 2, 3]:
        raise ValueError("`month_of_quarter` must be 1, 2, or 3.")

    # Create a working copy.
    df = monthly_df.copy()

    # --- 1. Map Months to Quarters ---
    # This creates a timestamp for the first day of the quarter for each monthly observation.
    df['quarter'] = df.index.to_period('Q').to_timestamp()

    # --- 2. Identify Month within Quarter ---
    # The formula (month - 1) % 3 + 1 correctly maps [1..12] to [1,2,3,1,2,3,...].
    df['month_of_quarter'] = (df.index.month - 1) % 3 + 1

    # --- 3. Filter for the Selected Month ---
    # Keep only the rows corresponding to the desired month of the quarter.
    quarterly_df = df[df['month_of_quarter'] == month_of_quarter].copy()

    # --- 4. Set the New Quarterly Index ---
    # Set the 'quarter' column as the index and drop the helper columns.
    quarterly_df = quarterly_df.set_index('quarter').drop(
        columns=['month_of_quarter']
    )
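    # Conform the index to a regular quarterly grid. `asfreq` sets the frequency
    # attribute and inserts NaN rows for any missing quarter, whereas assigning
    # `index.freq` directly raises if the index has gaps.
    if len(quarterly_df.index) > 0:
        quarterly_df = quarterly_df.asfreq('QS-JAN')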

    return quarterly_df

# ------------------------------------------------------------------------------
# Task 15, Steps 1, 2, & 3: Align all series and merge into final dataset
# ------------------------------------------------------------------------------

def prepare_forecasting_dataset(
    monthly_neos_df: pd.DataFrame,
    monthly_indicator_df: pd.DataFrame,
    raw_macro_data_df: pd.DataFrame,
    fused_master_input_specification: Dict[str, Any]
) -> pd.DataFrame:
    """
    Constructs the final quarterly dataset for the forecasting exercise.

    Purpose:
    This orchestrator function is the final data assembly step. It takes all
    monthly and quarterly source data and meticulously aligns them according to
    the paper's information set policy. It creates the definitive, wide-format
    DataFrame that will be the input for all subsequent econometric analysis,
    ensuring all variables are correctly timed and aligned.

    Inputs:
        monthly_neos_df (pd.DataFrame): Monthly NEOS indicators from Task 14.
        monthly_indicator_df (pd.DataFrame): Monthly comparator indicators (PMI, KOF).
        raw_macro_data_df (pd.DataFrame): Raw quarterly data (GDP, SECO).
        fused_master_input_specification (Dict[str, Any]): The master config.

    Processes:
    1.  **Align NEOS Variants:** Applies the information set policy (m=2 for
        baseline, m=3 for early-release) to the monthly NEOS series.
    2.  **Align Comparators:** Applies the baseline policy (m=2) to the monthly
        comparator indicators (PMI, KOF).
    3.  **Combine Predictors:** Merges all newly aligned quarterly predictors into
        a single DataFrame.
    4.  **Final Merge:** Merges the predictor DataFrame with the raw quarterly
        data, which contains the dependent variable (GDP).
    5.  **Create Lagged Variable:** Creates the lagged dependent variable (y_{t-1}),
        which is a required regressor in the forecasting models.
    6.  **Finalize:** Returns a clean DataFrame with a complete quarterly index,
        ready for econometric modeling.

    Outputs:
        (pd.DataFrame): The final, fully aligned quarterly dataset for forecasting.
    """
    # --- 1. Align NEOS Indicators (Step 1) ---
    # Extract the information set policy from the configuration.
    info_policy = fused_master_input_specification['master_config']['econometric_validation_params']['information_set_policy']
    m_baseline = info_policy['use_month_m_for_quarter_t']['baseline']
    m_early = info_policy['use_month_m_for_quarter_t']['early_neos']

    # Align the baseline NEOS using the m=2 rule.
    neos_baseline_q = _align_monthly_to_quarterly(
        monthly_neos_df[['NEOS_baseline']], m_baseline
    )

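    # Align the early-release variants using the m=3 rule.
    # Task 14 names these columns 'NEOS_7d', 'NEOS_14d', 'NEOS_21d', so match the
    # 'NEOS_<k>d' pattern; a plain '_d' substring test matches none of them.
    early_cols = [
        col for col in monthly_neos_df.columns
        if col.startswith('NEOS_') and col.endswith('d') and col[len('NEOS_'):-1].isdigit()
    ]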
    neos_early_q = _align_monthly_to_quarterly(
        monthly_neos_df[early_cols], m_early
    )

    # --- 2. Align Comparator Indicators (Step 2) ---
    # Align the monthly comparators using the same baseline m=2 rule.
    comparators_q = _align_monthly_to_quarterly(
        monthly_indicator_df, m_baseline
    )
    # Rename columns to the '_m2' convention for clarity.
    comparators_q.columns = [f"{col}_m2" for col in comparators_q.columns]

    # --- 3. Combine All Predictors ---
    # Join all the newly created quarterly predictor series.
    all_predictors_q = neos_baseline_q.join(
        [neos_early_q, comparators_q], how='outer'
    )

    # --- 4. Final Merge with Dependent Variable (Step 3) ---
    # Merge the predictors with the raw macro data (which includes GDP and SECO).
    # We select all columns from the raw data to ensure we keep everything.
    final_df = raw_macro_data_df.join(all_predictors_q, how='outer')

    # --- 5. Create Lagged Dependent Variable ---
    # Sort chronologically first: .shift(1) lags by row position, so it only
    # yields y_{t-1} when the rows are in time order.
    final_df.sort_index(inplace=True)
    # The forecasting equation y_{t+h} = f(y_{t-1}, ...) requires the first lag of GDP.
    dep_var = fused_master_input_specification['master_config']['econometric_validation_params']['dependent_variable']
    final_df[f'{dep_var}_lag1'] = final_df[dep_var].shift(1)

    # --- 6. Finalize and Return ---

    print("Final forecasting dataset constructed with shape:", final_df.shape)

    return final_df
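
The alignment rule in Task 15 picks one month per quarter instead of averaging, using `(month - 1) % 3 + 1` to find a month's position within its quarter. The standard-library sketch below reproduces that selection on a toy monthly series. The helper names (`month_of_quarter`, `quarter_start`, `align_to_quarterly`) and the data are hypothetical stand-ins for the pandas implementation.

```python
from datetime import date
from typing import Dict


def month_of_quarter(month: int) -> int:
    """Map calendar months 1..12 to their position 1, 2, 3 within the quarter."""
    return (month - 1) % 3 + 1


def quarter_start(d: date) -> date:
    """First day of the calendar quarter containing d."""
    first_month = 3 * ((d.month - 1) // 3) + 1
    return date(d.year, first_month, 1)


def align_to_quarterly(monthly: Dict[date, float], m: int) -> Dict[date, float]:
    """Keep the m-th month of each quarter, keyed by the quarter's start date."""
    if m not in (1, 2, 3):
        raise ValueError("m must be 1, 2, or 3.")
    return {
        quarter_start(d): value
        for d, value in sorted(monthly.items())
        if month_of_quarter(d.month) == m
    }


# Toy monthly series (hypothetical): the value equals the month number.
monthly = {date(2024, mth, 1): float(mth) for mth in range(1, 7)}

aligned_m2 = align_to_quarterly(monthly, 2)  # February and May represent Q1 and Q2
aligned_m3 = align_to_quarterly(monthly, 3)  # March and June represent Q1 and Q2

print(aligned_m2)
print(aligned_m3)
```

With `m=2`, the Q1 value is the February observation and the Q2 value is the May observation. This corresponds to a forecaster who has seen only the first two months of the quarter.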


In [None]:
# Task 16 — Execute pseudo-out-of-sample (POOS) expanding-window forecasts for horizons h ∈ {0,1,2}

# ==================================================================================================
# Task 16: Execute pseudo-out-of-sample (POOS) expanding-window forecasts for horizons h in {0,1,2}
# ==================================================================================================

# ------------------------------------------------------------------------------
# Task 16, Helper Function for a single POOS run
# ------------------------------------------------------------------------------

def _run_expanding_window_regression(
    df: pd.DataFrame,
    dep_var: str,
    dep_var_lag1: str,
    indicator_col: Optional[str],
    horizon: int,
    initial_window_size: int
) -> List[Dict[str, Any]]:
    """
    Executes a single pseudo-out-of-sample expanding-window forecast.

    Purpose:
    This function is the core of the POOS exercise. It simulates a real-time
    forecasting process for a single model specification. It iterates through time,
    at each step using only past and current data to fit a model and predict a
    future value. This rigorously prevents any look-ahead bias.

    Inputs:
        df (pd.DataFrame): The analysis-ready quarterly DataFrame. It must contain
                           the dependent variable, its lag, and the indicator.
        dep_var (str): The name of the dependent variable column (e.g., 'yoy_gdp_growth').
        dep_var_lag1 (str): The name of the lagged dependent variable column.
        indicator_col (Optional[str]): The name of the indicator column. If None,
                                       an AR(1) benchmark model is estimated.
        horizon (int): The forecast horizon `h` in quarters (0, 1, or 2).
        initial_window_size (int): The number of initial quarters for the first estimation.

    Processes:
    1.  Constructs the appropriate regression formula (AR(1) or AR(1) + indicator).
    2.  Creates the shifted dependent variable `y_{t+h}`.
    3.  Iterates from the end of the initial window to the end of the sample.
    4.  In each iteration `t`:
        a. Defines the expanding training window (all data up to `t`).
        b. Fits an OLS model on the training window.
        c. Predicts the value for `y_{t+h}` using the regressors at time `t`.
        d. Calculates the forecast error.
        e. Stores the actual value, forecast, error, and timestamp.

    Outputs:
        (List[Dict[str, Any]]): A list of dictionaries, each containing the
                                detailed results for a single forecast origin.
    """
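    # --- 1. Prepare Data and Formula ---
    # Work on a copy so the caller's DataFrame is not mutated (and to avoid
    # pandas' SettingWithCopyWarning when `df` is a slice of another frame).
    df = df.copy()
    # The target variable y_{t+h} is the future value of the dependent variable.
    # We create it by shifting the original series backwards by `h` periods.
    target_col = f"{dep_var}_h{horizon}"
    df[target_col] = df[dep_var].shift(-horizon)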

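    # Construct the regression formula using statsmodels' formula syntax.
    # The indicator name is wrapped in patsy's Q("...") so that names containing
    # characters such as '-' or spaces are parsed as a single variable
    # (backtick quoting is R syntax and is not understood by patsy).
    if indicator_col:
        # Equation (1): y_{t+h} = alpha + beta*y_{t-1} + gamma*x_t + epsilon_t
        formula = f'{target_col} ~ {dep_var_lag1} + Q("{indicator_col}")'
    else:
        # AR(1) Benchmark: y_{t+h} = alpha + beta*y_{t-1} + epsilon_t
        formula = f"{target_col} ~ {dep_var_lag1}"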

    # --- 2. POOS Loop ---
    # This list will store the results from each forecast origin.
    results = []

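    # Timing: row `s` pairs the regressors (y_{s-1}, x_s) with the target y_{s+h}.
    # At forecast origin `t`, GDP is observed only up to t-1, so a row may enter
    # the training sample only if s + h <= t - 1. Including later rows (or row `t`
    # itself) would leak the value being forecast into the estimation.
    # The loop must also end `h` periods before the end of the sample so that an
    # actual value exists to compare against.
    end_point = len(df) - horizon
    # Shift the first origin by `h` so the first fit still uses
    # `initial_window_size` rows.
    start_point = initial_window_size + horizon

    # Columns that must be non-missing for a row to be usable in estimation.
    required_cols = [target_col, dep_var_lag1] + ([indicator_col] if indicator_col else [])

    # Iterate through each possible forecast origin date `t`.
    for t in range(start_point, end_point):
        # Expanding window: all rows whose target was already realized at time `t`.
        train_window = df.iloc[:t - horizon]

        # The information set for prediction is the row at the forecast origin.
        predict_data = df.iloc[t:t + 1]

        # Skip origins without enough complete rows to estimate the intercept and
        # slope(s). The number of parameters equals len(required_cols): the
        # intercept takes the place of the target column in the count.
        # This handles indicators that start late in the sample.
        if len(train_window.dropna(subset=required_cols)) <= len(required_cols):
            continue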

        try:
            # Fit the OLS model on the current training window.
            model = smf.ols(formula, data=train_window).fit()

            # Generate the out-of-sample forecast for y_{t+h} using data at time `t`.
            forecast = model.predict(predict_data).iloc[0]

            # Get the actual realized value of y_{t+h}.
            actual = predict_data[target_col].iloc[0]

            # Store the results if the forecast is valid.
            if pd.notna(forecast) and pd.notna(actual):
                results.append({
                    'forecast_origin': predict_data.index[0],
                    'horizon': horizon,
                    'model': indicator_col if indicator_col else 'AR(1)',
                    'actual': actual,
                    'forecast': forecast,
                    'error': actual - forecast
                })
        except Exception as e:
            # Catch potential estimation errors (e.g., perfect multicollinearity).
            print(f"Warning: Could not fit model for origin {predict_data.index[0]} and indicator {indicator_col}. Error: {e}")

    return results

# ------------------------------------------------------------------------------
# Task 16, Orchestrator Function
# ------------------------------------------------------------------------------

def execute_poos_forecasts(
    forecasting_df: pd.DataFrame,
    fused_master_input_specification: Dict[str, Any],
    evaluation_windows_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Orchestrates the entire pseudo-out-of-sample (POOS) forecasting exercise.

    This function iterates through all specified indicators and forecast horizons,
    running the expanding-window regression for each. It correctly handles
    indicators with limited historical availability by restricting the evaluation
    period, ensuring a fair and methodologically sound comparison.

    Args:
        forecasting_df (pd.DataFrame): The final quarterly dataset from Task 15.
        fused_master_input_specification (Dict[str, Any]): The master config.
        evaluation_windows_df (pd.DataFrame): Defines valid evaluation periods.

    Returns:
        (pd.DataFrame): A comprehensive DataFrame containing the forecast errors
                        for every model, horizon, and time point.
    """
    # --- 1. Extract Configuration ---
    econ_params = fused_master_input_specification['master_config']['econometric_validation_params']
    dep_var = econ_params['dependent_variable']
    dep_var_lag1 = f"{dep_var}_lag1"
    horizons = econ_params['forecast_horizons_quarters']
    initial_window = econ_params['poos_initial_window_quarters']

    # --- 2. Define Models to Evaluate ---
    # Identify all potential predictor columns in the dataset.
    all_cols = forecasting_df.columns
    indicator_cols = [
        col for col in all_cols if 'NEOS' in col or '_m2' in col or 'seco' in col
    ]
    # The benchmark model is represented by `None`.
    models_to_run = [None] + indicator_cols

    # --- 3. Main Evaluation Loop ---
    all_results = []

    # Use tqdm for a master progress bar over all models and horizons.
    pbar = tqdm(total=len(models_to_run) * len(horizons), desc="Executing POOS Forecasts")

    # Loop over each model specification (AR(1) and each indicator).
    for indicator in models_to_run:
        # --- Step 3 (logic): Handle limited-availability indicators ---
        # Default to the full DataFrame.
        df_for_run = forecasting_df.copy()

        # Check if the current indicator has a special evaluation window.
        indicator_name_for_window = indicator
        if indicator and '_m2' in indicator: # Map e.g. 'service_pmi_m2' to 'service_pmi'
            indicator_name_for_window = indicator.replace('_m2', '')

        window_spec = evaluation_windows_df[evaluation_windows_df['series_name'] == indicator_name_for_window]

        if not window_spec.empty:
            # If a window is defined, slice the DataFrame to that period.
            start_q = window_spec['start_quarter'].iloc[0]
            print(f"Applying limited evaluation window for '{indicator}': starting from {start_q.date()}")
            df_for_run = df_for_run.loc[start_q:]

        # Loop over each forecast horizon (h=0, 1, 2).
        for h in horizons:
            # --- Step 2 (logic): Run the expanding-window regression ---
            # This helper function performs the core POOS loop for one model.
            model_results = _run_expanding_window_regression(
                df=df_for_run,
                dep_var=dep_var,
                dep_var_lag1=dep_var_lag1,
                indicator_col=indicator,
                horizon=h,
                initial_window_size=initial_window
            )
            all_results.extend(model_results)
            pbar.update(1)

    pbar.close()

    # --- 4. Finalize and Return ---
    # Convert the list of result dictionaries into a final DataFrame.
    if not all_results:
        print("Warning: No forecast results were generated. Check data availability and window sizes.")
        return pd.DataFrame()

    results_df = pd.DataFrame(all_results)

    return results_df
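
The timing of the expanding window is easy to get wrong, so a toy version may help. The sketch below is not the notebook's implementation: it uses only the standard library, a hypothetical series `toy`, and hypothetical helper names (`fit_simple_ols`, `expanding_window_errors`). It assumes that at origin `t` the series is observed up to `t-1`, so a training row with target `y[s+h]` is usable only when `s + h <= t - 1`.

```python
from typing import List, Sequence, Tuple


def fit_simple_ols(x: Sequence[float], y: Sequence[float]) -> Tuple[float, float]:
    """Closed-form OLS of y on a constant and a single regressor x."""
    n = len(x)
    mean_x = sum(x) / n
    mean_y = sum(y) / n
    sxx = sum((xi - mean_x) ** 2 for xi in x)
    sxy = sum((xi - mean_x) * (yi - mean_y) for xi, yi in zip(x, y))
    slope = sxy / sxx
    return mean_y - slope * mean_x, slope


def expanding_window_errors(
    y: Sequence[float], horizon: int, initial_window: int
) -> List[Tuple[int, float]]:
    """Return (origin, forecast error) pairs for y_{t+h} ~ const + y_{t-1}."""
    errors = []
    # The first origin is chosen so that the first fit uses `initial_window` rows.
    for t in range(initial_window + horizon + 1, len(y) - horizon):
        # Row s pairs regressor y[s-1] with target y[s+h]. At origin t the series
        # is observed up to t-1, so only rows with s + h <= t - 1 are usable.
        rows = range(1, t - horizon)
        intercept, slope = fit_simple_ols(
            [y[s - 1] for s in rows], [y[s + horizon] for s in rows]
        )
        forecast = intercept + slope * y[t - 1]
        errors.append((t, y[t + horizon] - forecast))
    return errors


# Toy series following y_t = 1 + 0.5 * y_{t-1} exactly, so every forecast
# should match the realized value up to floating-point rounding.
toy = [0.0]
for _ in range(11):
    toy.append(1.0 + 0.5 * toy[-1])

poos_errors = expanding_window_errors(toy, horizon=1, initial_window=4)
print(len(poos_errors), max(abs(e) for _, e in poos_errors))
```

Because the toy series is an exact linear recursion, any non-negligible error here would point to a misaligned window rather than to estimation noise.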


In [None]:
# Task 17 — Compute RMSE ratios for all indicators and horizons

# ==============================================================================
# Task 17: Compute RMSE ratios for all indicators and horizons
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 17, Steps 1, 2, & 3: Compute RMSEs, Ratios, and Tabulate
# ------------------------------------------------------------------------------

def compute_rmse_ratios(
    forecast_errors_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Computes RMSE ratios for all indicators against the AR(1) benchmark.

    Purpose:
    This function is the primary evaluation engine of the forecasting exercise.
    It takes the raw forecast errors and calculates the main performance metric:
    the ratio of the indicator model's Root Mean Squared Error (RMSE) to the
    benchmark AR(1) model's RMSE. A ratio below 1 signifies that the indicator
    improves forecast accuracy. The function is meticulously designed to ensure
    a fair comparison by using the exact same set of forecast origins for each
    indicator-benchmark pair.

    Inputs:
        forecast_errors_df (pd.DataFrame): The long-format DataFrame of forecast
                                          errors from Task 16. Must contain
                                          'model', 'horizon', 'forecast_origin',
                                          and 'error' columns.

    Processes:
    1.  Pivots the input DataFrame to align errors from all models by their
        'forecast_origin' date for each horizon.
    2.  Separates the AR(1) benchmark errors.
    3.  Iterates through each indicator model and each forecast horizon.
    4.  For each combination:
        a. **[Step 1]** Identifies the common sample of forecast origins where
           *both* the indicator model and the AR(1) model produced a valid forecast.
        b. Calculates the squared errors for both models on this common sample.
        c. Computes the RMSE for both models using the formula:
           RMSE = sqrt(mean(squared_errors)).
        d. **[Step 2]** Computes the RMSE ratio: RMSE_indicator / RMSE_AR(1).
        e. Stores the indicator name, horizon, ratio, and the common sample size (T).
    5.  **[Step 3]** Assembles the collected results into a final, wide-format
        DataFrame that mirrors the structure of Table 1 in the paper.

    Outputs:
        (pd.DataFrame): A DataFrame with indicators as the index, horizons as
                        columns, and RMSE ratios as values. It also includes
                        columns for the sample size used in each calculation.
    """
    # --- Input Validation ---
    if forecast_errors_df.empty:
        print("Warning: Input `forecast_errors_df` is empty. Cannot compute RMSE ratios.")
        return pd.DataFrame()

    # This list will store the summary results for each model/horizon pair.
    summary_results: List[Dict[str, Any]] = []

    # Get the unique horizons and model names from the data.
    horizons = sorted(forecast_errors_df['horizon'].unique())
    # Exclude the benchmark from the list of indicator models to iterate over.
    indicator_models = sorted([m for m in forecast_errors_df['model'].unique() if m != 'AR(1)'])

    # --- Main Loop: Iterate through each indicator and horizon ---
    for horizon in horizons:
        # Filter the errors for the current horizon.
        horizon_errors = forecast_errors_df[forecast_errors_df['horizon'] == horizon]

        # Pivot the table to align errors by date.
        # This makes finding the common sample trivial.
        pivoted_errors = horizon_errors.pivot_table(
            index='forecast_origin', columns='model', values='error'
        )

        # The benchmark must be present for this horizon; without it no ratio
        # can be formed.
        if 'AR(1)' not in pivoted_errors.columns:
            continue

        for indicator in indicator_models:
            # Skip if the indicator column doesn't exist for this horizon (can happen with limited windows).
            if indicator not in pivoted_errors.columns:
                continue

            # --- Step 1 (logic): Find common sample and compute RMSEs ---
            # Create a temporary DataFrame with just the indicator and benchmark errors.
            comparison_df = pivoted_errors[[indicator, 'AR(1)']].copy()

            # Drop all rows with any NaNs to get the common sample of forecast origins.
            common_sample_errors = comparison_df.dropna()

            # Get the size of the common sample, T.
            sample_size = len(common_sample_errors)

            if sample_size == 0:
                continue # Cannot compute ratio if there's no overlapping data.

            # Calculate the Mean Squared Error for both models on the common sample.
            mse_indicator = np.mean(common_sample_errors[indicator] ** 2)
            mse_ar1 = np.mean(common_sample_errors['AR(1)'] ** 2)

            # Calculate the Root Mean Squared Error.
            # Equation: RMSE = sqrt(mean(error^2))
            rmse_indicator = np.sqrt(mse_indicator)
            rmse_ar1 = np.sqrt(mse_ar1)

            # --- Step 2 (logic): Compute RMSE Ratio ---
            # Equation: R_h = RMSE_indicator,h / RMSE_AR(1),h
            # Handle the edge case of division by zero.
            rmse_ratio = rmse_indicator / rmse_ar1 if rmse_ar1 > 0 else np.nan

            # --- Store the results ---
            summary_results.append({
                'Indicator': indicator,
                'Horizon': f'h={horizon}',
                'RMSE_Ratio': rmse_ratio,
                'N_obs': sample_size
            })

    # --- Step 3: Tabulate the Final Results ---
    # Convert the list of results into a DataFrame.
    if not summary_results:
        print("Warning: No valid RMSE ratios could be computed.")
        return pd.DataFrame()

    results_long_df = pd.DataFrame(summary_results)

    # Pivot the table to create the final wide-format output.
    # This creates a table with Indicators as rows and Horizons as columns.
    rmse_ratio_table = results_long_df.pivot(
        index='Indicator', columns='Horizon', values='RMSE_Ratio'
    )

    # Create a parallel table for the sample sizes.
    sample_size_table = results_long_df.pivot(
        index='Indicator', columns='Horizon', values='N_obs'
    )
    sample_size_table.columns = [f"N_{col}" for col in sample_size_table.columns]

    # Join the two tables to have ratios and sample sizes side-by-side.
    final_table = rmse_ratio_table.join(sample_size_table)

    # Reorder columns for clarity, keeping only horizons that produced results.
    ordered_cols = []
    for h in horizons:
        for col in (f'h={h}', f'N_h={h}'):
            if col in final_table.columns:
                ordered_cols.append(col)
    final_table = final_table[ordered_cols]

    print("\n--- RMSE Ratio Results (Indicator vs. AR(1) Benchmark) ---")
    print(final_table)

    return final_table
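
As a worked example of the common-sample rule described in the docstring above, the following standard-library sketch computes one RMSE ratio by hand. The function name `rmse_ratio_on_common_sample` and the error values are hypothetical and purely illustrative; they are not results from the paper or the notebook.

```python
import math
from typing import Dict, Tuple


def rmse_ratio_on_common_sample(
    indicator_errors: Dict[str, float], benchmark_errors: Dict[str, float]
) -> Tuple[float, int]:
    """RMSE(indicator) / RMSE(benchmark) over origins present in both inputs."""
    common = sorted(set(indicator_errors) & set(benchmark_errors))
    if not common:
        return float("nan"), 0
    rmse_ind = math.sqrt(sum(indicator_errors[k] ** 2 for k in common) / len(common))
    rmse_bench = math.sqrt(sum(benchmark_errors[k] ** 2 for k in common) / len(common))
    ratio = rmse_ind / rmse_bench if rmse_bench > 0 else float("nan")
    return ratio, len(common)


# Hypothetical errors keyed by forecast origin (illustrative values only).
indicator = {"2019Q1": 1.0, "2019Q2": -1.0, "2019Q3": 2.0}
benchmark = {"2019Q2": 2.0, "2019Q3": -2.0, "2019Q4": 1.0}

ratio, n_obs = rmse_ratio_on_common_sample(indicator, benchmark)
print(round(ratio, 4), n_obs)  # prints: 0.7906 2
```

Only 2019Q2 and 2019Q3 appear in both series, so the ratio is sqrt(2.5) / sqrt(4) and the reported sample size is 2, even though each model produced three forecasts.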


In [None]:
# Task 18 — Perform the modified Diebold–Mariano test with HAC robust standard errors

# ===================================================================================
# Task 18: Perform the modified Diebold–Mariano test with HAC robust standard errors
# ===================================================================================

# ------------------------------------------------------------------------------
# Task 18, Helper Function for a single DM Test
# ------------------------------------------------------------------------------

def _compute_dm_test(
    errors_model1: pd.Series,
    errors_model2: pd.Series,
    horizon: int
) -> Tuple[float, float]:
    """
    Computes a single Diebold-Mariano test with HAC robust standard errors.

    Purpose:
    This function performs a statistical test of the null hypothesis that two
    sets of forecasts have equal predictive accuracy. It is designed for
    out-of-sample forecast evaluation and is robust to the serial correlation
    that is naturally present in multi-step-ahead forecast errors.

    Inputs:
        errors_model1 (pd.Series): Forecast errors from the first model (e.g., indicator).
        errors_model2 (pd.Series): Forecast errors from the second model (e.g., benchmark).
                                   Must be indexed identically to errors_model1.
        horizon (int): The forecast horizon `h`. Used to determine the HAC bandwidth.

    Processes:
    1.  **[Step 1]** Computes the squared errors for both models and calculates the
        loss differential series: d_t = error1_t^2 - error2_t^2.
    2.  **[Step 2]** Estimates the HAC-robust long-run variance of the mean of the
        loss differential series using the Newey-West estimator with a Bartlett
        kernel. The bandwidth `q` is set to `max(0, h-1)`, so both h=0 and h=1
        use q=0; this is a standard choice for h-step-ahead forecasts.
    3.  **[Step 3]** Computes the Diebold-Mariano test statistic and its two-sided
        p-value under the standard normal asymptotic distribution.

    Outputs:
        A tuple containing:
        - (float): The Diebold-Mariano test statistic.
        - (float): The corresponding p-value.
    """
    # --- Step 1: Compute Loss Differential ---
    # The loss is the squared forecast error.
    loss1 = errors_model1**2
    loss2 = errors_model2**2
    # The loss differential series, d_t.
    loss_diff = loss1 - loss2

    # The mean of the loss differential, d-bar.
    d_bar = loss_diff.mean()

    # --- Step 2: Estimate HAC-Robust Variance (Newey-West) ---
    # The bandwidth `q` for the HAC estimator. For h-step-ahead forecasts,
    # the errors can be serially correlated up to lag h-1.
    # For h=0 and h=1, q=0 (no autocorrelation correction needed).
    bandwidth = max(0, horizon - 1)

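    # Get the number of observations, T.
    T = len(loss_diff)

    # Autocovariances up to lag q cannot be estimated meaningfully from q+1 or
    # fewer observations, so return NaNs instead of raising inside `acovf`.
    if T <= bandwidth + 1:
        return np.nan, np.nan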

    # Calculate the autocovariances of the loss differential series.
    # `acovf` computes gamma_0, gamma_1, ..., gamma_q.
    gamma = tsa.acovf(loss_diff, nlag=bandwidth, fft=False)

    # Equation: Var_hat(d_bar) = (1/T) * [gamma_0 + 2 * sum_{l=1 to q} w_l * gamma_l]
    # where w_l = 1 - l / (q + 1) is the Bartlett kernel weight.
    variance_sum = gamma[0] # Start with gamma_0
    for l in range(1, bandwidth + 1):
        # Bartlett kernel weight.
        weight = 1 - (l / (bandwidth + 1))
        variance_sum += 2 * weight * gamma[l]

    # The final HAC-robust variance of the mean.
    hac_variance = variance_sum / T

    # --- Step 3: Compute DM Statistic and p-value ---
    # Handle the case where variance is zero or negative (numerical instability).
    if hac_variance <= 0:
        return np.nan, np.nan

    # Equation: DM = d_bar / sqrt(Var_hat(d_bar))
    dm_statistic = d_bar / np.sqrt(hac_variance)

    # The p-value is calculated from the standard normal distribution (asymptotic).
    # Equation: p = 2 * (1 - Phi(|DM|))
    # `norm.sf` is the survival function (1 - CDF), which is more accurate for large values.
    p_value = 2 * norm.sf(np.abs(dm_statistic))

    return dm_statistic, p_value

# ------------------------------------------------------------------------------
# Task 18, Orchestrator Function
# ------------------------------------------------------------------------------

def perform_diebold_mariano_tests(
    forecast_errors_df: pd.DataFrame,
    rmse_ratios_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Orchestrates the Diebold-Mariano testing for all indicators and horizons.

    This function systematically compares each indicator model's forecast
    accuracy against the AR(1) benchmark. It calculates the DM test statistic
    and p-value for each comparison and appends these results to the existing
    RMSE ratio table, creating a comprehensive final evaluation summary.

    Args:
        forecast_errors_df (pd.DataFrame): The long-format DataFrame of forecast
                                          errors from Task 16.
        rmse_ratios_df (pd.DataFrame): The summary table of RMSE ratios from Task 17.

    Returns:
        (pd.DataFrame): The `rmse_ratios_df` augmented with columns for the
                        DM test p-values for each horizon.
    """
    # --- Input Validation ---
    if forecast_errors_df.empty:
        print("Warning: Input `forecast_errors_df` is empty. Cannot perform DM tests.")
        return rmse_ratios_df.copy()

    # This list will store the p-values to be added to the results table.
    dm_results: List[Dict[str, Any]] = []

    # Get the unique horizons and indicator models from the data.
    horizons = sorted(forecast_errors_df['horizon'].unique())
    indicator_models = sorted([m for m in forecast_errors_df['model'].unique() if m != 'AR(1)'])

    # --- Main Loop: Iterate through each indicator and horizon ---
    for horizon in horizons:
        # Filter the errors for the current horizon.
        horizon_errors = forecast_errors_df[forecast_errors_df['horizon'] == horizon]

        # Pivot to align errors by date.
        pivoted_errors = horizon_errors.pivot_table(
            index='forecast_origin', columns='model', values='error'
        )

        # The benchmark must be present for this horizon; without it no test
        # can be run.
        if 'AR(1)' not in pivoted_errors.columns:
            continue

        for indicator in indicator_models:
            if indicator not in pivoted_errors.columns:
                continue

            # Find the common sample where both models have valid forecasts.
            comparison_df = pivoted_errors[[indicator, 'AR(1)']].dropna()

            if comparison_df.empty:
                continue

            # --- Call the DM test helper function ---
            # This performs all three steps for the current comparison.
            dm_stat, p_val = _compute_dm_test(
                errors_model1=comparison_df[indicator],
                errors_model2=comparison_df['AR(1)'],
                horizon=horizon
            )

            # Store the resulting p-value.
            dm_results.append({
                'Indicator': indicator,
                'Horizon': f'h={horizon}',
                'p_value': p_val
            })

    # --- Assemble Final Table ---
    # Convert the list of results into a DataFrame.
    if not dm_results:
        print("Warning: No DM test results were generated.")
        return rmse_ratios_df.copy()

    p_values_long_df = pd.DataFrame(dm_results)

    # Pivot the p-values into a wide-format table.
    p_values_table = p_values_long_df.pivot(
        index='Indicator', columns='Horizon', values='p_value'
    )
    # Rename columns for clarity.
    p_values_table.columns = [f"p_val_{col}" for col in p_values_table.columns]

    # Join the p-values with the original RMSE ratio table.
    final_table = rmse_ratios_df.join(p_values_table)

    # Reorder columns to group ratios and p-values by horizon.
    ordered_cols = []
    for h in horizons:
        h_str = f'h={h}'
        if h_str in final_table.columns:
            ordered_cols.append(h_str)
            ordered_cols.append(f'p_val_{h_str}')
        # Also keep the sample size columns.
        if f'N_{h_str}' in final_table.columns:
            ordered_cols.append(f'N_{h_str}')

    final_table = final_table[ordered_cols]

    print("\n--- Final Evaluation Table (RMSE Ratios and DM p-values) ---")
    print(final_table)

    return final_table
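
The task heading refers to the modified Diebold-Mariano test, while `_compute_dm_test` above uses the standard normal approximation. One widely used modification is the small-sample correction of Harvey, Leybourne and Newbold (1997), which rescales the statistic and compares it with a Student-t distribution with T-1 degrees of freedom. Whether the paper applies exactly this correction is not stated here, so the sketch below is an assumption; the function names are hypothetical, and how the notebook's horizons h ∈ {0, 1, 2} map to the `steps_ahead` argument (which must be at least 1) is left open.

```python
import math


def hln_correction_factor(n_forecasts: int, steps_ahead: int) -> float:
    """Harvey-Leybourne-Newbold (1997) small-sample factor for the DM statistic.

    factor = sqrt((n + 1 - 2h + h(h - 1) / n) / n), with n forecasts and
    h-step-ahead forecasts (h >= 1).
    """
    if n_forecasts < 2 or steps_ahead < 1:
        raise ValueError("need n_forecasts >= 2 and steps_ahead >= 1")
    n, h = n_forecasts, steps_ahead
    return math.sqrt((n + 1 - 2 * h + h * (h - 1) / n) / n)


def modified_dm_statistic(dm_statistic: float, n_forecasts: int, steps_ahead: int) -> float:
    """Rescale an existing DM statistic by the HLN factor."""
    return dm_statistic * hln_correction_factor(n_forecasts, steps_ahead)


# Illustrative numbers only: a DM statistic of -2.0 from 40 one-step forecasts.
adjusted = modified_dm_statistic(-2.0, n_forecasts=40, steps_ahead=1)
print(round(adjusted, 4))  # prints: -1.9748
```

A two-sided p-value for the adjusted statistic could then be taken from the Student-t distribution, for example with `2 * scipy.stats.t.sf(abs(adjusted), df=n_forecasts - 1)`. With samples of a few dozen quarters, this tends to be somewhat more conservative than the normal approximation.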


In [None]:
# Task 19 — Construct the lexicon-based baseline indicator (German articles only)

# ===============================================================================
# Task 19: Construct the lexicon-based baseline indicator (German articles only)
# ===============================================================================

# ------------------------------------------------------------------------------
# Task 19, Step 1: Prepare the Barbaglia et al. (2025) lexicon
# ------------------------------------------------------------------------------

def _load_and_prepare_lexicon(
    lexicon_path: str
) -> Tuple[Set[str], Set[str]]:
    """
    Loads and prepares the translated German sentiment lexicon.

    Purpose:
    This function reads the pre-translated lexicon file, validates its structure,
    and processes it into a data structure optimized for fast lookups during the
    scoring phase. It separates the words into distinct sets for positive and
    negative sentiment.

    Inputs:
        lexicon_path (str): The file path to the CSV containing the translated
                            lexicon. The CSV must have 'word_de' and 'sentiment'
                            columns.

    Processes:
    1.  Loads the lexicon from the specified CSV path.
    2.  Validates the presence of the required 'word_de' and 'sentiment' columns.
    3.  Converts all words in the 'word_de' column to lowercase for case-insensitive matching.
    4.  Filters the lexicon to create a set of unique positive words.
    5.  Filters the lexicon to create a set of unique negative words.

    Outputs:
        A tuple containing:
        - (Set[str]): A set of lowercase positive German words.
        - (Set[str]): A set of lowercase negative German words.
    """
    # --- 1. Load and Validate Lexicon ---
    try:
        lexicon_df = pd.read_csv(lexicon_path)
    except FileNotFoundError as exc:
        # Re-raise with a clearer message while preserving the original traceback.
        raise FileNotFoundError(f"Lexicon file not found at: {lexicon_path}") from exc

    if 'word_de' not in lexicon_df.columns or 'sentiment' not in lexicon_df.columns:
        raise ValueError("Lexicon CSV must contain 'word_de' and 'sentiment' columns.")

    # --- 2. Prepare Word Sets ---
    # Convert words to lowercase for consistent matching.
    lexicon_df['word_de'] = lexicon_df['word_de'].str.lower()

    # Create a set of positive words for efficient lookup.
    positive_words = set(lexicon_df[lexicon_df['sentiment'] == 'positive']['word_de'].dropna())

    # Create a set of negative words for efficient lookup.
    negative_words = set(lexicon_df[lexicon_df['sentiment'] == 'negative']['word_de'].dropna())

    print(f"Loaded lexicon: {len(positive_words)} positive words, {len(negative_words)} negative words.")

    return positive_words, negative_words

# ------------------------------------------------------------------------------
# Task 19, Step 2: Score German articles and compute monthly aggregates
# ------------------------------------------------------------------------------

def _score_articles_and_aggregate(
    german_articles_df: pd.DataFrame,
    positive_words: Set[str],
    negative_words: Set[str]
) -> pd.DataFrame:
    """
    Scores German articles based on lexicon word counts and aggregates to monthly.

    Purpose:
    This function implements the core logic of the lexicon-based approach. It
    tokenizes each article, counts the occurrences of positive and negative
    words, calculates a sentiment score for each article, and then computes the
    average score for each month to create the final time-series indicator.

    Inputs:
        german_articles_df (pd.DataFrame): A DataFrame of relevant German articles.
        positive_words (Set[str]): A set of positive sentiment words.
        negative_words (Set[str]): A set of negative sentiment words.

    Processes:
    1.  Defines a scoring function that:
        a. Takes a text, tokenizes it by splitting on whitespace.
        b. Counts positive and negative words.
        c. Calculates a polarity score: (Pos - Neg) / (Pos + Neg).
    2.  Applies this scoring function to every article's 'full_text'.
    3.  Calls a helper to aggregate the resulting article-level scores into a
        monthly time series.

    Outputs:
        (pd.DataFrame): A monthly time series DataFrame for the lexicon indicator.
    """
    # --- 1. Define the Article Scoring Function ---
    def calculate_lexicon_score(text: str) -> float:
        """Calculates the polarity score for a single piece of text."""
        # Simple whitespace tokenization and lowercasing.
        tokens = text.lower().split()

        # Count positive and negative words.
        pos_count = sum(1 for token in tokens if token in positive_words)
        neg_count = sum(1 for token in tokens if token in negative_words)

        # Calculate the polarity score, guarding explicitly against division by zero.
        # Formula: (Positive - Negative) / (Positive + Negative)
        total_sentiment_words = pos_count + neg_count
        if total_sentiment_words == 0:
            return 0.0 # Neutral if no sentiment words are found.

        return (pos_count - neg_count) / total_sentiment_words

    # --- 2. Apply Scoring to All Articles ---
    # Initialize tqdm for pandas .apply() to show a progress bar.
    tqdm.pandas(desc="Scoring articles with lexicon")

    # Create a new column with the calculated score for each article.
    # This is the most computationally intensive step.
    df = german_articles_df.copy()
    df['lexicon_score'] = df['full_text'].progress_apply(calculate_lexicon_score)

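    # --- 3. Aggregate to Monthly Frequency ---
    # Rename the score column to match the input expected by the aggregation helper.
    # Drop any pre-existing 'p_pos' column first (if present) so the rename
    # cannot produce duplicate column names.
    df = df.drop(columns=['p_pos'], errors='ignore').rename(columns={'lexicon_score': 'p_pos'})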

    # Use the same aggregation helper from Task 14 for consistency.
    monthly_lexicon_indicator = _aggregate_scores_to_monthly(df, suffix='_lexicon')

    return monthly_lexicon_indicator

# ------------------------------------------------------------------------------
# Task 19, Orchestrator Function
# ------------------------------------------------------------------------------

def construct_lexicon_baseline_indicator(
    df_relevant: pd.DataFrame,
    lexicon_path: str
) -> pd.DataFrame:
    """
    Orchestrates the construction of the lexicon-based sentiment indicator.

    This function serves as a self-contained pipeline to create the baseline
    sentiment indicator described in Appendix A.3. It loads a translated lexicon,
    applies it to score all relevant German-language articles, and aggregates
    these scores into a final monthly time series.

    Args:
        df_relevant (pd.DataFrame): The DataFrame of all economics-relevant articles.
        lexicon_path (str): The file path to the translated German lexicon CSV.

    Returns:
        (pd.DataFrame): A monthly time series DataFrame containing the lexicon-based
                        indicator and the corresponding monthly article counts. This
                        DataFrame is ready to be merged with other indicators for
                        the forecasting exercise.
    """
    # --- 1. Load and Prepare the Lexicon ---
    positive_words, negative_words = _load_and_prepare_lexicon(lexicon_path)

    # --- 2. Filter for German Articles ---
    # The paper specifies that this baseline is constructed on German articles only.
    german_articles_df = df_relevant[df_relevant['language'] == 'de'].copy()

    if german_articles_df.empty:
        print("Warning: No German articles found in the relevant set. Cannot construct lexicon baseline.")
        return pd.DataFrame()

    # --- 3. Score Articles and Aggregate ---
    # This function handles the core computation.
    monthly_indicator = _score_articles_and_aggregate(
        german_articles_df,
        positive_words,
        negative_words
    )

    print("Successfully constructed the lexicon-based baseline indicator.")

    return monthly_indicator
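
The monthly aggregation step in Task 19 can be illustrated on toy data. The sketch below is not the `_aggregate_scores_to_monthly` helper itself: it assumes a hypothetical `date` column and made-up scores, and only shows the idea of filtering to German articles, averaging article-level scores within each month, and counting the contributing articles.

```python
import pandas as pd

# Hypothetical article-level scores; column names and values are assumptions
# chosen for illustration, not taken from the real corpus.
articles = pd.DataFrame(
    {
        "date": pd.to_datetime(
            ["2024-01-03", "2024-01-17", "2024-02-05", "2024-02-20", "2024-02-28"]
        ),
        "language": ["de", "de", "fr", "de", "de"],
        "lexicon_score": [0.2, -0.4, 0.9, 0.1, 0.3],
    }
)

# Keep German articles only, mirroring the filter in the orchestrator.
german = articles[articles["language"] == "de"]

# Average the scores within each calendar month and count the articles used.
monthly = german.groupby(german["date"].dt.to_period("M"))["lexicon_score"].agg(
    indicator="mean", n_articles="count"
)
print(monthly)
```

The French article is excluded before aggregation, so each month's value is the mean of its two German articles.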


In [None]:
# Task 20 — Compute correlation benchmarks (Appendix A.2 replication)

# ==============================================================================
# Task 20: Compute correlation benchmarks (Appendix A.2 replication)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 20, Step 1: Transform monthly indicators to quarterly via three-month averaging
# ------------------------------------------------------------------------------

def _average_monthly_to_quarterly(
    monthly_indicators: Dict[str, pd.DataFrame]
) -> pd.DataFrame:
    """
    Transforms monthly indicators to quarterly by simple three-month averaging.

    Purpose:
    This function prepares the indicator data specifically for the correlation
    analysis in Appendix A.2. Unlike the forecasting task, which uses a selection
    rule, this analysis requires a simple average of the three monthly values
    within each quarter.

    Inputs:
        monthly_indicators (Dict[str, pd.DataFrame]): A dictionary where keys are
            indicator names and values are the corresponding monthly time-series
            DataFrames.

    Processes:
    1.  Initializes an empty list to store the quarterly-averaged series.
    2.  Iterates through each monthly indicator DataFrame provided.
    3.  Uses the `resample('QS-JAN').mean()` method to perform the quarterly averaging.
    4.  Appends the resulting quarterly series to the list.
    5.  Concatenates all quarterly series into a single, aligned DataFrame.

    Outputs:
        (pd.DataFrame): A single DataFrame containing all indicators at a
                        quarterly frequency, created by averaging.
    """
    quarterly_series_list = []

    # Iterate through each monthly indicator provided in the input dictionary.
    for name, df in monthly_indicators.items():
        # Use resample('QS-JAN') to group by calendar quarter, then take the mean.
        # This correctly handles quarters with missing months (e.g., averaging over 2 instead of 3 values).
        quarterly_series = df.resample('QS-JAN').mean()
        # Rename the column to reflect the indicator name.
        quarterly_series.columns = [name]
        quarterly_series_list.append(quarterly_series)

    # Join all the quarterly series into a single DataFrame.
    # The 'outer' join ensures that the full time range is preserved.
    all_quarterly_indicators = pd.concat(quarterly_series_list, axis=1, join='outer')

    return all_quarterly_indicators

# ------------------------------------------------------------------------------
# Task 20, Steps 2 & 3: Compute lagged correlations and tabulate results
# ------------------------------------------------------------------------------

def _calculate_and_tabulate_correlations(
    quarterly_indicators: pd.DataFrame,
    gdp_series: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Computes and tabulates lagged correlations between indicators and GDP growth.

    Purpose:
    This function replicates the analysis in Table 2 of the paper. It calculates
    the Pearson correlation between various indicators (at lags 0 to 4) and two
    measures of GDP growth (year-on-year and quarter-on-quarter). It then
    assembles these results into two publication-ready tables.

    Inputs:
        quarterly_indicators (pd.DataFrame): The quarterly-averaged indicators.
        gdp_series (pd.DataFrame): A DataFrame with 'yoy_gdp' and 'qoq_gdp' columns.

    Processes:
    1.  Defines the lags to be computed (0 to 4).
    2.  Iterates through each indicator and each lag.
    3.  For each combination, it creates a lagged version of the indicator.
    4.  It then computes the correlation between the lagged indicator and both
        yoy and qoq GDP growth, ensuring that the calculation for each pair
        only uses the common set of available data points.
    5.  Stores the results in a long-format list.
    6.  Converts the list to a DataFrame and pivots it to create the final
        wide-format tables, adding the summary average columns.

    Outputs:
        A tuple containing:
        - (pd.DataFrame): The correlation table for year-on-year (yoy) GDP growth.
        - (pd.DataFrame): The correlation table for quarter-on-quarter (qoq) GDP growth.
    """
    # This list will store the raw correlation results in a long format.
    correlation_results = []
    lags = range(5)  # Lags 0, 1, 2, 3, 4

    # Iterate through each indicator column.
    for indicator_col in quarterly_indicators.columns:
        # Iterate through each lag.
        for lag in lags:
            # Create the lagged indicator series.
            lagged_indicator = quarterly_indicators[indicator_col].shift(lag)

            # --- Correlate with YoY GDP ---
            # Combine the two series and drop NaNs to find the common sample.
            yoy_aligned = pd.concat([gdp_series['yoy_gdp'], lagged_indicator], axis=1).dropna()
            # Compute correlation on the aligned data.
            yoy_corr = yoy_aligned.corr().iloc[0, 1] if len(yoy_aligned) > 1 else np.nan

            correlation_results.append({
                'Indicator': indicator_col,
                'GDP_Type': 'yoy',
                'Lag': lag,
                'Correlation': yoy_corr
            })

            # --- Correlate with QoQ GDP ---
            # Repeat the process for quarter-on-quarter GDP.
            qoq_aligned = pd.concat([gdp_series['qoq_gdp'], lagged_indicator], axis=1).dropna()
            qoq_corr = qoq_aligned.corr().iloc[0, 1] if len(qoq_aligned) > 1 else np.nan

            correlation_results.append({
                'Indicator': indicator_col,
                'GDP_Type': 'qoq',
                'Lag': lag,
                'Correlation': qoq_corr
            })

    # --- Assemble the Final Tables (Step 3) ---
    results_df = pd.DataFrame(correlation_results)

    # Create a helper function to build the final table for a given GDP type.
    def build_table(gdp_type: str) -> pd.DataFrame:
        # Filter for the specific GDP type.
        df_subset = results_df[results_df['GDP_Type'] == gdp_type]

        # Pivot from long to wide format.
        table = df_subset.pivot(index='Indicator', columns='Lag', values='Correlation')

        # Calculate summary average columns.
        if 0 in table.columns and 1 in table.columns:
            table['AVG(0:1)'] = table[[0, 1]].mean(axis=1)
        if all(l in table.columns for l in lags):
            table['AVG(0:4)'] = table[list(lags)].mean(axis=1)

        return table

    # Build the two final tables.
    yoy_table = build_table('yoy')
    qoq_table = build_table('qoq')

    return yoy_table, qoq_table

# ------------------------------------------------------------------------------
# Task 20, Orchestrator Function
# ------------------------------------------------------------------------------

def compute_correlation_benchmarks(
    monthly_neos_df: pd.DataFrame,
    monthly_indicator_df: pd.DataFrame,
    lexicon_baseline_df: pd.DataFrame,
    raw_macro_data_df: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Orchestrates the full correlation analysis to replicate Appendix A.2.

    This function manages the entire workflow for the descriptive correlation
    analysis. It first transforms all monthly indicators to a quarterly frequency
    using simple averaging, then computes their lagged correlations with both
    year-on-year and quarter-on-quarter GDP growth, and finally formats the
    results into two publication-ready tables.

    Args:
        monthly_neos_df (pd.DataFrame): Monthly NEOS indicators.
        monthly_indicator_df (pd.DataFrame): Monthly comparator indicators (PMI, KOF).
        lexicon_baseline_df (pd.DataFrame): Monthly lexicon-based indicator.
        raw_macro_data_df (pd.DataFrame): Raw quarterly data containing GDP series.

    Returns:
        A tuple containing:
        - (pd.DataFrame): The correlation table for year-on-year (yoy) GDP growth.
        - (pd.DataFrame): The correlation table for quarter-on-quarter (qoq) GDP growth.
    """
    # --- 1. Prepare Monthly Indicators for Averaging ---
    # Combine all monthly series into a dictionary for processing.
    # We only need the main indicator columns, not the counts.
    monthly_indicators_to_process = {
        'NEOS': monthly_neos_df[['NEOS_baseline']],
        'Manufacturing_PMI': monthly_indicator_df[['manufacturing_pmi']],
        'Service_PMI': monthly_indicator_df[['service_pmi']],
        'KOF_Business_Situation': monthly_indicator_df[['kof_biz_situation']],
        'Lexicon_Baseline': lexicon_baseline_df[['NEOS_lexicon']]
    }
    # Add early NEOS variants as well.
    for col in monthly_neos_df.columns:
        if '_d' in col and 'NEOS' in col:
            monthly_indicators_to_process[col] = monthly_neos_df[[col]]

    # --- Step 1: Transform to Quarterly via Averaging ---
    quarterly_indicators = _average_monthly_to_quarterly(monthly_indicators_to_process)

    # --- 2. Prepare GDP Series ---
    # The year-on-year series is required; quarter-on-quarter growth is optional (handled below).
    gdp_df = raw_macro_data_df[['yoy_gdp_growth_sports_adj']].copy()
    gdp_df.rename(columns={'yoy_gdp_growth_sports_adj': 'yoy_gdp'}, inplace=True)

    # Computing qoq growth requires an underlying GDP level series, which is not provided.
    # Use the 'qoq_gdp_growth' column from raw_macro_data_df if it exists;
    # otherwise fill 'qoq_gdp' with NaN so the QoQ table is still produced.
    if 'qoq_gdp_growth' in raw_macro_data_df.columns:
        gdp_df['qoq_gdp'] = raw_macro_data_df['qoq_gdp_growth']
    else:
        print("Warning: Quarter-on-quarter GDP growth not found. QoQ correlations will be NaN.")
        gdp_df['qoq_gdp'] = np.nan

    # Add the quarterly SECO indicator to the set of indicators to test.
    if 'seco_consumer_sentiment_q' in raw_macro_data_df.columns:
        quarterly_indicators['SECO_Consumer_Sentiment'] = raw_macro_data_df['seco_consumer_sentiment_q']

    # --- Steps 2 & 3: Compute and Tabulate Correlations ---
    yoy_table, qoq_table = _calculate_and_tabulate_correlations(
        quarterly_indicators, gdp_df
    )

    print("\n--- Correlation Table with YoY GDP Growth ---")
    print(yoy_table.round(2))
    print("\n--- Correlation Table with QoQ GDP Growth ---")
    print(qoq_table.round(2))

    return yoy_table, qoq_table
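
The two core steps of Task 20, three-month averaging and lagged correlation, can be checked on a small synthetic example. The sketch below uses made-up data and a hypothetical helper name (`lagged_corr`); the "GDP" series is constructed as the indicator delayed by one quarter, so the correlation at lag 1 should be exactly one.

```python
import numpy as np
import pandas as pd

# Hypothetical monthly indicator: 24 months of random values (illustration only).
months = pd.date_range("2022-01-01", periods=24, freq="MS")
rng = np.random.default_rng(0)
monthly = pd.Series(rng.normal(size=24), index=months, name="indicator")

# Step 1: average the three monthly values within each calendar quarter.
quarterly = monthly.resample("QS-JAN").mean()

# Synthetic target: the indicator delayed by one quarter.
gdp = quarterly.shift(1).rename("yoy_gdp")


def lagged_corr(indicator: pd.Series, target: pd.Series, lag: int) -> float:
    """Correlate target at time t with the indicator at time t - lag."""
    # Align the two series and keep only quarters where both are observed.
    aligned = pd.concat([target, indicator.shift(lag)], axis=1).dropna()
    return aligned.corr().iloc[0, 1] if len(aligned) > 1 else np.nan


# Step 2: compute the correlation at lags 0, 1 and 2.
correlations = {lag: lagged_corr(quarterly, gdp, lag) for lag in range(3)}
print(correlations)
```

Dropping missing values separately for each lag means every correlation uses the largest sample available for that pair, so the sample shrinks by one quarter per additional lag.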


In [None]:
# Task 21 — Generate diagnostics and figures (Charts 2, 3, 4, 5)

# ==============================================================================
# Task 21: Generate diagnostics and figures (Charts 2, 3, 4, 5)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 21, Step 2: Chart 3 — Time-series comparison
# ------------------------------------------------------------------------------

def _generate_chart3_timeseries_comparison(
    monthly_neos_df: pd.DataFrame,
    monthly_indicator_df: pd.DataFrame,
    raw_macro_data_df: pd.DataFrame,
    output_path: str
) -> None:
    """
    Generates a time-series comparison plot (replication of Chart 3).

    This function plots the standardized NEOS indicator against other key
    monthly indicators (PMI, KOF) and overlays the quarterly year-on-year
    GDP growth as bars on a secondary axis.

    Args:
        monthly_neos_df: DataFrame with the monthly NEOS_baseline.
        monthly_indicator_df: DataFrame with monthly comparator indicators.
        raw_macro_data_df: DataFrame with quarterly GDP growth.
        output_path: File path to save the generated plot.
    """
    # --- 1. Prepare and Combine Data ---
    # Select the series for plotting.
    indicators_to_plot = {
        'NEOS': monthly_neos_df['NEOS_baseline'],
        'Manufacturing PMI': monthly_indicator_df['manufacturing_pmi'],
        'KOF Business Situation': monthly_indicator_df['kof_biz_situation'],
    }
    plot_df = pd.concat(indicators_to_plot, axis=1).dropna(how='all')

    # --- 2. Standardize (Z-score) the Indicators ---
    # Standardization is necessary to plot them on the same scale.
    scaler = StandardScaler()
    plot_df_scaled = pd.DataFrame(
        scaler.fit_transform(plot_df),
        index=plot_df.index,
        columns=plot_df.columns
    )

    # --- 3. Create the Plot ---
    # Apply the style before creating the figure; styles only affect figures created afterwards.
    plt.style.use('seaborn-v0_8-whitegrid')
    fig, ax1 = plt.subplots(figsize=(15, 8))

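    # Plot the standardized indicators on the primary y-axis.
    # x_compat=True keeps matplotlib's native date units so the GDP bars drawn
    # with ax2.bar on the shared x-axis line up with these lines.
    plot_df_scaled.plot(ax=ax1, lw=1.5, x_compat=True)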
    ax1.set_ylabel('Index, standardised', fontsize=12)
    ax1.set_xlabel('')
    ax1.grid(True, which='major', linestyle='--', linewidth=0.5)

    # --- 4. Create and Plot on Secondary Axis ---
    # Create a secondary y-axis for the GDP growth bars.
    ax2 = ax1.twinx()
    gdp_series = raw_macro_data_df['yoy_gdp_growth_sports_adj']
    # Plot GDP growth as bars. Width is set to ~90 days to fill quarters.
    ax2.bar(gdp_series.index, gdp_series.values, width=90, alpha=0.3, color='gray', label='GDP, yoy (rhs)')
    ax2.set_ylabel('%', fontsize=12, rotation=0, labelpad=15)
    ax2.grid(False)  # Turn off grid for the secondary axis.

    # --- 5. Finalize Aesthetics ---
    fig.suptitle('News-based Economic Outlook for Switzerland (NEOS)', fontsize=16, fontweight='bold')
    ax1.set_title('Comparison with survey-based indicators and real GDP growth', fontsize=12)

    # Combine legends from both axes.
    lines, labels = ax1.get_legend_handles_labels()
    bars, bar_labels = ax2.get_legend_handles_labels()
    ax1.legend(lines + bars, labels + bar_labels, loc='upper left')

    # Set x-axis limits and format.
    ax1.set_xlim(plot_df_scaled.index.min(), plot_df_scaled.index.max())
    fig.tight_layout(rect=[0, 0, 1, 0.95])

    # Save the figure.
    fig.savefig(output_path, dpi=300)
    print(f"Chart 3 saved to: {output_path}")
    plt.close(fig)

# ------------------------------------------------------------------------------
# Task 21, Step 3: Charts 4 and 5 — Timeliness and crisis diagnostics
# ------------------------------------------------------------------------------

def _generate_chart4_timeliness_diagnostic(
    daily_indicator_df: pd.DataFrame,
    output_path: str
) -> None:
    """
    Generates the daily month-to-date NEOS evolution plot (Chart 4).

    This function visualizes the high-frequency nature of the NEOS indicator
    by plotting its daily evolution within a specific crisis period and
    annotating it with key real-world events.

    Args:
        daily_indicator_df: The daily MTD NEOS series from Task 14.
        output_path: File path to save the generated plot.
    """
    # --- 1. Prepare Data ---
    # Restrict to the Feb-Apr 2025 window used for illustration in the paper's chart.
    plot_df = daily_indicator_df.loc['2025-02-01':'2025-04-30'].copy()

    # --- 2. Create the Plot ---
    # Apply the style before creating the figure; styles only affect figures created afterwards.
    plt.style.use('seaborn-v0_8-whitegrid')
    fig, ax = plt.subplots(figsize=(12, 7))

    # Plot the daily MTD series.
    ax.plot(plot_df.index, plot_df['NEOS_mtd'], label='NEOS (month-to-date)')

    # Find month-end values to mark with diamonds.
    # Note: pandas >= 2.2 deprecates the 'M' alias in favour of 'ME'.
    month_ends = plot_df.resample('M').last()
    ax.plot(month_ends.index, month_ends['NEOS_mtd'], 'D', ms=8, label='NEOS (monthly indicator)')

    # --- 3. Add Event Annotations ---
    # Event labels as shown in the paper's chart.
    events = {
        '2025-02-03': '[Feb 03] Tariffs paused',
        '2025-03-04': '[Mar 04] Tariffs go into effect',
        '2025-04-09': '[Apr 09] 90-day pause on "reciprocal" tariffs',
    }
    for date_str, label in events.items():
        date = pd.to_datetime(date_str)
        ax.axvline(x=date, color='r', linestyle='--', linewidth=1)
        ax.text(date + pd.Timedelta(days=1), ax.get_ylim()[1]*0.95, label,
                rotation=0, verticalalignment='top', color='r')

    # --- 4. Finalize Aesthetics ---
    ax.set_title('NEWS-BASED ECONOMIC OUTLOOK FOR SWITZERLAND (NEOS)', fontsize=14, fontweight='bold')
    ax.set_ylabel('Index, standardised')
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%B %Y'))
    ax.legend()
    fig.tight_layout()

    # Save the figure.
    fig.savefig(output_path, dpi=300)
    print(f"Chart 4 saved to: {output_path}")
    plt.close(fig)

def _generate_chart5_crisis_performance(
    forecast_errors_df: pd.DataFrame,
    raw_macro_data_df: pd.DataFrame,
    output_path: str
) -> None:
    """
    Generates the cumulative squared error difference plot (Chart 5).

    This function visualizes when the NEOS model provides the most value. It
    plots the cumulative sum of the difference in squared forecast errors
    between the NEOS model and the AR(1) benchmark. A downward-sloping line
    indicates periods where NEOS is outperforming the benchmark.

    Args:
        forecast_errors_df: The long-format DataFrame of all forecast errors.
        raw_macro_data_df: DataFrame with quarterly GDP growth for context.
        output_path: File path to save the generated plot.
    """
    # --- 1. Prepare Data ---
    # Filter for the specific comparison: NEOS baseline vs. AR(1) at h=0.
    errors_h0 = forecast_errors_df[forecast_errors_df['horizon'] == 0]
    pivoted_errors = errors_h0.pivot_table(
        index='forecast_origin', columns='model', values='error'
    )

    # Select the relevant models and find the common sample.
    comparison_df = pivoted_errors[['NEOS_baseline', 'AR(1)']].dropna()

    # --- 2. Calculate Cumulative Squared Error Difference ---
    # Equation: Delta_t = e_NEOS^2 - e_AR(1)^2
    comparison_df['sq_error_diff'] = comparison_df['NEOS_baseline']**2 - comparison_df['AR(1)']**2
    # Equation: C_t = sum_{s<=t} Delta_s
    comparison_df['cumulative_sq_error_diff'] = comparison_df['sq_error_diff'].cumsum()

    # --- 3. Create the Plot ---
    # Apply the style before creating the figure; styles only affect figures created afterwards.
    plt.style.use('seaborn-v0_8-whitegrid')
    fig, ax1 = plt.subplots(figsize=(15, 8))

    # Plot the cumulative difference on the primary y-axis.
    ax1.plot(comparison_df.index, comparison_df['cumulative_sq_error_diff'],
             label='Cumulative squared error differences', color='tab:blue')
    ax1.set_ylabel('Cumulative squared error differences', fontsize=12)
    ax1.set_xlabel('')

    # --- 4. Add GDP Context on Secondary Axis ---
    ax2 = ax1.twinx()
    gdp_series = raw_macro_data_df['yoy_gdp_growth_sports_adj']
    ax2.plot(gdp_series.index, gdp_series, color='black', lw=1.5, alpha=0.7,
             label='Real GDP, yoy (rhs)')
    ax2.set_ylabel('%', fontsize=12, rotation=0, labelpad=15)
    ax2.axhline(0, color='gray', linestyle='--', lw=1)  # Add zero line for GDP

    # --- 5. Finalize Aesthetics ---
    ax1.set_title('NEOS is a valuable predictor for Swiss GDP in times of crises', fontsize=16, fontweight='bold')
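    # Anchor the legend inside the figure canvas; a negative y-offset would be
    # clipped because the figure is saved without bbox_inches='tight'.
    fig.legend(loc='lower center', bbox_to_anchor=(0.5, 0.0), ncol=2)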
    ax1.set_xlim(comparison_df.index.min(), comparison_df.index.max())
    fig.tight_layout(rect=[0, 0.05, 1, 0.95])

    # Save the figure.
    fig.savefig(output_path, dpi=300)
    print(f"Chart 5 saved to: {output_path}")
    plt.close(fig)

# ------------------------------------------------------------------------------
# Task 21, Orchestrator Function
# ------------------------------------------------------------------------------

def generate_all_charts(
    # Inputs for Chart 2
    synthetic_corpus_path: str,
    synthetic_embeddings: np.ndarray,
    # Inputs for Chart 3
    monthly_neos_df: pd.DataFrame,
    monthly_indicator_df: pd.DataFrame,
    raw_macro_data_df: pd.DataFrame,
    # Inputs for Chart 4
    daily_indicator_df: Optional[pd.DataFrame],
    # Inputs for Chart 5
    forecast_errors_df: pd.DataFrame,
    # General
    fused_master_input_specification: Dict[str, Any],
    output_directory: str
) -> Dict[str, str]:
    """
    Orchestrates the generation of all diagnostic and result figures.

    This function calls the dedicated plotting function for each of the four
    charts specified in the paper, saving each to a file.

    Args:
        synthetic_corpus_path: Path to the synthetic articles CSV.
        synthetic_embeddings: The (256, 1024) array of synthetic embeddings.
        monthly_neos_df: DataFrame with monthly NEOS indicators.
        monthly_indicator_df: DataFrame with monthly comparator indicators.
        raw_macro_data_df: DataFrame with quarterly GDP.
        daily_indicator_df: Optional DataFrame with daily MTD NEOS.
        forecast_errors_df: DataFrame with all POOS forecast errors.
        fused_master_input_specification: The master config.
        output_directory: The directory where all chart images will be saved.

    Returns:
        A dictionary mapping chart names to their output file paths.
    """
    # Ensure the output directory exists.
    os.makedirs(output_directory, exist_ok=True)
    chart_paths = {}

    # --- Generate Chart 2 (UMAP) ---
    # Note: This calls the remediated version of _create_umap_visualization
    # which is assumed to be available in the execution context.
    print("\n--- Generating Chart 2: UMAP Visualization ---")
    chart2_path = os.path.join(output_directory, 'chart2_umap_synthetic_embeddings.png')
    _create_umap_visualization(
        synthetic_embeddings=synthetic_embeddings,
        synthetic_df=pd.read_csv(synthetic_corpus_path),
        umap_params=fused_master_input_specification['master_config']['umap_diagnostic_params'],
        output_path=chart2_path
    )
    chart_paths['chart2'] = chart2_path

    # --- Generate Chart 3 (Time Series Comparison) ---
    print("\n--- Generating Chart 3: Time Series Comparison ---")
    chart3_path = os.path.join(output_directory, 'chart3_timeseries_comparison.png')
    _generate_chart3_timeseries_comparison(
        monthly_neos_df, monthly_indicator_df, raw_macro_data_df, chart3_path
    )
    chart_paths['chart3'] = chart3_path

    # --- Generate Chart 4 (Timeliness) ---
    if daily_indicator_df is not None:
        print("\n--- Generating Chart 4: Timeliness Diagnostic ---")
        chart4_path = os.path.join(output_directory, 'chart4_timeliness_diagnostic.png')
        _generate_chart4_timeliness_diagnostic(daily_indicator_df, chart4_path)
        chart_paths['chart4'] = chart4_path
    else:
        print("\nSkipping Chart 4: Daily MTD indicator data not provided.")

    # --- Generate Chart 5 (Crisis Performance) ---
    print("\n--- Generating Chart 5: Crisis Performance ---")
    chart5_path = os.path.join(output_directory, 'chart5_crisis_performance.png')
    _generate_chart5_crisis_performance(
        forecast_errors_df, raw_macro_data_df, chart5_path
    )
    chart_paths['chart5'] = chart5_path

    return chart_paths
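
The quantity plotted in Chart 5 is a running sum of squared-error differences. A minimal sketch with made-up forecast errors (the numbers below are hypothetical, not results from the paper) shows how the series is built and how to read its slope.

```python
import pandas as pd

# Hypothetical h=0 forecast errors for two models over four quarters.
errors = pd.DataFrame(
    {
        "NEOS_baseline": [0.5, -0.2, 0.1, -1.0],
        "AR(1)": [0.4, -0.6, 0.3, -2.0],
    },
    index=pd.date_range("2019-10-01", periods=4, freq="QS"),
)

# Delta_t = e_NEOS^2 - e_AR(1)^2: negative when NEOS has the smaller squared error.
sq_error_diff = errors["NEOS_baseline"] ** 2 - errors["AR(1)"] ** 2

# C_t = sum of Delta_s for s <= t: falls in periods where NEOS outperforms.
cumulative = sq_error_diff.cumsum()
print(cumulative)
```

In this toy series the first quarter slightly favours the AR(1) benchmark (positive difference), while the large error in the last quarter dominates the cumulative sum, which is the pattern the chart is designed to expose around crises.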


In [None]:
# Task 22 — Build the end-to-end orchestrator function for the full NEOS pipeline

# ===============================================================================
# Task 22: Build the end-to-end orchestrator function for the full NEOS pipeline
# ===============================================================================

def _setup_artifact_paths(output_directory: str) -> Dict[str, Any]:
    """
    Creates a structured dictionary of all output file paths for the pipeline.

    Purpose:
    This helper function centralizes the management of all file paths for
    artifacts generated by the pipeline. It creates a nested directory structure
    (e.g., for data, models, results) within the main output directory. This
    approach avoids hardcoding paths throughout the orchestrator, making the
    pipeline more maintainable, configurable, and robust.

    Inputs:
        output_directory (str): The root directory where all artifacts will be saved.

    Processes:
    1.  Defines a dictionary mapping logical artifact names (e.g., "relevance_model")
        to their file paths within a standardized subdirectory structure
        (`models/`, `data/`, `results/`) under `output_directory`.
    2.  Iterates through the defined paths and uses `os.makedirs` to create the
        necessary subdirectories, ensuring they exist before any files are written.

    Outputs:
        (Dict[str, Any]): A dictionary where keys are artifact names and values
                          are their corresponding file paths.
    """
    # --- Input Validation ---
    # Ensure the output directory is a valid string.
    if not isinstance(output_directory, str) or not output_directory:
        raise ValueError("`output_directory` must be a non-empty string.")

    # --- Define Canonical Paths for All Artifacts ---
    # This dictionary serves as the single source of truth for file locations.
    paths: Dict[str, str] = {
        "relevance_model": os.path.join(output_directory, 'models', 'relevance_classifier.keras'),
        "sentiment_model": os.path.join(output_directory, 'models', 'sentiment_classifier.joblib'),
        "embeddings_h5": os.path.join(output_directory, 'data', 'document_embeddings.h5'),
        "embeddings_crosswalk": os.path.join(output_directory, 'data', 'embeddings_crosswalk.csv'),
        "synthetic_corpus": os.path.join(output_directory, 'data', 'synthetic_corpus.csv'),
        "relevance_scores": os.path.join(output_directory, 'data', 'relevance_scores.feather'),
        "scored_articles": os.path.join(output_directory, 'data', 'scored_relevant_articles.feather'),
        "final_evaluation_table": os.path.join(output_directory, 'results', 'final_evaluation_results.csv'),
        "correlation_yoy": os.path.join(output_directory, 'results', 'correlation_yoy.csv'),
        "correlation_qoq": os.path.join(output_directory, 'results', 'correlation_qoq.csv'),
        "charts_dir": os.path.join(output_directory, 'results', 'charts'),
        "reproducibility_manifest": os.path.join(output_directory, 'reproducibility_manifest.json')
    }

    # --- Create Directory Structure ---
    # Iterate through the path values and ensure the parent directory for each file exists.
    for path in paths.values():
        # Extract the directory part of the path.
        dir_name = os.path.dirname(path)
        # Create the directory if it doesn't already exist. `exist_ok=True` prevents errors on re-runs.
        os.makedirs(dir_name, exist_ok=True)

    # Return the completed dictionary of paths.
    return paths


def _create_reproducibility_manifest(config: Dict[str, Any], paths: Dict[str, Any]) -> None:
    """
    Generates a JSON file documenting the run's configuration, artifacts, and environment.

    Purpose:
    This function creates a critical final artifact: a self-contained manifest
    that captures all the necessary information to understand and reproduce a
    pipeline run. It records a snapshot of the configuration, the versions of all
    key libraries, and the locations of all generated data and result files.

    Inputs:
        config (Dict[str, Any]): The `fused_master_input_specification` dictionary
                                 used for the run.
        paths (Dict[str, Any]): The dictionary of artifact paths generated by
                                `_setup_artifact_paths`.

    Processes:
    1.  Defines a list of key scientific and ML libraries.
    2.  Iterates through the list, dynamically importing each library and
        retrieving its `__version__` attribute. It handles cases where a library
        might not be installed.
    3.  Constructs a final manifest dictionary containing the configuration
        snapshot, the library versions, and the artifact paths.
    4.  Serializes this dictionary to a formatted JSON file.
    """
    # --- 1. Collect Library Versions ---
    # This provides a snapshot of the software environment for reproducibility.
    versions: Dict[str, str] = {}
    # Define the list of critical libraries to version track.
    libs_to_track = [
        'pandas', 'numpy', 'tensorflow', 'sklearn', 'statsmodels',
        'h5py', 'joblib', 'anthropic', 'umap', 'matplotlib', 'seaborn'
    ]
    # Iterate and safely retrieve the version for each library.
    for lib in libs_to_track:
        try:
            # Dynamically import the library.
            module = __import__(lib)
            # Get its version attribute.
            versions[lib] = module.__version__
        except (ImportError, AttributeError):
            # If the library is not found or has no version, record it as "unknown".
            versions[lib] = "unknown"

    # --- 2. Assemble the Manifest ---
    # The manifest is a dictionary containing all key provenance information.
    manifest = {
        "config_snapshot": config,
        "output_artifacts": paths,
        "library_versions": versions
    }

    # --- 3. Serialize to JSON ---
    # Write the manifest dictionary to a human-readable, indented JSON file.
    try:
        with open(paths['reproducibility_manifest'], 'w') as f:
            json.dump(manifest, f, indent=4)
    except IOError as e:
        # Handle potential file writing errors.
        logging.error(f"Failed to write reproducibility manifest: {e}")


def run_neos_pipeline(
    # --- Raw Data Inputs ---
    raw_news_data_df: pd.DataFrame,
    raw_macro_data_df: pd.DataFrame,
    monthly_indicator_data_df: pd.DataFrame,
    release_calendar_df: pd.DataFrame,
    evaluation_windows_df: pd.DataFrame,
    # --- Configuration and Paths ---
    fused_master_input_specification: Dict[str, Any],
    output_directory: str,
    lexicon_path: str
) -> Dict[str, Any]:
    """
    Orchestrates the end-to-end execution of the full NEOS pipeline.

    Purpose:
    This function is the master controller for the entire project. It sequences
    all tasks from initial data validation to final figure generation. It manages
    the flow of data artifacts between tasks and implements checkpointing to allow
    for resumability, saving significant computation time on re-runs. It produces
    a comprehensive set of output artifacts and a reproducibility manifest.

    Inputs:
        raw_news_data_df (pd.DataFrame): The raw, unprocessed news corpus.
        raw_macro_data_df (pd.DataFrame): Raw quarterly macro data (GDP, etc.).
        monthly_indicator_data_df (pd.DataFrame): Raw monthly comparator indicators.
        release_calendar_df (pd.DataFrame): Metadata on indicator release timing.
        evaluation_windows_df (pd.DataFrame): Metadata on valid evaluation periods.
        fused_master_input_specification (Dict[str, Any]): The master config.
        output_directory (str): A root directory to save all generated artifacts.
        lexicon_path (str): Path to the translated German sentiment lexicon CSV.

    Outputs:
        (Dict[str, Any]): A dictionary containing a 'status' ('Success' or 'Failure'),
                          an 'error' message if applicable, and a dictionary of
                          'artifacts' mapping names to their file paths.
    """
    # --- Phase 0: Setup Logging and Path Management ---
    # Create the output directory if it doesn't exist.
    os.makedirs(output_directory, exist_ok=True)
    # Configure a logger to write to both a file and the console.
    log_file = os.path.join(output_directory, 'pipeline_run.log')
    # Remove existing handlers to avoid duplication in interactive environments (e.g., notebooks).
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
    # Set up the new configuration.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] - %(message)s',
        handlers=[logging.FileHandler(log_file, mode='w'), logging.StreamHandler(sys.stdout)]
    )

    logging.info("Initializing NEOS pipeline orchestration.")
    # Generate and create the directory structure for all pipeline artifacts.
    artifact_paths = _setup_artifact_paths(output_directory)

    try:
        # --- Phase 1: Pre-flight Validation (Tasks 1, 2, 3) ---
        logging.info("--- [Phase 1/8] Running Pre-flight Validations ---")
        # Validate all configuration and data inputs before starting computation.
        config_val = validate_master_config(fused_master_input_specification)
        news_val = validate_raw_news_corpus(raw_news_data_df, fused_master_input_specification)
        macro_val = validate_macro_data(raw_macro_data_df, monthly_indicator_data_df, release_calendar_df, evaluation_windows_df)

        # Aggregate all validation issues.
        all_issues = config_val.get('issues', []) + news_val.get('errors', []) + news_val.get('warnings', []) + macro_val.get('issues', [])
        # Check if any issue is a critical error and halt execution if so.
        if any("Error" in str(issue) for issue in all_issues):
            logging.error("Pre-flight validation failed. Halting execution.")
            for issue in all_issues:
                logging.error(f"  - {issue}")
            raise ValueError("Input data or configuration failed validation.")
        logging.info("All inputs validated successfully.")

        # --- Phase 2: Data Cleansing and Preparation (Tasks 4, 5) ---
        logging.info("--- [Phase 2/8] Cleansing and Preparing Corpus ---")
        df_clean, audit_log = cleanse_news_corpus(raw_news_data_df, fused_master_input_specification)
        logging.info(f"Cleansing complete. Final count: {audit_log.get('final_article_count', 'N/A')}. Full audit log generated.")
        df_prepared, _ = prepare_corpus_for_embedding(df_clean, raw_news_data_df)
        logging.info("Temporal features added to corpus.")

        # --- Phase 3: Feature Engineering: Embeddings (Task 6) ---
        logging.info("--- [Phase 3/8] Generating Document Embeddings ---")
        # Checkpointing: Skip this expensive step if outputs already exist.
        if not os.path.exists(artifact_paths['embeddings_h5']) or not os.path.exists(artifact_paths['embeddings_crosswalk']):
            df_prepared, _, _, _ = generate_document_embeddings(
                df_prepared, fused_master_input_specification, os.path.dirname(artifact_paths['embeddings_h5'])
            )
            logging.info("Embedding generation complete.")
        else:
            logging.info("Embeddings and crosswalk files already exist. Skipping generation.")

        # --- Phase 4: Relevance Model (Tasks 7, 8, 9) ---
        logging.info("--- [Phase 4/8] Training and Applying Relevance Model ---")
        # Checkpointing: Skip model training if the model file already exists.
        if not os.path.exists(artifact_paths['relevance_model']):
            X_train, y_train, X_val, y_val = prepare_relevance_training_data(
                df_prepared, artifact_paths['embeddings_crosswalk'], artifact_paths['embeddings_h5'], fused_master_input_specification
            )
            _, train_results = train_relevance_classifier(
                X_train, y_train, X_val, y_val, fused_master_input_specification, artifact_paths['relevance_model']
            )
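            # Format the AUC only when it is present; applying ':.4f' to the 'N/A' fallback string would raise.
            val_auc = train_results['validation_metrics'].get('auc')
            auc_text = f"{val_auc:.4f}" if val_auc is not None else "N/A"
            logging.info(f"Relevance model training complete. Validation AUC: {auc_text}")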
        else:
            logging.info("Relevance model already exists. Skipping training.")

        # Apply the classifier to filter the corpus. This step always runs.
        df_relevant, _, relevance_audit = filter_corpus_by_relevance(
            df_prepared, artifact_paths['relevance_model'], artifact_paths['embeddings_h5'],
            artifact_paths['embeddings_crosswalk'], fused_master_input_specification, os.path.dirname(artifact_paths['relevance_scores'])
        )
        logging.info(f"Corpus filtered for relevance. {relevance_audit.get('total_relevant_articles', 'N/A')} articles remain.")

        # --- Phase 5: Sentiment Model (Tasks 10, 11, 12) ---
        logging.info("--- [Phase 5/8] Training Sentiment Model ---")
        # Checkpointing: Skip synthetic data generation if it exists.
        if not os.path.exists(artifact_paths['synthetic_corpus']):
            generate_synthetic_articles(fused_master_input_specification, artifact_paths['synthetic_corpus'])
        else:
            logging.info("Synthetic corpus already exists. Skipping generation.")

        # Embed the synthetic data. This is fast and always runs.
        synthetic_embeddings, synthetic_labels, _ = process_synthetic_embeddings(
            artifact_paths['synthetic_corpus'], fused_master_input_specification, artifact_paths['charts_dir']
        )

        # Checkpointing: Skip sentiment model training if it exists.
        if not os.path.exists(artifact_paths['sentiment_model']):
            _, sent_train_results = train_sentiment_classifier(
                synthetic_embeddings, synthetic_labels, fused_master_input_specification, artifact_paths['sentiment_model']
            )
            logging.info(f"Sentiment model training complete. Optimal C: {sent_train_results.get('optimal_c', 'N/A')}")
        else:
            logging.info("Sentiment model already exists. Skipping training.")

        # --- Phase 6: Indicator Construction (Tasks 13, 14) ---
        logging.info("--- [Phase 6/8] Scoring Articles and Constructing Indicators ---")
        score_relevant_articles(
            df_relevant, artifact_paths['sentiment_model'], artifact_paths['embeddings_h5'],
            artifact_paths['embeddings_crosswalk'], artifact_paths['scored_articles']
        )
        monthly_neos_df, daily_indicator_df = construct_neos_indicators(
            artifact_paths['scored_articles'], fused_master_input_specification
        )
        logging.info("Monthly and daily NEOS indicators constructed.")

        # --- Phase 7: Econometric Evaluation (Tasks 15-20) ---
        logging.info("--- [Phase 7/8] Performing Econometric Evaluation ---")
        lexicon_baseline_df = construct_lexicon_baseline_indicator(df_relevant, lexicon_path)
        forecasting_df = prepare_forecasting_dataset(
            monthly_neos_df, monthly_indicator_data_df.join(lexicon_baseline_df, how='outer'), raw_macro_data_df, fused_master_input_specification
        )
        forecast_errors_df = execute_poos_forecasts(forecasting_df, fused_master_input_specification, evaluation_windows_df)
        rmse_ratios_df = compute_rmse_ratios(forecast_errors_df)
        final_eval_table = perform_diebold_mariano_tests(forecast_errors_df, rmse_ratios_df)
        final_eval_table.to_csv(artifact_paths['final_evaluation_table'])
        logging.info(f"Final evaluation table saved to {artifact_paths['final_evaluation_table']}")

        yoy_corr, qoq_corr = compute_correlation_benchmarks(
            monthly_neos_df, monthly_indicator_data_df, lexicon_baseline_df, raw_macro_data_df
        )
        yoy_corr.to_csv(artifact_paths['correlation_yoy'])
        qoq_corr.to_csv(artifact_paths['correlation_qoq'])
        logging.info("Correlation tables generated.")

        # --- Phase 8: Final Diagnostics and Figures (Task 21) ---
        logging.info("--- [Phase 8/8] Generating Final Figures ---")
        generate_all_charts(
            synthetic_corpus_path=artifact_paths['synthetic_corpus'], synthetic_embeddings=synthetic_embeddings,
            monthly_neos_df=monthly_neos_df, monthly_indicator_df=monthly_indicator_data_df,
            raw_macro_data_df=raw_macro_data_df, daily_indicator_df=daily_indicator_df,
            forecast_errors_df=forecast_errors_df, fused_master_input_specification=fused_master_input_specification,
            output_directory=artifact_paths['charts_dir']
        )
        logging.info(f"All charts saved to {artifact_paths['charts_dir']}")

        # --- Final Step: Create Reproducibility Manifest ---
        _create_reproducibility_manifest(fused_master_input_specification, artifact_paths)
        logging.info(f"Reproducibility manifest saved to {artifact_paths['reproducibility_manifest']}")

        logging.info("NEOS pipeline completed successfully.")
        # Return a dictionary indicating success and providing the artifact locations.
        return {"status": "Success", "artifacts": artifact_paths}

    except Exception as e:
        # Catch any exception that occurs during the pipeline execution.
        logging.error(f"Pipeline failed with a critical error: {e}", exc_info=True)
        # Return a dictionary indicating failure and the error message.
        return {"status": "Failure", "error": str(e), "artifacts": artifact_paths}
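
The checkpointing used in Phases 3 to 5 above follows one pattern: run an expensive step only when its output files are missing. A minimal, standalone sketch of that pattern is shown here. The helper `run_if_missing`, the file name `embeddings.h5`, and the stand-in step are hypothetical and are not part of the pipeline code.

```python
import os
import tempfile
from typing import Callable, Sequence


def run_if_missing(required_paths: Sequence[str], step: Callable[[], None]) -> bool:
    """Run `step` only when at least one expected artifact is absent.

    Returns True if the step ran and False if it was skipped.
    """
    if all(os.path.exists(path) for path in required_paths):
        return False
    step()
    return True


# Demonstration with a throwaway directory and a stand-in "expensive" step.
with tempfile.TemporaryDirectory() as tmp_dir:
    artifact = os.path.join(tmp_dir, "embeddings.h5")  # hypothetical artifact name

    def fake_expensive_step() -> None:
        # Stand-in for a costly computation that writes its output to disk.
        with open(artifact, "w", encoding="utf-8") as handle:
            handle.write("placeholder")

    first_run = run_if_missing([artifact], fake_expensive_step)   # artifact absent
    second_run = run_if_missing([artifact], fake_expensive_step)  # artifact present
```

An existence check of this kind cannot tell whether a file on disk was produced with the current configuration, so reusing an output directory after changing parameters may silently pick up stale artifacts.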


# ==============================================================================
# Task 23: Conduct robustness analyses using the orchestrator
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 23, Helper for Aggregating Results
# ------------------------------------------------------------------------------

def _aggregate_sensitivity_results(
    base_output_dir: str
) -> pd.DataFrame:
    """
    Aggregates final evaluation results from multiple sensitivity run directories.

    Purpose:
    After the main orchestrator has been run multiple times with different
    parameters, this function systematically scans the output directories,
    reads the `final_evaluation_results.csv` from each, and combines them into a
    single master DataFrame. This allows for easy comparison of the key
    performance metrics (RMSE ratios, p-values) across all experimental runs.

    Inputs:
        base_output_dir (str): The root directory containing all the individual
                               sensitivity run subdirectories.

    Outputs:
        (pd.DataFrame): A single DataFrame containing the aggregated results from
                        all sensitivity analyses, with additional columns
                        identifying the parameters of each run.
    """
    # This list will hold the DataFrames from each sensitivity run.
    all_results: List[pd.DataFrame] = []

    # Define the expected results file once, outside the directory walk.
    results_filename = 'final_evaluation_results.csv'

    # Walk through the subdirectories of the base output directory.
    for root, _dirs, files in os.walk(base_output_dir):
        # Skip directories that do not contain a results file.
        if results_filename not in files:
            continue
        try:
            # Construct the full path to the results file.
            results_path = os.path.join(root, results_filename)
            # Read the evaluation table from the CSV.
            results_df = pd.read_csv(results_path)
            # Use the first path component below the base directory as the run name,
            # so the label stays correct even if the file sits in a nested subfolder.
            run_name = os.path.relpath(root, base_output_dir).split(os.sep)[0]
            # Add a column to identify which sensitivity run these results belong to.
            results_df['sensitivity_run'] = run_name
            all_results.append(results_df)
        except Exception as e:
            # Log a warning if a results file is found but cannot be processed.
            logging.warning(f"Could not read or process results from {root}: {e}")

    # Return an empty DataFrame if no results were found.
    if not all_results:
        logging.warning("No sensitivity results found to aggregate.")
        return pd.DataFrame()

    # Concatenate all the collected DataFrames into a single master table.
    aggregated_df = pd.concat(all_results, ignore_index=True)
    # Set a multi-index for easier slicing and analysis of the results.
    if 'Indicator' in aggregated_df.columns:
        aggregated_df.set_index(['sensitivity_run', 'Indicator'], inplace=True)

    return aggregated_df

# ------------------------------------------------------------------------------
# Task 23, Helper for a Single Parameter-based Run
# ------------------------------------------------------------------------------

def _run_single_parameter_sensitivity(
    param_path: Tuple[str, ...],
    param_value: Any,
    run_name: str,
    base_output_dir: str,
    base_config: Dict[str, Any],
    **kwargs: Any
) -> None:
    """
    Executes a single, isolated run of the NEOS pipeline with one modified parameter.

    Purpose:
    This helper function is the core engine for the parameter sensitivity analysis.
    It provides a robust and reproducible way to test the impact of changing a
    single hyperparameter. It ensures complete isolation between runs by creating
    a unique output directory and working on a deep copy of the master
    configuration, thus preventing any side effects or data contamination.

    Inputs:
        param_path (Tuple[str, ...]): A tuple of strings representing the nested
            keys to access the parameter to be modified within the master config.
            For example: ('master_config', 'relevance_model_params', 'classification_threshold').
        param_value (Any): The new value to assign to the specified parameter.
        run_name (str): A unique name for this sensitivity run, used to create
                        the output subdirectory (e.g., "sensitivity_tau_0.4").
        base_output_dir (str): The root directory where the subdirectory for this
                               run will be created.
        base_config (Dict[str, Any]): The baseline `fused_master_input_specification`
                                      dictionary, which will be deep-copied.
        **kwargs (Any): All other keyword arguments required by the main
                        `run_neos_pipeline` function (e.g., the raw DataFrames
                        and lexicon path).

    Processes:
    1.  Logs the start of the specific sensitivity run.
    2.  Creates a unique, isolated output directory for this run's artifacts.
    3.  Performs a deep copy of the base configuration to ensure the original
        config remains unmodified.
    4.  Programmatically navigates the nested configuration dictionary using the
        `param_path` and updates the target parameter with `param_value`.
    5.  Calls the main `run_neos_pipeline` orchestrator, passing the modified
        configuration, the unique output directory, and all other necessary data.

    Outputs:
        None: This function does not return a value. Its primary effect is the
              execution of the full pipeline, which generates a complete set of
              artifacts within the specified `run_output_dir`.

    Error Handling:
        - Raises KeyError if the `param_path` is invalid for the given `base_config`.
        - Does not raise on pipeline failures: `run_neos_pipeline` catches its own
          exceptions and reports them through the 'status' field of its return value.
    """
    # --- Log the start of this specific experimental run ---
    logging.info(f"\n--- Running Sensitivity Analysis For: {run_name} ---")

    # --- 1. Create an Isolated Environment for the Run ---
    # Define a unique output directory for this run to prevent artifact collision.
    run_output_dir = os.path.join(base_output_dir, run_name)
    # A deep copy is essential to prevent modifications in this run from affecting subsequent runs.
    run_config = copy.deepcopy(base_config)

    # --- 2. Modify the Configuration Parameter ---
    # Navigate through the nested dictionary structure to reach the target parameter.
    try:
        # Start at the top level of the configuration dictionary.
        config_level = run_config
        # Traverse the path until the second-to-last key.
        for key in param_path[:-1]:
            config_level = config_level[key]
        # Use the final key to set the new parameter value.
        config_level[param_path[-1]] = param_value
        logging.info(f"Modified parameter '{'.'.join(param_path)}' to '{param_value}'.")
    except KeyError as e:
        # If the path is invalid, log the error and re-raise with the original traceback.
        logging.error(f"Invalid parameter path provided: {param_path}. Key not found: {e}")
        raise

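    # --- 3. Execute the Full Pipeline with the Modified Config ---
    # Call the main orchestrator with the modified config and unique output directory.
    # All other data inputs are passed through via **kwargs.
    run_results = run_neos_pipeline(
        fused_master_input_specification=run_config,
        output_directory=run_output_dir,
        **kwargs
    )
    # The orchestrator reports failures through its return value rather than by raising.
    if run_results.get('status') == 'Failure':
        logging.error(f"Sensitivity run '{run_name}' failed: {run_results.get('error')}")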

# ------------------------------------------------------------------------------
# Task 23, Orchestrator for Robustness Analyses
# ------------------------------------------------------------------------------

def run_robustness_analyses(
    # --- Base Data Inputs (same as main pipeline) ---
    raw_news_data_df: pd.DataFrame,
    raw_macro_data_df: pd.DataFrame,
    monthly_indicator_data_df: pd.DataFrame,
    release_calendar_df: pd.DataFrame,
    evaluation_windows_df: pd.DataFrame,
    # --- Base Configuration and Paths ---
    base_config: Dict[str, Any],
    base_output_dir: str,
    lexicon_path: str
) -> pd.DataFrame:
    """
    Orchestrates a series of sensitivity analyses by re-running the main pipeline.

    Purpose:
    This "meta-orchestrator" systematically tests the robustness of the main
    findings to changes in key methodological choices. It does this by creating
    modified versions of the configuration or input data and executing the
    entire `run_neos_pipeline` for each variation. Finally, it aggregates the
    results for comparison.

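    Inputs:
        raw_news_data_df, raw_macro_data_df, monthly_indicator_data_df,
        release_calendar_df, evaluation_windows_df (pd.DataFrame): The standard
            data inputs required by `run_neos_pipeline`.
        base_config (Dict[str, Any]): The baseline `fused_master_input_specification`.
        base_output_dir (str): A root directory where subdirectories for each
                               sensitivity run will be created.
        lexicon_path (str): Path to the translated German sentiment lexicon CSV.

    Outputs:
        (pd.DataFrame): A summary DataFrame comparing the final evaluation
                        metrics across all sensitivity runs.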
    """
    logging.info("====== STARTING ROBUSTNESS ANALYSIS PIPELINE ======")

    # Package the base data inputs for repeated calls to the main pipeline.
    base_kwargs = {
        "raw_news_data_df": raw_news_data_df,
        "raw_macro_data_df": raw_macro_data_df,
        "monthly_indicator_data_df": monthly_indicator_data_df,
        "release_calendar_df": release_calendar_df,
        "evaluation_windows_df": evaluation_windows_df,
        "lexicon_path": lexicon_path
    }

    # --- Step 1: Parameter Sensitivity ---
    logging.info("\n--- [Robustness 1/3] Testing Parameter Sensitivity ---")

    # 1a: Varying the relevance classification threshold (tau)
    for threshold in [0.4, 0.5, 0.6]:
        _run_single_parameter_sensitivity(
            param_path=('master_config', 'relevance_model_params', 'classification_threshold'),
            param_value=threshold,
            run_name=f"sensitivity_tau_{threshold}",
            base_output_dir=base_output_dir,
            base_config=base_config,
            **base_kwargs
        )

    # 1b: Varying the POOS initial window size (covers Step 3)
    for window_size in [6, 8, 10, 12]:
        _run_single_parameter_sensitivity(
            param_path=('master_config', 'econometric_validation_params', 'poos_initial_window_quarters'),
            param_value=window_size,
            run_name=f"sensitivity_poos_window_{window_size}",
            base_output_dir=base_output_dir,
            base_config=base_config,
            **base_kwargs
        )

    # --- Step 2: Data-Scope Sensitivity ---
    logging.info("\n--- [Robustness 2/3] Testing Data-Scope Sensitivity ---")

    # 2a: German-only vs. French-only NEOS
    for lang in ['de', 'fr']:
        run_name = f"sensitivity_lang_{lang}_only"
        logging.info(f"\n--- Running analysis for: {run_name} ---")
        run_output_dir = os.path.join(base_output_dir, run_name)
        lang_only_df = raw_news_data_df[raw_news_data_df['language'] == lang].copy()
        run_kwargs = base_kwargs.copy()
        run_kwargs['raw_news_data_df'] = lang_only_df
        run_neos_pipeline(
            fused_master_input_specification=base_config,
            output_directory=run_output_dir,
            **run_kwargs
        )

    # 2b: Truncation Sensitivity Analysis
    logging.info("\n--- Running analysis for: Truncation Sensitivity ---")
    run_name = "sensitivity_truncation_filtered"
    run_output_dir = os.path.join(base_output_dir, run_name)
    # Stage A: Generate metadata to identify high-truncation months.
    logging.info("Truncation analysis: Stage A - Generating metadata...")
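    temp_output_dir = os.path.join(base_output_dir, "temp_truncation_meta")
    # Make sure the temporary directory exists before artifacts are written into it.
    os.makedirs(temp_output_dir, exist_ok=True)
    df_clean_temp, _ = cleanse_news_corpus(raw_news_data_df, base_config)
    # Mirror the main pipeline's preparation step so the temporal columns used
    # for the monthly grouping below are present.
    df_prepared_temp, _ = prepare_corpus_for_embedding(df_clean_temp, raw_news_data_df)
    df_prepared_temp, _, _, _ = generate_document_embeddings(
        df_prepared_temp, base_config, temp_output_dir
    )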
    # Stage B: Identify months with truncation rate > 10%.
    logging.info("Truncation analysis: Stage B - Identifying high-truncation months...")
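    monthly_truncation_rate = df_prepared_temp.groupby('year_month')['truncation_flag'].mean()
    # Cast month labels to strings so they compare cleanly with the '%Y-%m' keys built in Stage C.
    high_truncation_months = monthly_truncation_rate[monthly_truncation_rate > 0.10].index.astype(str).tolist()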
    logging.info(f"Found {len(high_truncation_months)} months with >10% truncation rate to exclude.")

    # Stage C: Filter the original raw data.
    logging.info("Truncation analysis: Stage C - Filtering raw data...")
    raw_df_copy = raw_news_data_df.copy()
    raw_df_copy['year_month'] = raw_df_copy['publication_datetime_utc'].dt.strftime('%Y-%m')
    filtered_raw_df = raw_df_copy[~raw_df_copy['year_month'].isin(high_truncation_months)].drop(columns=['year_month'])

    # Stage D: Execute the full pipeline on the filtered data.
    logging.info("Truncation analysis: Stage D - Executing full pipeline on filtered data...")
    run_kwargs = base_kwargs.copy()
    run_kwargs['raw_news_data_df'] = filtered_raw_df
    run_neos_pipeline(
        fused_master_input_specification=base_config,
        output_directory=run_output_dir,
        **run_kwargs
    )

    # --- Final Step: Aggregate All Results ---
    logging.info("\n--- Aggregating all sensitivity analysis results ---")
    summary_df = _aggregate_sensitivity_results(base_output_dir)

    # Save the final summary table.
    summary_path = os.path.join(base_output_dir, 'robustness_summary_results.csv')
    summary_df.to_csv(summary_path)

    logging.info("====== ROBUSTNESS ANALYSIS PIPELINE COMPLETE ======")
    logging.info(f"Final summary of all runs saved to: {summary_path}")

    return summary_df
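
The parameter sensitivity runs above rely on one small technique: deep-copy a nested configuration, then overwrite a single entry addressed by a tuple of keys. The sketch below isolates that step. The helper name `override_nested` is hypothetical, and the check that the final key already exists is an added assumption; the helper in this notebook assigns the final key without that check.

```python
import copy
from typing import Any, Dict, Tuple


def override_nested(config: Dict[str, Any], path: Tuple[str, ...], value: Any) -> Dict[str, Any]:
    """Return a deep copy of `config` with the entry at `path` replaced by `value`."""
    if not path:
        raise ValueError("path must contain at least one key")
    updated = copy.deepcopy(config)
    level = updated
    for key in path[:-1]:
        level = level[key]  # raises KeyError on an unknown intermediate key
    if path[-1] not in level:
        # Fail loudly instead of silently creating a new, unused key.
        raise KeyError(path[-1])
    level[path[-1]] = value
    return updated


# Toy configuration with the same nesting as the threshold path used above.
base = {"master_config": {"relevance_model_params": {"classification_threshold": 0.5}}}
variant = override_nested(
    base, ("master_config", "relevance_model_params", "classification_threshold"), 0.4
)
```

Because the copy is deep, `base` keeps its original threshold after the call, which is what keeps successive sensitivity runs independent of one another.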

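Stages B and C of the truncation analysis reduce to a group-by, a threshold, and an exclusion filter. The toy example below walks through that logic on five made-up articles. The column names follow the code above, while the dates and flags are illustrative only and say nothing about the real corpus.

```python
import pandas as pd

# Toy corpus: one row per article, with a boolean truncation flag.
articles = pd.DataFrame(
    {
        "publication_datetime_utc": pd.to_datetime(
            ["2020-01-05", "2020-01-20", "2020-02-03", "2020-02-17", "2020-02-25"],
            utc=True,
        ),
        "truncation_flag": [True, False, False, False, False],
    }
)

# Derive a string month key so the comparison below is string-to-string.
articles["year_month"] = articles["publication_datetime_utc"].dt.strftime("%Y-%m")

# The mean of a boolean column is the share of truncated articles per month.
monthly_rate = articles.groupby("year_month")["truncation_flag"].mean()
high_truncation_months = monthly_rate[monthly_rate > 0.10].index.tolist()

# Keep only articles from months at or below the 10% threshold.
filtered = articles[~articles["year_month"].isin(high_truncation_months)]
```

In this toy data January has one truncated article out of two, so it is excluded, while the three February articles are kept.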

# Top-Level Orchestrator

def run_complete_neos_study(
    # --- Raw Data Inputs ---
    raw_news_data_df: pd.DataFrame,
    raw_macro_data_df: pd.DataFrame,
    monthly_indicator_data_df: pd.DataFrame,
    release_calendar_df: pd.DataFrame,
    evaluation_windows_df: pd.DataFrame,
    # --- Configuration and Paths ---
    fused_master_input_specification: Dict[str, Any],
    root_output_directory: str,
    lexicon_path: str
) -> Dict[str, Any]:
    """
    Executes the complete NEOS study, including the baseline analysis and all robustness checks.

    Purpose:
    This top-level orchestrator serves as the single entry point for the entire
    research project. It first runs the main NEOS pipeline with the baseline
    configuration to generate the primary results. It then systematically
    launches a series of subsequent pipeline runs to perform the sensitivity
    and robustness analyses specified in Task 23.

    Inputs:
        raw_news_data_df (pd.DataFrame): The raw, unprocessed news corpus.
        raw_macro_data_df (pd.DataFrame): Raw quarterly macro data (GDP, etc.).
        monthly_indicator_data_df (pd.DataFrame): Raw monthly comparator indicators.
        release_calendar_df (pd.DataFrame): Metadata on indicator release timing.
        evaluation_windows_df (pd.DataFrame): Metadata on valid evaluation periods.
        fused_master_input_specification (Dict[str, Any]): The master config.
        root_output_directory (str): A root directory where all outputs, including
                                     subdirectories for baseline and sensitivity
                                     runs, will be saved.
        lexicon_path (str): Path to the translated German sentiment lexicon CSV.

    Processes:
    1.  Sets up a clear directory structure for the baseline and robustness runs.
    2.  Calls `run_neos_pipeline` to execute the main analysis based on the
        provided configuration, saving results to a 'baseline' subdirectory.
    3.  Calls `run_robustness_analyses` which then orchestrates multiple re-runs
        of the main pipeline with modified configurations or data, saving results
        to a 'robustness_checks' subdirectory.
    4.  Collects the primary outputs from both stages.

    Outputs:
        (Dict[str, Any]): A dictionary containing the results of the orchestration:
                          - 'baseline_run_results': The results dictionary from the
                            main pipeline run.
                          - 'robustness_summary_df': The aggregated DataFrame
                            comparing all robustness checks.
    """
    # --- 1. Setup Directory Structure ---
    # Define distinct subdirectories for the baseline run and the sensitivity analyses.
    baseline_output_dir = os.path.join(root_output_directory, 'baseline_run')
    robustness_output_dir = os.path.join(root_output_directory, 'robustness_checks')

    # Create the directories.
    os.makedirs(baseline_output_dir, exist_ok=True)
    os.makedirs(robustness_output_dir, exist_ok=True)

    # --- 2. Execute the Baseline Pipeline Run ---
    logging.info("="*80)
    logging.info(">>> STARTING BASELINE NEOS PIPELINE RUN <<<")
    logging.info("="*80)

    # Execute the main end-to-end pipeline with the provided configuration.
    baseline_run_results = run_neos_pipeline(
        raw_news_data_df=raw_news_data_df,
        raw_macro_data_df=raw_macro_data_df,
        monthly_indicator_data_df=monthly_indicator_data_df,
        release_calendar_df=release_calendar_df,
        evaluation_windows_df=evaluation_windows_df,
        fused_master_input_specification=fused_master_input_specification,
        output_directory=baseline_output_dir,
        lexicon_path=lexicon_path
    )

    # Check for failure in the baseline run before proceeding.
    if baseline_run_results.get('status') == 'Failure':
        logging.error("Baseline pipeline run failed. Halting execution of robustness analyses.")
        return {
            "baseline_run_results": baseline_run_results,
            "robustness_summary_df": pd.DataFrame()
        }

    logging.info(">>> BASELINE NEOS PIPELINE RUN COMPLETED SUCCESSFULLY <<<")

    # --- 3. Execute the Robustness Analyses ---
    logging.info("="*80)
    logging.info(">>> STARTING ROBUSTNESS ANALYSIS RUNS <<<")
    logging.info("="*80)

    # Execute the meta-orchestrator for all sensitivity checks.
    # This function will internally call `run_neos_pipeline` multiple times.
    robustness_summary_df = run_robustness_analyses(
        raw_news_data_df=raw_news_data_df,
        raw_macro_data_df=raw_macro_data_df,
        monthly_indicator_data_df=monthly_indicator_data_df,
        release_calendar_df=release_calendar_df,
        evaluation_windows_df=evaluation_windows_df,
        base_config=fused_master_input_specification,
        base_output_dir=robustness_output_dir,
        lexicon_path=lexicon_path
    )

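    # Individual runs report failures through their own status and logs, so this
    # message only marks the end of the sequence.
    logging.info(">>> ROBUSTNESS ANALYSIS RUNS FINISHED <<<")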

    # --- 4. Package and Return Final Outputs ---
    # The final output is a dictionary containing the key results from both major stages.
    final_results = {
        "baseline_run_results": baseline_run_results,
        "robustness_summary_df": robustness_summary_df
    }

    return final_results
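
The `robustness_summary_df` returned above is assembled by scanning the run subdirectories for per-run result tables. The following standalone sketch shows one way such a scan might work on a throwaway directory tree. The function name `collect_run_tables`, the nested `tables` folder, the `rmse_ratio` column, and the numeric values are all hypothetical and chosen purely for illustration.

```python
import os
import tempfile

import pandas as pd


def collect_run_tables(base_dir: str, filename: str) -> pd.DataFrame:
    """Concatenate every `filename` found under `base_dir`, labelled by run folder."""
    frames = []
    for root, _dirs, files in os.walk(base_dir):
        if filename not in files:
            continue
        table = pd.read_csv(os.path.join(root, filename))
        # Label by the first folder below base_dir, so nested layouts still work.
        table["sensitivity_run"] = os.path.relpath(root, base_dir).split(os.sep)[0]
        frames.append(table)
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


with tempfile.TemporaryDirectory() as base_dir:
    # Write one small, made-up results table per hypothetical run.
    for run_name, ratio in [("sensitivity_tau_0.4", 0.91), ("sensitivity_tau_0.6", 0.95)]:
        run_dir = os.path.join(base_dir, run_name, "tables")  # hypothetical nested layout
        os.makedirs(run_dir)
        pd.DataFrame({"Indicator": ["NEOS"], "rmse_ratio": [ratio]}).to_csv(
            os.path.join(run_dir, "final_evaluation_results.csv"), index=False
        )
    summary = collect_run_tables(base_dir, "final_evaluation_results.csv")
```

Labelling each table by its top-level run folder rather than by the immediate parent directory keeps the run names distinct when result files live in a shared subfolder name inside every run.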
