### **`README.md`**

# Replication of "*Prompting for Policy: Forecasting Macroeconomic Scenarios with Synthetic LLM Personas*"

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2511.02458v1-b31b1b.svg)](https://arxiv.org/abs/2511.02458)
[![Conference](https://img.shields.io/badge/Conference-ACM%20ICAIF%202025-9cf)](https://icaif.acm.org/2025/)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/forecasting_macroeconomic_scenarios_synthetic_llm_personas)
[![Discipline](https://img.shields.io/badge/Discipline-Computational%20Economics-00529B)](https://github.com/chirindaopensource/forecasting_macroeconomic_scenarios_synthetic_llm_personas)
[![Data Source](https://img.shields.io/badge/Data%20Source-ECB%20SPF-003299)](https://www.ecb.europa.eu/stats/ecb_surveys/survey_of_professional_forecasters/html/index.en.html)
[![Data Source](https://img.shields.io/badge/Data%20Source-PersonaHub-FFD21E)](https://huggingface.co/datasets/proj-persona/PersonaHub)
[![Core Method](https://img.shields.io/badge/Method-LLM%20Forecasting%20%7C%20Ablation%20Study-orange)](https://github.com/chirindaopensource/forecasting_macroeconomic_scenarios_synthetic_llm_personas)
[![Analysis](https://img.shields.io/badge/Analysis-Time%20Series%20%7C%20Hypothesis%20Testing-red)](https://github.com/chirindaopensource/forecasting_macroeconomic_scenarios_synthetic_llm_personas)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![OpenAI](https://img.shields.io/badge/OpenAI-GPT--4o-412991?logo=openai&logoColor=white)](https://openai.com/index/hello-gpt-4o/)
[![spaCy](https://img.shields.io/badge/spaCy-%2309A3D5.svg?style=flat&logo=spaCy&logoColor=white)](https://spacy.io/)
[![SentenceTransformers](https://img.shields.io/badge/SentenceTransformers-2E4053-blue)](https://www.sbert.net/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)

**Repository:** `https://github.com/chirindaopensource/forecasting_macroeconomic_scenarios_synthetic_llm_personas`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Prompting for Policy: Forecasting Macroeconomic Scenarios with Synthetic LLM Personas"** by:

*   Giulia Iadisernia
*   Carolina Camassa

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from rigorous data validation and cleansing to a large-scale, multi-stage persona filtering process, high-volume asynchronous forecast generation, and the final statistical analysis, including the central ablation study.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `run_synthetic_economist_study`](#key-callable-run_synthetic_economist_study)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the analytical framework presented in Iadisernia and Camassa (2025). The core of this repository is the Jupyter notebook `forecasting_macroeconomic_scenarios_synthetic_llm_personas_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings. The pipeline is designed to be a robust and scalable system for evaluating the impact of persona-based prompting on the macroeconomic forecasting performance of Large Language Models (LLMs).

The paper's central research question is whether sophisticated persona descriptions improve LLM performance in a real-world forecasting task. This codebase operationalizes the paper's experimental design, allowing users to:
-   Rigorously validate and manage the entire experimental configuration via a single `config.yaml` file.
-   Execute a multi-stage filtering pipeline on the ~370M-entry PersonaHub corpus to distill a set of 2,368 high-quality, domain-specific expert personas.
-   Systematically replicate 50 rounds of the ECB Survey of Professional Forecasters using GPT-4o.
-   Generate over 120,000 individual forecasts across a main "persona" arm and a "no-persona" baseline for a controlled ablation study.
-   Perform a comprehensive statistical analysis comparing the accuracy (MAE, win-share) and disagreement (dispersion) of the AI panels against the human expert panel.
-   Run hypothesis tests (Monte Carlo and exact binomial) to assess the statistical significance of the findings.
-   Execute the final ablation study tests (paired t-test, Kolmogorov-Smirnov test) to formally evaluate the impact of personas.

## Theoretical Background

The implemented methods are grounded in the principles of experimental design, computational linguistics, and time-series forecast evaluation.

**1. Persona-Based Prompting:**
The core hypothesis is that providing an LLM with a detailed "persona" or "role" can improve its performance on domain-specific reasoning tasks. This is tested via an ablation study, where the performance of prompts with personas is compared to identical prompts without them.

**2. Forecast Evaluation Metrics:**
-   **Mean Absolute Error (MAE):** A standard metric for point forecast accuracy, measuring the average magnitude of forecast errors.
    $$
    \mathrm{MAE}_{vh} = \frac{1}{n_{vh}} \sum_{r=1}^{n_{vh}} | \hat{y}_{rvh} - y_{rvh} |
    $$
-   **Win-Share:** A head-to-head comparison metric that measures the proportion of forecast rounds in which one panel's forecast was strictly more accurate than the other's. Tied rounds are excluded, so $n_{vh}$ below counts only the non-tied rounds.
    $$
    w_{vh} = \frac{W_{vh}}{n_{vh}}, \quad \text{where } W_{vh} = \sum_{r} \mathbf{1}\{ e_{rvh}^{\mathrm{AI}} < e_{rvh}^{\mathrm{H}} \}
    $$
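The two metrics above can be sketched in a few lines of plain Python. This is a minimal illustration, not the notebook's implementation: the function names are hypothetical, and the win-share denominator is assumed to count only non-tied rounds, which is how "excluding ties" is read here.

```python
from typing import Sequence, Tuple


def mean_absolute_error(forecasts: Sequence[float], outcomes: Sequence[float]) -> float:
    """MAE over the rounds of one (variable, horizon) cell."""
    if len(forecasts) != len(outcomes) or not forecasts:
        raise ValueError("forecasts and outcomes must be non-empty and equal length")
    return sum(abs(f - y) for f, y in zip(forecasts, outcomes)) / len(forecasts)


def win_share(ai_errors: Sequence[float], human_errors: Sequence[float]) -> Tuple[int, int, float]:
    """Return (AI wins, non-tied rounds, win-share); tied rounds are dropped."""
    pairs = [(a, h) for a, h in zip(ai_errors, human_errors) if a != h]
    wins = sum(1 for a, h in pairs if a < h)  # strict inequality, as in the formula
    n = len(pairs)
    return wins, n, (wins / n if n else float("nan"))


# Illustrative numbers only (not taken from the paper).
outcomes = [2.0, 1.5, 1.0, 0.5]
ai_forecasts = [2.5, 1.5, 1.25, 1.0]
ai_mae = mean_absolute_error(ai_forecasts, outcomes)
wins, n_rounds, share = win_share([0.5, 0.0, 0.25, 0.5], [0.25, 0.5, 0.25, 0.25])
```

In the toy data, the third round is a tie and is dropped, so the AI panel wins one of three remaining rounds.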

**3. Hypothesis Testing:**
-   **Null Hypothesis:** The AI panel and human panel have an equal probability of producing a more accurate forecast ($H_0: p=0.5$).
-   **Test for Large Samples (In-Sample):** The null distribution is approximated using a **Monte Carlo simulation** with $N = 10{,}000$ draws from a Binomial distribution, $W^* \sim \text{Binom}(n_{vh}, 0.5)$.
-   **Test for Small Samples (Out-of-Sample):** An **exact Binomial test** is used, calculating probabilities directly from the Binomial PMF.
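Both tests can be illustrated with the standard library alone. The sketch below assumes a two-sided alternative, which the text above does not specify, and uses hypothetical function names; the notebook itself imports `binom` from `scipy.stats` for this purpose.

```python
import math
import random


def exact_binomial_p_value(wins: int, n: int, p: float = 0.5) -> float:
    """Two-sided exact p-value: total PMF mass of outcomes no likelier than the observed one."""
    pmf = [math.comb(n, k) * p**k * (1 - p) ** (n - k) for k in range(n + 1)]
    observed = pmf[wins]
    # Small relative tolerance guards against floating-point ties.
    return min(1.0, sum(q for q in pmf if q <= observed * (1 + 1e-9)))


def monte_carlo_p_value(wins: int, n: int, draws: int = 10_000, seed: int = 0) -> float:
    """Two-sided p-value from simulated W* ~ Binom(n, 0.5)."""
    rng = random.Random(seed)
    observed_gap = abs(wins - n / 2)
    extreme = 0
    for _ in range(draws):
        simulated = sum(rng.random() < 0.5 for _ in range(n))
        if abs(simulated - n / 2) >= observed_gap:
            extreme += 1
    return extreme / draws


p_exact = exact_binomial_p_value(wins=0, n=4)          # small out-of-sample style case
p_sim = monte_carlo_p_value(wins=10, n=10, draws=2000)  # all rounds won by one panel
```

With zero wins in four rounds, the exact two-sided p-value is the mass at 0 and 4 wins, i.e. 2/16.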

## Features

The provided Jupyter notebook (`forecasting_macroeconomic_scenarios_synthetic_llm_personas_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Task Architecture:** The entire pipeline is broken down into 25 distinct, modular tasks, each with its own orchestrator function.
-   **Configuration-Driven Design:** All study parameters are managed in an external `config.yaml` file.
-   **Scalable Data Processing:** Includes streaming validators and processors for the large-scale PersonaHub dataset.
-   **Resilient API Orchestration:** A robust, asynchronous framework for managing over 120,000 API calls with concurrency control, rate limiting, automatic retries, and resumability via checkpointing.
-   **Advanced Persona Filtering:** A four-stage pipeline combining keyword filtering, NER, semantic deduplication (via embeddings and HNSW), and a multi-run, majority-vote LLM-as-a-judge triage.
-   **Rigorous Statistical Analysis:** Implements all specified forecast evaluation metrics and hypothesis tests with high fidelity.
-   **Complete Replication:** A single top-level function call can execute the entire study from raw data to final result tables.
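To make the "Resilient API Orchestration" item concrete, here is a minimal sketch of concurrency control with retries, using only `asyncio`. It is not the notebook's code: `call_with_retries`, `run_all`, and `flaky_call` are hypothetical names, and the stub stands in for a real API call, with `RuntimeError` playing the role of a transient rate-limit error.

```python
import asyncio
import random
from typing import Awaitable, Callable, Dict, List


async def call_with_retries(
    task_id: str,
    call: Callable[[str], Awaitable[str]],
    semaphore: asyncio.Semaphore,
    max_retries: int = 3,
    base_delay: float = 0.01,
) -> str:
    """Run one call under the concurrency cap, retrying with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            async with semaphore:
                return await call(task_id)
        except RuntimeError:  # stand-in for a transient API error
            if attempt == max_retries:
                raise
            # Back off with jitter; the semaphore slot is already released here.
            await asyncio.sleep(base_delay * (2 ** attempt) * (1 + random.random()))
    raise AssertionError("unreachable")


async def run_all(
    task_ids: List[str],
    call: Callable[[str], Awaitable[str]],
    max_concurrency: int = 5,
) -> Dict[str, str]:
    """Issue all calls concurrently, never more than max_concurrency at once."""
    semaphore = asyncio.Semaphore(max_concurrency)
    results = await asyncio.gather(
        *(call_with_retries(t, call, semaphore) for t in task_ids)
    )
    return dict(zip(task_ids, results))


# Hypothetical flaky endpoint: fails the first time each task is attempted.
_seen: set = set()


async def flaky_call(task_id: str) -> str:
    if task_id not in _seen:
        _seen.add(task_id)
        raise RuntimeError("simulated rate limit")
    return f"forecast-for-{task_id}"


results = asyncio.run(run_all([f"p{i}" for i in range(10)], flaky_call))
```

Sleeping outside the semaphore keeps a backing-off task from occupying a concurrency slot that another request could use.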

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Validation & Cleansing (Tasks 1-4):** Ingests and validates all raw inputs, including the ~370M-entry PersonaHub file and all analytical datasets.
2.  **Persona Filtering (Tasks 5-9):** Executes the four-stage filtering pipeline to derive the final `persona_final_df` of 2,368 personas. Includes a Cohen's kappa validation of the LLM judge.
3.  **Forecast Generation (Tasks 10-12):** Assembles all 123,400 prompts and executes the API calls for both the persona and baseline arms.
4.  **Analysis & Scoring (Tasks 13-22):** Consolidates and QCs all forecasts, computes AI panel medians, aligns all data sources, calculates dispersion, errors, MAE, and win-shares, and runs all hypothesis tests.
5.  **Ablation Study (Tasks 24-25):** Runs the final paired t-test and Kolmogorov-Smirnov test on the aligned results to formally test the paper's main hypothesis.
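As an illustration of the Cohen's kappa validation mentioned in step 2, the sketch below computes kappa between an LLM judge's labels and human annotations in plain Python. The notebook imports `cohen_kappa_score` from scikit-learn for this; the hand-rolled version and its label values are assumptions made here only to show the calculation.

```python
from collections import Counter
from typing import Hashable, Sequence


def cohen_kappa(labels_a: Sequence[Hashable], labels_b: Sequence[Hashable]) -> float:
    """Agreement between two raters, corrected for chance agreement."""
    if len(labels_a) != len(labels_b) or not labels_a:
        raise ValueError("label sequences must be non-empty and equal length")
    n = len(labels_a)
    observed = sum(a == b for a, b in zip(labels_a, labels_b)) / n
    count_a, count_b = Counter(labels_a), Counter(labels_b)
    # Chance agreement: product of the two raters' marginal label frequencies.
    expected = sum(count_a[c] * count_b[c] for c in count_a) / (n * n)
    if expected == 1.0:
        return 1.0  # both raters used one identical label throughout
    return (observed - expected) / (1 - expected)


# Illustrative labels only.
human = ["keep", "keep", "drop", "drop"]
judge = ["keep", "drop", "drop", "drop"]
kappa = cohen_kappa(human, judge)
```

Here observed agreement is 0.75 and chance agreement is 0.5, giving a kappa of 0.5.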

## Core Components (Notebook Structure)

The `forecasting_macroeconomic_scenarios_synthetic_llm_personas_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the 25 major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `run_synthetic_economist_study`

The project is designed around a single, top-level user-facing interface function:

-   **`run_synthetic_economist_study`:** This master orchestrator function, located in the final section of the notebook, runs the entire automated research pipeline from end-to-end. A single call to this function reproduces the entire computational portion of the project.

## Prerequisites

-   Python 3.9+
-   An OpenAI API key.
-   Core dependencies: `pandas`, `numpy`, `pyyaml`, `pyarrow`, `openai`, `tiktoken`, `spacy`, `sentence-transformers`, `networkx`, `hnswlib`, `scipy`, `scikit-learn`, `tqdm`, `faker`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/forecasting_macroeconomic_scenarios_synthetic_llm_personas.git
    cd forecasting_macroeconomic_scenarios_synthetic_llm_personas
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install -r requirements.txt
    ```

4.  **Download the spaCy model:**
    ```sh
    python -m spacy download en_core_web_trf
    ```

5.  **Set your OpenAI API Key:**
    ```sh
    export OPENAI_API_KEY='your-key-here'
    ```

## Input Data Structure

The pipeline requires several input files with specific schemas, which are rigorously validated. A synthetic data generator is included in the notebook for a self-contained demonstration.
1.  **`persona_hub_raw.parquet`**: The large-scale persona dataset.
2.  **`contextual_data.csv`**: Time-series data for the 50 SPF rounds.
3.  **`human_benchmark.csv`**: Human expert panel median forecasts.
4.  **`human_micro.csv`**: Individual human expert forecasts.
5.  **`realized_outcomes.csv`**: Ground-truth macroeconomic data.
6.  **`human_annotations.csv`**: Human judgments for the kappa validation.

All other parameters are controlled by the `config.yaml` file.
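Before running the pipeline, it can help to confirm that all six input files are in place. The following is a small sketch under the assumption that the files sit together in one directory; `missing_inputs` and `locate_inputs` are hypothetical helpers, not functions from the notebook.

```python
from pathlib import Path
from typing import Dict, List

# File names as listed above.
REQUIRED_INPUTS = [
    "persona_hub_raw.parquet",
    "contextual_data.csv",
    "human_benchmark.csv",
    "human_micro.csv",
    "realized_outcomes.csv",
    "human_annotations.csv",
]


def missing_inputs(raw_data_dir: Path) -> List[str]:
    """Return the required file names that are absent from raw_data_dir."""
    return [name for name in REQUIRED_INPUTS if not (raw_data_dir / name).is_file()]


def locate_inputs(raw_data_dir: Path) -> Dict[str, Path]:
    """Map each required file name to its path, failing early if any is missing."""
    missing = missing_inputs(raw_data_dir)
    if missing:
        raise FileNotFoundError(
            f"Missing input files in {raw_data_dir}: {', '.join(missing)}"
        )
    return {name: raw_data_dir / name for name in REQUIRED_INPUTS}
```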

## Usage

The `forecasting_macroeconomic_scenarios_synthetic_llm_personas_draft.ipynb` notebook provides a complete, step-by-step guide. The primary workflow is to execute the final cell of the notebook, which demonstrates how to use the top-level `run_synthetic_economist_study` orchestrator:

```python
# Final cell of the notebook
from pathlib import Path

import yaml  # provided by the `pyyaml` package

# This block serves as the main entry point for the entire project.
if __name__ == '__main__':
    # 1. Define paths and load configuration.
    run_dir = Path("./synthetic_economist_run")
    raw_data_dir = run_dir / "raw_data"
    output_dir = run_dir / "output"
    
    with open('config.yaml', 'r') as f:
        config = yaml.safe_load(f)
    
    # 2. Generate a full set of synthetic data files for the demonstration.
    # (The generation functions are defined in the notebook)
    data_paths = setup_synthetic_data(raw_data_dir, config)
    
    # 3. Execute the entire replication study.
    final_artifacts = run_synthetic_economist_study(
        data_paths=data_paths,
        config=config,
        output_dir=str(output_dir),
        total_persona_rows=10000, # Use the size of our synthetic dataset
        run_kappa_validation=True
    )
    
    # 4. Inspect final results.
    print("--- Ablation Paired T-Test Report ---")
    print(final_artifacts['ablation_ttest_report'])
```

## Output Structure

The pipeline generates a structured output directory:
-   **`output/processed/`**: Contains intermediate, processed data files (e.g., cleansed and filtered persona sets).
-   **`output/checkpoints/`**: Contains raw JSONL results from all API calls, enabling resumability.
-   **`output/results/`**: Contains all final output tables (MAE, win-share, dispersion) as CSV files and a comprehensive `full_pipeline_report.json`.
-   **`output/pipeline_run.log`**: A detailed log file for the entire run.
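Resumability from JSONL checkpoints can be sketched as follows. The record layout is an assumption: the `request_id` field and the `persona_arm.jsonl` file name are hypothetical and may differ from what the notebook writes.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, Iterable, List, Set


def completed_ids(checkpoint_path: Path) -> Set[str]:
    """Collect the request ids already written to a JSONL checkpoint."""
    done: Set[str] = set()
    if not checkpoint_path.exists():
        return done
    with checkpoint_path.open("r", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            try:
                done.add(json.loads(line)["request_id"])
            except (json.JSONDecodeError, KeyError, TypeError):
                # A truncated or malformed line (e.g. from an interrupted run)
                # is skipped, so that request is simply issued again.
                continue
    return done


def pending_requests(all_ids: Iterable[str], checkpoint_path: Path) -> List[str]:
    """Return the ids that still need an API call, preserving input order."""
    done = completed_ids(checkpoint_path)
    return [rid for rid in all_ids if rid not in done]


def append_result(checkpoint_path: Path, record: Dict) -> None:
    """Append one result as a single JSON line."""
    checkpoint_path.parent.mkdir(parents=True, exist_ok=True)
    with checkpoint_path.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(record) + "\n")


with tempfile.TemporaryDirectory() as tmp:
    ckpt = Path(tmp) / "checkpoints" / "persona_arm.jsonl"
    append_result(ckpt, {"request_id": "r1", "forecast": 2.1})
    append_result(ckpt, {"request_id": "r2", "forecast": 1.9})
    remaining = pending_requests(["r1", "r2", "r3"], ckpt)
```

Appending one line per completed call means an interrupted run loses at most the request in flight.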

## Project Structure

```
forecasting_macroeconomic_scenarios_synthetic_llm_personas/
│
├── forecasting_macroeconomic_scenarios_synthetic_llm_personas_draft.ipynb
├── config.yaml
├── requirements.txt
│
├── study_run/
│   ├── raw_data/
│   └── output/
│       ├── processed/
│       ├── results/
│       ├── checkpoints/
│       └── pipeline_run.log
│
├── LICENSE
└── README.md
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can modify all study parameters, including model names, API settings, filtering thresholds, and file paths, without altering the core Python code.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Evaluating Different LLMs:** The modular design allows for easy substitution of the `model_name` in the config to test other models (e.g., from Anthropic, Google).
-   **Density Forecasting:** Extending the prompts to ask for probability distributions (as in the real SPF) and evaluating the quality of the LLM's density forecasts.
-   **Alternative Prompting Strategies:** Implementing and testing other prompting techniques, such as chain-of-thought or adversarial prompting, to see if they can generate more diverse or accurate forecasts.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@inproceedings{iadisernia2025prompting,
  author = {Iadisernia, Giulia and Camassa, Carolina},
  title = {Prompting for Policy: Forecasting Macroeconomic Scenarios with Synthetic LLM Personas},
  year = {2025},
  booktitle = {Proceedings of the 6th ACM International Conference on AI in Finance},
  series = {ICAIF '25}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A Production-Grade Replication of "Prompting for Policy: Forecasting Macroeconomic Scenarios with Synthetic LLM Personas".
GitHub repository: https://github.com/chirindaopensource/forecasting_macroeconomic_scenarios_synthetic_llm_personas
```

## Acknowledgments

-   Credit to **Giulia Iadisernia and Carolina Camassa** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, spaCy, Sentence-Transformers, NetworkX, Scipy, Scikit-learn, and OpenAI**.

---

*This README was generated based on the structure and content of the `forecasting_macroeconomic_scenarios_synthetic_llm_personas_draft.ipynb` notebook and follows best practices for research software documentation.*


# Paper

Title: "*Prompting for Policy: Forecasting Macroeconomic Scenarios with Synthetic LLM Personas*"

Authors: Giulia Iadisernia, Carolina Camassa

E-Journal Submission Date: 4 November 2025

Conference Affiliation: The 6th ACM International Conference on AI in Finance (ICAIF 2025)

Link: https://arxiv.org/abs/2511.02458

Abstract:

We evaluate whether persona-based prompting improves Large Language Model (LLM) performance on macroeconomic forecasting tasks. Using 2,368 economics-related personas from the PersonaHub corpus, we prompt GPT-4o to replicate the ECB Survey of Professional Forecasters across 50 quarterly rounds (2013-2025). We compare the persona-prompted forecasts against the human experts panel, across four target variables (HICP, core HICP, GDP growth, unemployment) and four forecast horizons. We also compare the results against 100 baseline forecasts without persona descriptions to isolate its effect. We report two main findings. Firstly, GPT-4o and human forecasters achieve remarkably similar accuracy levels, with differences that are statistically significant yet practically modest. Our out-of-sample evaluation on 2024-2025 data demonstrates that GPT-4o can maintain competitive forecasting performance on unseen events, though with notable differences compared to the in-sample period. Secondly, our ablation experiment reveals no measurable forecasting advantage from persona descriptions, suggesting these prompt components can be omitted to reduce computational costs without sacrificing accuracy. Our results provide evidence that GPT-4o can achieve competitive forecasting accuracy even on out-of-sample macroeconomic events, if provided with relevant context data, while revealing that diverse prompts produce remarkably homogeneous forecasts compared to human panels.


# Summary

### **The Core Research Question and Motivation**

The central question the authors investigate is both simple and profound: **Does emulating human expertise through detailed "persona" descriptions improve the macroeconomic forecasting performance of a Large Language Model (LLM)?**

The motivation is clear. Central banks, like the European Central Bank (ECB), rely on surveys of human experts—in this case, the Survey of Professional Forecasters (SPF)—to gauge expectations about the economy. These surveys are crucial for setting monetary policy. LLMs offer the tantalizing possibility of simulating these expert panels at scale, speed, and low cost. However, the field of "prompt engineering" is still nascent. While it's known that LLM outputs are sensitive to prompts, there is little systematic evidence on whether complex, role-playing prompts (i.e., personas) actually add value in a technical domain like forecasting, or if they are merely computational window dressing. This paper aims to fill that empirical gap.

### **The Experimental Design and Methodology**

The authors' experimental pipeline, elegantly summarized in Figure 1, is the heart of this paper. It's a multi-stage process designed to rigorously test their hypothesis.

**A. The Task:** The goal is to replicate the ECB's quarterly Survey of Professional Forecasters. The LLM must produce point forecasts for four key Euro area variables:
*   **HICP:** Headline inflation
*   **HICPX:** Core inflation (excluding volatile items)
*   **rGDP:** Real GDP growth
*   **UNR:** Unemployment rate

These forecasts are required for multiple time horizons: the current year (CY), next year (CY+1), two years out (CY+2), and the long term. The experiment spans 50 quarterly rounds from 2013 to 2025.

**B. The "Synthetic Experts" (Persona Creation):** Instead of hand-crafting a few "expert" personas, the authors perform a large-scale filtering exercise.
1.  They start with the massive **PersonaHub dataset** (roughly 370 million persona descriptions).
2.  They apply a multi-stage filter (keyword search, named-entity recognition, deduplication) to isolate descriptions relevant to economics and monetary policy.
3.  Crucially, they use another LLM (GPT-4o-mini) as a "judge" to score the remaining personas on three criteria: **EU-centrality**, **neutrality** (no obvious bias), and **monetary policy depth**.
4.  This rigorous process yields a final panel of **2,368 distinct, relevant, and high-quality personas**.
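The judge step can be pictured as a majority vote over repeated scoring runs. The sketch below is illustrative only: the 1 to 5 score scale, the threshold of 4, and the rule that a run passes only if all three criteria pass are assumptions, not details given in this summary.

```python
from typing import Dict, List

CRITERIA = ("eu_centrality", "neutrality", "monetary_policy_depth")


def run_passes(scores: Dict[str, int], threshold: int = 4) -> bool:
    """One judge run passes when every criterion meets the (assumed) threshold."""
    return all(scores[criterion] >= threshold for criterion in CRITERIA)


def majority_keep(runs: List[Dict[str, int]], threshold: int = 4) -> bool:
    """Keep a persona when strictly more than half of the judge runs pass."""
    passes = sum(run_passes(run, threshold) for run in runs)
    return passes * 2 > len(runs)


# Three hypothetical judge runs for one persona.
runs = [
    {"eu_centrality": 5, "neutrality": 4, "monetary_policy_depth": 4},
    {"eu_centrality": 4, "neutrality": 4, "monetary_policy_depth": 5},
    {"eu_centrality": 5, "neutrality": 2, "monetary_policy_depth": 4},
]
keep = majority_keep(runs)
```

Repeating the judgment and voting reduces the chance that a single noisy LLM response decides a persona's fate.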

**C. The Prompt Architecture:** For each of the 50 survey rounds, every one of the 2,368 personas is prompted. Each prompt contains standardized contextual information critical for any forecaster, human or AI:
*   **The Persona Blurb:** The specific expert description being tested.
*   **Monetary Policy Context:** The full text of the latest ECB press release.
*   **Macro Snapshot:** Key recent data points (e.g., latest inflation reading).
*   **Past Forecasts:** The median forecast from the previous SPF survey.
*   **Task Instruction:** A clear directive to provide numerical forecasts in a specific format.

**D. The Control Group (Ablation Study):** To isolate the effect of the persona, they run a parallel experiment. They create 100 "baseline" forecasts for each round using the exact same prompt, but with the persona description block completely removed. This is the control group. If the personas add value, the persona-driven forecasts should be significantly more accurate than these baseline forecasts.
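The relationship between the two arms can be sketched as a single prompt builder with an optional persona block, followed by the call-count arithmetic. The section labels and the `build_prompt` helper are hypothetical; only the component list and the counts come from the description above.

```python
from typing import Optional


def build_prompt(
    press_release: str,
    macro_snapshot: str,
    past_forecasts: str,
    task_instruction: str,
    persona: Optional[str] = None,
) -> str:
    """Assemble the prompt; the baseline arm simply omits the persona block."""
    blocks = []
    if persona is not None:
        blocks.append(f"PERSONA\n{persona}")
    blocks += [
        f"MONETARY POLICY CONTEXT\n{press_release}",
        f"MACRO SNAPSHOT\n{macro_snapshot}",
        f"PAST FORECASTS\n{past_forecasts}",
        f"TASK\n{task_instruction}",
    ]
    return "\n\n".join(blocks)


shared = dict(
    press_release="(ECB press release text)",
    macro_snapshot="(latest data points)",
    past_forecasts="(previous SPF medians)",
    task_instruction="(forecast format instructions)",
)
baseline_prompt = build_prompt(**shared)
persona_prompt = build_prompt(persona="An ECB watcher", **shared)

# Call counts implied by the design: 50 rounds, 2,368 personas, 100 baseline runs.
N_ROUNDS, N_PERSONAS, N_BASELINE = 50, 2368, 100
persona_calls = N_ROUNDS * N_PERSONAS          # 118,400
baseline_calls = N_ROUNDS * N_BASELINE         # 5,000
total_calls = persona_calls + baseline_calls   # 123,400
```

Because both arms share every other block, any accuracy difference between them can be attributed to the persona text.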

**E. The Evaluation Framework:** Performance is measured in two primary ways:
1.  **Against Reality (Point Accuracy):** The median forecast of the 2,368 AI personas is compared to the actual realized economic data. The metric used is the Mean Absolute Error (MAE).
2.  **Against Humans (Relative Performance):** The AI panel's median forecast is compared to the human SPF panel's median forecast. This is evaluated using **"win-share,"** which calculates the percentage of rounds where the AI's forecast was closer to the true value than the human forecast (ties are excluded).

The authors also astutely include an **out-of-sample** test period (2024-2025), which falls after the knowledge cutoff of the GPT-4o model they use. This helps mitigate concerns that the model is simply "memorizing" economic history rather than performing genuine, context-based reasoning.

### **The Principal Findings**

The paper delivers two clear, high-impact findings.

**Finding 1: AI Achieves Competitive Accuracy with Human Experts.**
When comparing the median AI forecast to the median human forecast, the performance is remarkably similar. As shown in Table 4 (MAE) and the heatmap in Figure 1 (Win Rate), neither panel consistently dominates the other. The differences in accuracy are often statistically significant but practically modest. For example, AI tends to have a slight edge in forecasting core inflation, while humans are slightly better at short-term unemployment. The key takeaway is that, when provided with the proper real-time context, an LLM can function as a "synthetic forecaster" on par with a panel of credentialed human experts.

**Finding 2: Sophisticated Personas Provide No Measurable Forecasting Advantage.**
This is the paper's most significant methodological contribution. The ablation experiment yields a **null result**.
*   Comparing the error distributions of the persona-driven forecasts versus the no-persona baseline forecasts reveals no statistically significant difference.
*   Figure 4 illustrates this: the kernel density plots of the absolute errors for the two groups overlap almost completely, and a Kolmogorov-Smirnov test confirms that the distributions are statistically indistinguishable.
*   The implication is powerful: the elaborate and computationally expensive process of selecting and using 2,368 unique personas added no value to the final forecast accuracy. The model's performance is driven by the contextual data (ECB statements, macro data), not by being told to "act like" a specific type of economist.
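For intuition, the two ablation test statistics can be computed by hand. This sketch returns only the statistics; the notebook imports `ttest_rel` and `ks_2samp` from `scipy.stats`, which also provide p-values. The function names here are hypothetical.

```python
import math
from bisect import bisect_right
from statistics import mean, stdev
from typing import Sequence


def paired_t_statistic(x: Sequence[float], y: Sequence[float]) -> float:
    """t statistic of the paired differences x_i - y_i (needs non-constant differences)."""
    if len(x) != len(y) or len(x) < 2:
        raise ValueError("need two equal-length samples with at least two pairs")
    diffs = [a - b for a, b in zip(x, y)]
    return mean(diffs) / (stdev(diffs) / math.sqrt(len(diffs)))


def ks_statistic(x: Sequence[float], y: Sequence[float]) -> float:
    """Largest vertical gap between the two empirical CDFs."""
    xs, ys = sorted(x), sorted(y)
    gap = 0.0
    for value in sorted(set(xs) | set(ys)):
        cdf_x = bisect_right(xs, value) / len(xs)
        cdf_y = bisect_right(ys, value) / len(ys)
        gap = max(gap, abs(cdf_x - cdf_y))
    return gap


# Illustrative absolute errors only (not results from the paper).
persona_errors = [1.0, 2.0, 3.0, 4.0]
baseline_errors = [0.5, 1.0, 2.5, 3.0]
t_stat = paired_t_statistic(persona_errors, baseline_errors)
d_stat = ks_statistic(persona_errors, baseline_errors)
```

A KS statistic near zero corresponds to the overlapping densities described above; identical samples give exactly zero.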

### **Key Quantitative and Behavioral Insights**

Beyond the two main findings, the paper reveals a crucial behavioral difference between the AI and human panels.

*   **Extremely Low Dispersion in AI Forecasts:** As quantified in Table 3 and visualized in Figure 2, the AI forecasts are incredibly homogeneous. The inter-quartile range (IQR) of the 2,368 AI forecasts is often two orders of magnitude smaller than the IQR of the human panel.
*   **Interpretation:** Despite being prompted with thousands of diverse personas, the LLM exhibits strong **consensus-seeking behavior**. It converges to a single, stable view of the economy based on the provided data. Human experts, in contrast, display a wide and realistic range of disagreement, reflecting diverse models, biases, and interpretations. This suggests that while LLMs can replicate the *central tendency* of expert opinion, they fail to capture the *heterogeneity* of that opinion.
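Dispersion as an inter-quartile range can be computed with the standard library. The quartile convention used below (`method="inclusive"`) is an assumption, and the panel values are made up purely to illustrate a tight AI panel next to a wider human panel.

```python
from statistics import quantiles
from typing import Sequence


def iqr(values: Sequence[float]) -> float:
    """Inter-quartile range (Q3 - Q1) of one panel's forecasts for a single round."""
    q1, _median, q3 = quantiles(values, n=4, method="inclusive")
    return q3 - q1


# Made-up forecasts for a single round, variable, and horizon.
ai_panel = [2.00, 2.00, 2.01, 2.01, 2.02]
human_panel = [1.5, 1.8, 2.0, 2.3, 2.6]
ai_iqr = iqr(ai_panel)
human_iqr = iqr(human_panel)
```

Comparing the two values round by round is one way to reproduce the kind of dispersion gap described above.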

### **Implications and Concluding Remarks**

In essence, this paper delivers a clear, actionable, and somewhat humbling message for the field of AI-driven forecasting.

1.  **For Practitioners:** The good news is that LLMs like GPT-4o are powerful tools for macroeconomic forecasting that can achieve human-level accuracy. The crucial insight is that resources should be focused on **high-quality data integration and prompt clarity**, not on elaborate persona engineering. One can achieve top-tier results with a simple, direct prompt, saving significant computational cost and complexity.

2.  **For Researchers:** The paper demonstrates that LLMs act as powerful information synthesizers but not as genuine simulators of diverse human cognition. The lack of forecast dispersion is a critical finding. Future work should explore methods to induce realistic diversity in LLM outputs, perhaps through adversarial prompting or chain-of-thought techniques that force the model to consider alternative scenarios, rather than just personas.

This work is a fine example of how to conduct clean, hypothesis-driven research on LLMs. It replaces speculation with empirical evidence and provides a solid foundation for building more effective and efficient AI-augmented forecasting systems.

# Import Essential Modules

#!/usr/bin/env python3
# ==============================================================================#
#
#  Replication Pipeline for "Prompting for Policy: Forecasting Macroeconomic
#  Scenarios with Synthetic LLM Personas"
#
#  This module provides a complete, production-grade implementation of the
#  end-to-end research pipeline presented in "Prompting for Policy: Forecasting
#  Macroeconomic Scenarios with Synthetic LLM Personas" by Iadisernia and
#  Camassa (2025). It delivers a robust and reproducible system for replicating
#  the paper's core findings on the efficacy of persona-based prompting for
#  LLM-driven macroeconomic forecasting.
#
#  Core Methodological Components:
#  • Large-scale persona filtering from the PersonaHub corpus using keyword
#    matching, Named Entity Recognition (NER), semantic deduplication, and a
#    multi-criteria LLM-as-a-judge triage.
#  • Systematic replication of the ECB Survey of Professional Forecasters (SPF)
#    across 50 quarterly rounds (2013-2025) using GPT-4o.
#  • Generation of 118,400 forecasts from 2,368 synthetic LLM personas.
#  • Controlled ablation study with 5,000 baseline forecasts (no persona).
#  • Rigorous forecast evaluation against human expert medians and realized
#    outcomes using Mean Absolute Error (MAE) and win-share statistics.
#  • Statistical hypothesis testing using Monte Carlo simulation for in-sample
#    data and exact binomial tests for out-of-sample data.
#
#  Technical Implementation Features:
#  • Streaming data processing for large-scale validation and cleansing.
#  • Asynchronous, resilient, and resumable API orchestration for high-volume
#    LLM calls with concurrency and rate-limit management.
#  • Efficient semantic search using sentence embeddings and an HNSW index.
#  • Graph-based clustering for robust deduplication.
#  • Modular, task-specific orchestrators composed into a single, auditable
#    end-to-end pipeline.
#
#  Paper Reference:
#  Iadisernia, G., & Camassa, C. (2025). Prompting for Policy: Forecasting
#  Macroeconomic Scenarios with Synthetic LLM Personas. In Proceedings of the
#  6th ACM International Conference on AI in Finance (ICAIF '25).
#  arXiv preprint arXiv:2511.02458. https://arxiv.org/abs/2511.02458
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================#

# ==============================================================================#
# Consolidated Imports for the End-to-End Pipeline
# ==============================================================================#

# --- Standard Library ---
import asyncio
import hashlib
import json
import logging
import os
import re
import unicodedata
from collections import Counter
from pathlib import Path
from typing import (Any, Coroutine, Dict, Generator, List, Optional, Set,
                    Tuple, Union)

# --- Third-Party Libraries ---
# Core Data Handling and Numerical Operations
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Natural Language Processing
import spacy
import tiktoken
from sentence_transformers import SentenceTransformer
from spacy.language import Language

# Asynchronous Operations and API Interaction
import openai
from openai import APIError, AsyncOpenAI, RateLimitError
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio

# Statistical Analysis
from scipy.stats import binom, ks_2samp, ttest_rel
from sklearn.metrics import cohen_kappa_score, confusion_matrix

# Graph Analysis for Deduplication
import networkx as nx

# Approximate Nearest Neighbor Search for Deduplication
try:
    import hnswlib
except ImportError as exc:
    # hnswlib is required for the deduplication task, so fail early with a
    # clear installation hint.
    raise ImportError(
        "The 'hnswlib' library is required for the deduplication task. "
        "Please install it by running: pip install hnswlib"
    ) from exc

# ==============================================================================
# Setup: Logger Configuration
# ==============================================================================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# Implementation

## Draft 1

### **Documentation of Pipeline Orchestrators**

#### **Task 1: `validate_persona_hub_df`**

*   **Inputs:**
    *   `df_path`: Path to the raw `persona_hub_raw_df` Parquet file (~370M records).
    *   `config`: The main configuration dictionary.
    *   `total_rows`: The known total number of rows in the file.
*   **Processes:**
    1.  Reads the large Parquet file in a streaming fashion, chunk by chunk, to maintain a low memory footprint.
    2.  **Schema Validation:** For the first chunk, verifies that all expected columns (`id`, `description`, etc.) are present with the correct dtypes.
    3.  **ID Uniqueness:** Iteratively builds a set of all unique `id` values encountered across all chunks, checking for duplicates both within and across chunks.
    4.  **Content Validation:** Accumulates statistics across all chunks, including the count of null descriptions, the distribution of `token_count`, the set of `language_code` values, and the frequency of `domain_tags`.
    5.  **Integrity Validation:** On a random sample of rows drawn probabilistically from the stream, it re-computes the SHA-256 hash of the `description` and compares it to the stored hash. It also aggregates counts for each `shard_id` to ensure full coverage.
*   **Outputs:**
    *   A comprehensive dictionary (`report`) containing the overall validation status (`SUCCESS`/`FAILURE`), a list of any errors found, and detailed statistics from all checks (e.g., duplicate ID count, hash mismatch count, top domain tags).
*   **Role in Research Pipeline:** This function serves as the **initial data integrity gatekeeper** for the largest and most critical input dataset. It corresponds to the implicit pre-processing and validation that must occur before the filtering pipeline described in **Section 3.1 ("Persona dataset")** can begin. It ensures the raw data is structurally sound and internally consistent, preventing errors in all subsequent processing steps.
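
The sampled integrity check in step 5 could look roughly like the sketch below. It assumes each row is available as a dict with the `description` and `sha256_description_hash` fields and that hashes are taken over the UTF-8 encoding, as in the synthetic data generator of the usage example. The function name, `sample_rate`, and `seed` are hypothetical and not taken from the pipeline.

```python
import hashlib
import random


def count_hash_mismatches(rows, sample_rate=0.01, seed=42):
    """Return (rows_sampled, mismatches) for a stream of row dicts.

    Hypothetical helper: name and parameters are illustrative only.
    """
    rng = random.Random(seed)
    sampled = 0
    mismatches = 0
    for row in rows:
        # Keep each row with probability `sample_rate`; the stream is
        # consumed one row at a time and never held in memory.
        if rng.random() >= sample_rate:
            continue
        sampled += 1
        recomputed = hashlib.sha256(row["description"].encode("utf-8")).hexdigest()
        if recomputed != row["sha256_description_hash"]:
            mismatches += 1
    return sampled, mismatches
```

A non-zero mismatch count on the sample would be reported as an error in the validation `report`.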



#### **Task 2: `validate_analytical_inputs`**

*   **Inputs:**
    *   `contextual_data_df`, `human_survey_predictions_benchmark_df`, `human_survey_micro_df`, `realized_macro_data_df`: The four core analytical DataFrames, loaded into memory.
    *   `config`: The main configuration dictionary.
*   **Processes:**
    1.  **Contextual Data Validation:** Verifies `contextual_data_df` has exactly 50 rows, correct `survey_round` format, valid temporal logic (`ecb_meeting_date <= survey_date`), and correct schemas for its complex nested dictionary columns. It cross-validates HICPX availability rules against the config.
    2.  **Human Survey Validation:** Verifies that the `benchmark` and `micro` DataFrames contain only allowed categorical values. It performs a critical cross-validation by re-computing the `target_year` and ensuring it matches the provided data. It also enforces HICPX and OOS business rules.
    3.  **Realized Data Validation:** Checks for uniqueness on `(reference_year, variable)` and performs a critical look-ahead bias check: `first_release_date` must be after the `reference_year`.
    4.  **Config Validation:** Performs a deep validation of the `config` dictionary, ensuring key experimental parameters (model names, temperature, simulation counts, etc.) match the paper's specification exactly.
*   **Outputs:**
    *   A dictionary (`report`) summarizing the validation status of each input. The function raises a `ValueError` if any check fails.
*   **Role in Research Pipeline:** This function is the **integrity gatekeeper for the analytical datasets and the experimental parameters**. It corresponds to the data validation steps described throughout **Section 3 ("Data and experimental setup")**, ensuring that the human benchmarks, realized outcomes, and contextual data are perfectly aligned and that the experiment will be run with the correct parameters.



#### **Task 3: `cleanse_persona_hub_df`**

*   **Inputs:**
    *   `raw_df_path`: Path to the validated raw persona hub file.
    *   `cleansed_df_path`: Path where the output file will be written.
    *   `config`: The configuration dictionary.
*   **Processes:**
    1.  Reads the raw data in a streaming fashion.
    2.  For each chunk, it applies a series of "minimal, lossless" transformations:
        *   **Text Normalization:** Normalizes whitespace, removes control characters, and applies Unicode NFC normalization to the `description` column.
        *   **Filtering:** Creates boolean masks to identify and subsequently drop rows with empty descriptions, disallowed language codes, or invalid token counts.
        *   **Auxiliary Field Cleansing:** Normalizes the `domain_tags` column, handling mixed types (lists, JSON strings, nulls).
        *   **Integrity Enforcement:** Re-computes the `sha256_description_hash` based on the *newly cleansed* description. It also sets the `id` column as the DataFrame index.
    3.  Writes the cleansed, filtered, and indexed chunks to a new Parquet file in a streaming fashion.
*   **Outputs:**
    *   A new Parquet file at `cleansed_df_path`.
    *   A dictionary (`report`) summarizing the process (rows read, written, and dropped by reason).
*   **Role in Research Pipeline:** This function performs the **canonical data preparation** for the persona dataset. It creates the clean, standardized starting point for the main filtering pipeline described in **Section 3.1**.



#### **Task 4: `cleanse_analytical_inputs`**

*   **Inputs:** The four raw (but validated) analytical DataFrames.
*   **Processes:**
    1.  **Contextual Data Cleansing:** Normalizes the `ecb_communication_text`, parses date strings into `datetime` objects, and re-computes `ecb_communication_token_count` with the correct tokenizer so that the stored count matches the normalized text.
    2.  **`target_year` Computation:** This is a critical transformation. It computes the canonical `target_year` for every row in the `human_benchmark_df` and `human_micro_df` by combining the `round` and `horizon` information, using `contextual_data_df` for the 'LT' mapping.
    3.  **Realized Data Cleansing:** Deduplicates the `realized_macro_data_df` and strictly validates the `aggregation_method` strings.
*   **Outputs:**
    *   A tuple of the four cleansed and transformed DataFrames.
*   **Role in Research Pipeline:** This function performs the **final preparation of all analytical datasets**. The most important step is the computation of the `target_year`, which creates the essential join key that will be used in **Task 15** to align forecasts with their corresponding realized outcomes for scoring.
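
A minimal sketch of the `target_year` mapping follows. It assumes rounds are formatted as `YYYYQq`, horizons use the labels `CY`, `CY+1`, `CY+2`, and `LT` (as in the usage example), and the long-term year is looked up per round. The function name and the `lt_year_by_round` dictionary are hypothetical stand-ins for whatever the pipeline derives from `contextual_data_df`.

```python
def compute_target_year(survey_round, horizon, lt_year_by_round):
    """Map a (round, horizon) pair to a calendar target year.

    Hypothetical helper; the offset rules are an assumption.
    """
    base_year = int(survey_round[:4])  # "2013Q1" -> 2013
    if horizon == "LT":
        # Long-term horizon: taken from a per-round lookup, not an offset.
        return lt_year_by_round[survey_round]
    if horizon == "CY":
        return base_year
    if horizon.startswith("CY+"):
        return base_year + int(horizon[3:])
    raise ValueError(f"Unknown horizon label: {horizon!r}")
```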



#### **Task 5: `apply_keyword_domain_filter`**

*   **Inputs:** Path to the cleansed persona hub file.
*   **Processes:**
    1.  Reads the data in a streaming fashion.
    2.  For each chunk, it applies two filters in parallel:
        *   **Keyword Filter:** Counts the number of unique lexicon phrases in each description and checks if the count is $\geq 2$. Implements: $I_i^{\mathrm{kw}} = \mathbf{1}\{ c_i \geq 2 \}$, where $c_i = \left| \{ \ell \in L : \ell \text{ is in } \text{description}_i \} \right|$.
        *   **Domain Filter:** Checks if the persona's `domain_tags` have a non-empty intersection with a set of allowed domains. Implements: $I_i^{\mathrm{dom}} = \mathbf{1}\{ \mathrm{domain\_tags}_i \cap \mathrm{Dom\_allow} \neq \emptyset \}$.
    3.  Keeps only the rows that pass both filters and writes them to a new Parquet file.
*   **Outputs:**
    *   A new, smaller Parquet file (`persona_step1.parquet`).
    *   A summary report.
*   **Role in Research Pipeline:** This is the **first main filtering stage** described in **Section 3.1, step (1) "Keyword search and domain filtering"**. It acts as a high-recall pre-screen to drastically reduce the dataset from ~370M to ~200k candidates.
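
The two indicator functions can be illustrated with a short sketch. Case-insensitive substring matching is an assumption here, since the matching rule is not specified above, and the function and parameter names are hypothetical.

```python
def passes_keyword_domain_filter(description, domain_tags, lexicon,
                                 allowed_domains, min_hits=2):
    """Return True when both the keyword and the domain indicator equal 1."""
    text = description.lower()
    # c_i: number of distinct lexicon phrases that occur in the description.
    hits = {phrase for phrase in lexicon if phrase.lower() in text}
    keyword_ok = len(hits) >= min_hits
    # Domain indicator: non-empty intersection with the allowed domains.
    domain_ok = bool(set(domain_tags) & set(allowed_domains))
    return keyword_ok and domain_ok
```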



#### **Task 6: `apply_ner_person_filter`**

*   **Inputs:** The `persona_step1_df` DataFrame (~200k records).
*   **Processes:**
    1.  Loads and configures a `spaCy` NER model.
    2.  Processes all descriptions in batches using the efficient `nlp.pipe()` method.
    3.  For each description, it checks if the model identified any entities with the label 'PERSON'.
    4.  It filters out any persona where a 'PERSON' entity was found. Implements: $I_i^{\mathrm{NER}} = \mathbf{1}\{ \text{no entity in } E_i \text{ has label PERSON} \}$.
*   **Outputs:**
    *   A new, smaller DataFrame (`persona_step2_df`, ~43k records).
    *   A summary report.
*   **Role in Research Pipeline:** This is the **second filtering stage**, corresponding to **Section 3.1, step (2) "Name filtering"**. Its purpose is to remove descriptions that mention specific individuals to prevent the LLM from simply role-playing a known figure.



#### **Task 7: `apply_embedding_deduplication`**

*   **Inputs:** The `persona_step2_df` DataFrame (~43k records).
*   **Processes:**
    1.  **Embedding:** Generates a high-dimensional vector embedding for each description using a `sentence-transformers` model and L2-normalizes them.
    2.  **Similarity Search:** Builds an HNSW (Approximate Nearest Neighbor) index on the embeddings. Queries the index to find all pairs of personas $(i, j)$ whose cosine similarity is above the threshold of 0.90. Implements: Find all $(i, j)$ where $s_{ij} = e_i^\top e_j \geq 0.90$.
    3.  **Clustering:** Treats the similar pairs as edges in a graph and finds the connected components. Each component represents a cluster of semantically duplicate personas.
    4.  **Selection:** From each cluster, it selects a single representative persona based on a deterministic rule (the one with the lexicographically smallest `id`).
*   **Outputs:**
    *   A new, much smaller DataFrame (`persona_step3_df`, ~4.3k records in the paper, though the implementation yields ~2.4k).
    *   A summary report.
*   **Role in Research Pipeline:** This is the **third filtering stage**, corresponding to **Section 3.1, step (3) "Duplicate removal"**. It ensures the set of candidate personas is diverse and non-redundant at a semantic level.
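
The similarity, clustering, and selection steps can be sketched on a toy scale as follows. This is not the pipeline's implementation: the sketch replaces the HNSW index with an exhaustive pairwise comparison and `networkx` with a small union-find, which is only practical for small inputs. It assumes all embedding vectors are non-zero; the function name is hypothetical.

```python
import math


def deduplicate(ids, vectors, threshold=0.90):
    """Return one representative id per cluster of near-duplicate vectors."""
    # L2-normalize so that the dot product equals cosine similarity.
    unit = []
    for vec in vectors:
        norm = math.sqrt(sum(x * x for x in vec))
        unit.append([x / norm for x in vec])

    parent = list(range(len(ids)))

    def find(i):
        # Union-find lookup with path halving.
        while parent[i] != i:
            parent[i] = parent[parent[i]]
            i = parent[i]
        return i

    # Every pair with s_ij >= threshold is an edge; merge its endpoints.
    for i in range(len(ids)):
        for j in range(i + 1, len(ids)):
            sim = sum(a * b for a, b in zip(unit[i], unit[j]))
            if sim >= threshold:
                parent[find(i)] = find(j)

    # Keep the lexicographically smallest id of each connected component.
    best = {}
    for i, pid in enumerate(ids):
        root = find(i)
        if root not in best or pid < best[root]:
            best[root] = pid
    return sorted(best.values())
```

Because clusters are connected components, two personas can end up in the same cluster through a chain of similar neighbours even if their own similarity is below the threshold.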



#### **Task 8: `apply_llm_judge_filter`**

*   **Inputs:** The `persona_step3_df` DataFrame.
*   **Processes:**
    1.  For each of the ~4.3k candidate personas, it makes three independent, asynchronous calls to the `gpt-4o-mini` API with `temperature=1.0`.
    2.  The prompt for each call is a verbatim copy of the system prompt from Appendix A, which instructs the model to evaluate the persona on three criteria: EU-centrality, monetary policy depth, and neutrality.
    3.  It parses the structured JSON response from each call.
    4.  **Majority Vote:** For each persona and each criterion, it applies a majority vote rule. Implements: $\text{pass}_c(i) = \mathbf{1}\{ \sum_{r=1}^{3} \mathbf{1}\{ \text{run } r \text{ passed for } c \} \geq 2 \}$.
    5.  **Final Selection:** It keeps only those personas that passed the majority vote on *all three* criteria. Implements: $\text{keep}_i = \text{pass}_{\mathrm{EU}}(i) \cdot \text{pass}_{\mathrm{MP}}(i) \cdot \text{pass}_{\mathrm{NEU}}(i)$.
*   **Outputs:**
    *   The final DataFrame of selected personas (`persona_final_df`, with exactly 2,368 rows).
    *   A summary report.
*   **Role in Research Pipeline:** This is the **fourth and final filtering stage**, corresponding to **Section 3.1, step (4) "Zero-shot relevance rating"**. It is the most sophisticated filter, using an LLM's reasoning capabilities to select the highest-quality personas.
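
The majority-vote and final-selection rules reduce to a few lines. In this sketch each run is assumed to be already parsed into a dict of booleans; the criterion keys are hypothetical names, not the field names of the actual JSON response.

```python
# Hypothetical keys for the three criteria (EU-centrality, monetary
# policy depth, neutrality).
CRITERIA = ("eu_centrality", "monetary_policy_depth", "neutrality")


def keep_persona(runs):
    """Return True if every criterion passed in at least 2 of the 3 runs."""
    return all(
        sum(bool(run[criterion]) for run in runs) >= 2
        for criterion in CRITERIA
    )
```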



#### **Task 9: `validate_llm_judge_reliability`**

*   **Inputs:** The `persona_step3_df`, the full set of LLM judgments, and a DataFrame of human annotations for a sample of 50 personas.
*   **Processes:**
    1.  Aligns the judgments from two human annotators and the LLM's majority-vote decision for the 50 sampled personas.
    2.  For each of the three criteria, it computes Cohen's kappa for three pairs of raters: (Human1, Human2), (Human1, LLM), and (Human2, LLM).
    3.  Implements the kappa formula: $\kappa = \frac{p_o - p_e}{1 - p_e}$.
    4.  Compares the computed kappa scores to the "substantial agreement" range ([0.61, 0.81]) reported in the paper.
*   **Outputs:**
    *   A detailed report DataFrame containing the 9 kappa scores, their interpretation, and confusion matrices.
    *   A high-level summary of the validation outcome.
*   **Role in Research Pipeline:** This is a **methodological validation step** that corresponds to the validation process described at the end of **Section 3.1, step (4)**. It provides statistical evidence that the LLM-as-judge filter is reliable and aligns with human judgment.
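
For reference, the kappa formula can be written out directly. This plain-Python sketch is for illustration only and is not a claim about how the pipeline computes the score. It assumes two equal-length label sequences and is undefined when $p_e = 1$ (both raters always give the same single label).

```python
from collections import Counter


def cohen_kappa(rater_a, rater_b):
    """Cohen's kappa for two raters labelling the same items."""
    n = len(rater_a)
    # p_o: observed share of items on which the raters agree.
    p_o = sum(a == b for a, b in zip(rater_a, rater_b)) / n
    # p_e: agreement expected by chance from the marginal label frequencies.
    freq_a, freq_b = Counter(rater_a), Counter(rater_b)
    p_e = sum(freq_a[label] * freq_b[label] for label in freq_a) / (n * n)
    return (p_o - p_e) / (1 - p_e)
```

For example, with labels `[1, 1, 0, 0]` and `[1, 0, 0, 0]` the raters agree on 3 of 4 items ($p_o = 0.75$), chance agreement is $p_e = 0.5$, and $\kappa = 0.5$.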



#### **Task 10: `assemble_forecasting_prompts`**

*   **Inputs:** The `persona_final_df` (2,368 personas) and the `clean_contextual_data_df` (50 rounds).
*   **Processes:**
    1.  Iterates through each of the 50 survey rounds.
    2.  For each round, it prepares a dictionary of all contextual data placeholders by extracting and formatting data from `contextual_data_df`.
    3.  It then generates two sets of prompts for that round:
        *   **Persona Arm:** It loops through all 2,368 personas and creates a prompt for each one, injecting the persona's description into the persona-specific system prompt template.
        *   **No-Persona Arm:** It loops 100 times and creates a baseline prompt for each run, using the no-persona system prompt template.
    4.  It yields each fully formed prompt as a structured object.
*   **Outputs:**
    *   A generator that yields a total of 123,400 prompt objects (118,400 for the persona arm + 5,000 for the baseline arm).
*   **Role in Research Pipeline:** This function implements the **prompt engineering** stage described in **Section 3.3 ("Prompt architecture")**. It is responsible for creating the exact text that will be sent to the forecasting LLM.



#### **Task 11: `generate_persona_forecasts`**

*   **Inputs:** The generator of prompts from Task 10, filtered for the 'persona' arm.
*   **Processes:**
    1.  Orchestrates the execution of 118,400 asynchronous API calls to the `gpt-4o-2024-11-20` model.
    2.  Manages concurrency, rate limiting, and retries.
    3.  Saves the raw JSON response for each call to a checkpoint file for resumability.
    4.  Parses the JSON responses, validates their structure (16 forecast items), and converts them into a tidy DataFrame.
*   **Outputs:**
    *   A DataFrame (`persona_forecasts_df`) containing all 1,894,400 individual forecast points (118,400 completions * 16 points).
    *   A summary report.
*   **Role in Research Pipeline:** This function executes the **main experimental run**, generating the AI forecasts for the persona arm as described in **Section 1** and the overall experimental design.



#### **Task 12: `generate_baseline_forecasts`**

*   **Inputs:** The generator of prompts from Task 10, filtered for the 'baseline' arm.
*   **Processes:**
    1.  Orchestrates the execution of 5,000 asynchronous API calls.
    2.  Follows the exact same process as Task 11 (concurrency, checkpointing, parsing).
    3.  Includes an additional validation step to check for stochastic variation in the outputs, ensuring the `temperature=1.0` setting was effective.
*   **Outputs:**
    *   A DataFrame (`baseline_forecasts_df`) containing all 80,000 individual baseline forecast points.
    *   A summary report.
*   **Role in Research Pipeline:** This function executes the **control group run** for the ablation study, as described in the paper's experimental design.



#### **Task 13: `parse_and_qc_all_forecasts`**

*   **Inputs:** The `persona_forecasts_df` and `baseline_forecasts_df`.
*   **Processes:**
    1.  **Consolidation:** Combines the two DataFrames into a single, unified DataFrame, adding a `source_type` column ('persona' or 'baseline') and renaming the ID columns to a generic `source_id`.
    2.  **Final QC:** Performs a final validation pass, ensuring all `value`s are valid floats and adding a `qc_pass` flag.
    3.  **Outlier Flagging:** Applies sanity bounds to the `value`s based on their `variable` and adds a boolean `is_outlier` flag, but does not filter these rows.
*   **Outputs:**
    *   A single, clean, unified DataFrame (`unified_forecasts_df`) containing all valid forecast points from both arms.
    *   A summary report.
*   **Role in Research Pipeline:** This is the **final data preparation step** before analysis begins, ensuring a clean, consistent, and unified dataset.



#### **Task 14: `compute_ai_panel_medians`**

*   **Inputs:** The `unified_forecasts_df`.
*   **Processes:**
    1.  For the 'persona' arm, it groups by `(round, variable, horizon)` and computes the median forecast. Implements: $\hat{y}_{rvh}^{\mathrm{AI}} = \mathrm{median}_{p \in P^*} \left( \hat{y}^{\mathrm{AI}}_{p, r, v, h} \right)$.
    2.  For the 'baseline' arm, it does the same. Implements: $\hat{y}_{rvh}^{\mathrm{AI,NP}} = \mathrm{median}_{s \in \{1, \ldots, 100\}} \left( \hat{y}^{\mathrm{AI,NP}}_{s, r, v, h} \right)$.
    3.  It merges these two sets of results into a single DataFrame.
*   **Outputs:**
    *   A DataFrame (`ai_panel_medians_df`) with the consensus (median) forecast for both arms for every forecast instance.
*   **Role in Research Pipeline:** This function performs the **cross-sectional aggregation** described in **Section 3.4 ("Scoring metrics")**. It transforms the panel of individual AI forecasts into the single time series that will be compared against the human panel.
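
The aggregation amounts to a grouped median. The sketch below assumes forecasts are available as dicts with `round`, `variable`, `horizon`, and `value` keys for a single arm; the function name is hypothetical.

```python
from collections import defaultdict
from statistics import median


def panel_medians(rows):
    """Median forecast per (round, variable, horizon) for one arm."""
    groups = defaultdict(list)
    for row in rows:
        key = (row["round"], row["variable"], row["horizon"])
        groups[key].append(row["value"])
    return {key: median(values) for key, values in groups.items()}
```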



#### **Task 15: `align_forecasts_for_scoring`**

*   **Inputs:** The `ai_panel_medians_df`, `human_benchmark_df`, `realized_outcomes_df`, and `contextual_data_df`.
*   **Processes:**
    1.  Computes the `target_year` for every AI forecast.
    2.  Merges the AI medians with the human medians on `(round, variable, horizon)`.
    3.  Merges the result with the realized outcomes on `(target_year, variable)`.
    4.  Applies the OOS filtering rules.
*   **Outputs:**
    *   The master analysis DataFrame (`aligned_df`) containing all data needed for scoring.
*   **Role in Research Pipeline:** This is the **final data alignment step**, creating the master table upon which all subsequent scoring and analysis (Tasks 18-25) will be performed.



#### **Task 16: `compute_ai_persona_dispersion`**

*   **Inputs:** The `unified_forecasts_df`.
*   **Processes:**
    1.  Groups the 'persona' forecasts by `(round, variable, horizon)`.
    2.  For each group, calculates the IQR and the population standard deviation. Implements: $\mathrm{IQR}_{rvh}^{\mathrm{AI}}$ and $\mathrm{SD}_{rvh}^{\mathrm{AI}} = \sqrt{ \frac{1}{K} \sum \dots }$.
    3.  Summarizes these per-round metrics by taking the median across all rounds for each `(variable, horizon)`.
*   **Outputs:**
    *   A summary DataFrame (`ai_dispersion_summary_df`) with the median dispersion metrics.
*   **Role in Research Pipeline:** This function implements the **panel disagreement analysis** for the AI panel, as described in **Section 3.4** and presented in **Table 3**.
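
Both dispersion measures for one `(round, variable, horizon)` group can be sketched with the standard library. The quartile convention is an assumption: the sketch uses linear interpolation between data points (`method="inclusive"`), and a different convention would give slightly different IQR values. The function name is hypothetical.

```python
from statistics import pstdev, quantiles


def dispersion(values):
    """Return (IQR, population SD) for one group of forecasts."""
    # Quartiles by linear interpolation over the observed values.
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    # pstdev divides by K (population SD), not K - 1.
    return q3 - q1, pstdev(values)
```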



#### **Task 17: `compute_human_dispersion_and_compare`**

*   **Inputs:** The `human_survey_micro_df` and the `ai_dispersion_summary_df`.
*   **Processes:**
    1.  Performs the exact same dispersion analysis as Task 16, but on the human micro-data. Implements: $\mathrm{IQR}_{rvh}^{\mathrm{H}}$ and $\mathrm{SD}_{rvh}^{\mathrm{H}} = \sqrt{ \frac{1}{M} \sum \dots }$.
    2.  Merges the resulting human dispersion summary with the AI summary.
*   **Outputs:**
    *   A final comparison DataFrame (`dispersion_comparison_df`) that replicates **Table 3** from the paper.
*   **Role in Research Pipeline:** This function completes the **panel disagreement analysis** by providing the human benchmark and creating the final comparison table.



#### **Task 18: `compute_absolute_errors`**

*   **Inputs:** The `aligned_df`.
*   **Processes:**
    1.  Filters the data to scoreable rows (where `realized_value` is not null).
    2.  Calculates the absolute error for the AI panel. Implements: $e_{rvh}^{\mathrm{AI}} = \left| \hat{y}_{rvh}^{\mathrm{AI}} - y_{rvh} \right|$.
    3.  Calculates the absolute error for the human panel. Implements: $e_{rvh}^{\mathrm{H}} = \left| \hat{y}_{rvh}^{\mathrm{SPF}} - y_{rvh} \right|$.
*   **Outputs:**
    *   A DataFrame (`scored_forecasts_df`) containing the original data plus the two new absolute error columns.
*   **Role in Research Pipeline:** This function implements the **error calculation** step defined in **Section 3.4**.



#### **Task 19: `compute_mae_results`**

*   **Inputs:** The `scored_forecasts_df`.
*   **Processes:**
    1.  Groups the data by `(period, variable, horizon)`.
    2.  For each group, calculates the mean of the absolute errors for both the AI and human panels. Implements: $\mathrm{MAE}_{vh} = \frac{1}{n_{vh}} \sum_{r=1}^{n_{vh}} e_{rvh}$.
*   **Outputs:**
    *   A summary DataFrame (`mae_results_df`) that replicates **Table 4** from the paper.
*   **Role in Research Pipeline:** This function implements the **point accuracy analysis** using Mean Absolute Error, as defined in **Section 3.4**.
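
The absolute-error and MAE definitions combine into a short sketch. It assumes rows are dicts carrying `period`, `variable`, `horizon`, `realized_value`, and a forecast column whose name is passed in; the function name and the forecast column names are hypothetical.

```python
from collections import defaultdict


def mae_by_group(rows, forecast_key):
    """Mean absolute error per (period, variable, horizon)."""
    errors = defaultdict(list)
    for row in rows:
        # Only rows with a realized outcome can be scored.
        if row["realized_value"] is None:
            continue
        key = (row["period"], row["variable"], row["horizon"])
        errors[key].append(abs(row[forecast_key] - row["realized_value"]))
    return {key: sum(errs) / len(errs) for key, errs in errors.items()}
```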



#### **Task 20: `construct_win_share_statistics`**

*   **Inputs:** The `scored_forecasts_df`.
*   **Processes:**
    1.  For each match, determines the winner by comparing absolute errors. Implements: $\mathrm{win}_{rvh} = \mathbf{1}\{ e_{rvh}^{\mathrm{AI}} < e_{rvh}^{\mathrm{H}} \}$. Ties (equal absolute errors) are counted separately and credited to neither side.
    2.  Groups by `(period, variable, horizon)` and aggregates the counts of AI wins, human wins, and ties.
    3.  Calculates the win-share, excluding ties from the denominator. Implements: $w_{vh} = W_{vh} / n_{vh}$, where $n_{vh} = W_{vh} + \text{human\_wins}$.
*   **Outputs:**
    *   A summary DataFrame (`win_share_df`) containing the statistics needed for hypothesis testing and replicating **Table 5**.
*   **Role in Research Pipeline:** This function implements the **relative performance (win-share) analysis** defined in **Section 3.4**.
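
The win-share computation for a single `(period, variable, horizon)` group can be sketched as follows, assuming two aligned lists of absolute errors. The function name and the keys of the returned dict are hypothetical.

```python
def win_share(ai_errors, human_errors):
    """Count wins and ties, and compute the AI win-share excluding ties."""
    pairs = list(zip(ai_errors, human_errors))
    ai_wins = sum(a < h for a, h in pairs)
    human_wins = sum(a > h for a, h in pairs)
    ties = sum(a == h for a, h in pairs)
    n = ai_wins + human_wins  # ties are excluded from the denominator
    share = ai_wins / n if n else float("nan")
    return {"ai_wins": ai_wins, "human_wins": human_wins,
            "ties": ties, "n": n, "win_share": share}
```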



#### **Task 21: `run_in_sample_mc_tests`**

*   **Inputs:** The `win_share_df`.
*   **Processes:**
    1.  Filters for in-sample data.
    2.  For each group, it runs a Monte Carlo simulation by drawing 10,000 samples from the null distribution, $W_{vh} \sim \mathrm{Binom}(n_{vh}, 0.5)$.
    3.  It calculates the empirical one-tailed and two-tailed p-values based on these simulations. Implements: $p_{vh}^{(1)} = \frac{1}{N} \sum \mathbf{1}\{ W_j^* \geq W_{vh} \}$ and $p_{vh}^{(2)} = 2 \min(\dots)$.
*   **Outputs:**
    *   The `win_share_df` updated with p-values and significance stars for the in-sample rows.
*   **Role in Research Pipeline:** This function implements the **hypothesis testing for the in-sample period**, as described in **Section 3.4, equations (1) and (2)**.
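
A minimal sketch of the Monte Carlo test is given below. The two-tailed formula is elided above, so the completion used here, twice the smaller of the two empirical tail probabilities capped at 1, is an assumption. The function name and the `seed` parameter are hypothetical.

```python
import random


def mc_binomial_pvalues(wins, n, draws=10_000, seed=0):
    """Empirical one- and two-tailed p-values under W ~ Binom(n, 0.5)."""
    rng = random.Random(seed)
    upper = 0  # simulated draws with W* >= observed wins
    lower = 0  # simulated draws with W* <= observed wins
    for _ in range(draws):
        # One Binom(n, 0.5) draw as a sum of n fair coin flips.
        simulated = sum(rng.random() < 0.5 for _ in range(n))
        upper += simulated >= wins
        lower += simulated <= wins
    p_one = upper / draws
    # Assumed two-tailed rule: 2 * min(upper tail, lower tail), capped at 1.
    p_two = min(1.0, 2 * min(upper, lower) / draws)
    return p_one, p_two
```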



#### **Task 22: `run_out_of_sample_exact_tests`**

*   **Inputs:** The `win_share_df` (already containing in-sample results).
*   **Processes:**
    1.  Filters for out-of-sample data.
    2.  For each group, it calculates the p-values directly from the Binomial probability mass function. Implements: $p_{vh}^{(1)} = \Pr\{ W \geq W_{vh} \}$ and $p_{vh}^{(2)} = 2 \min(\dots)$, where $W \sim \mathrm{Binom}(n_{vh}, 0.5)$.
*   **Outputs:**
    *   The `win_share_df` now fully populated with p-values and significance stars for all rows.
*   **Role in Research Pipeline:** This function implements the **hypothesis testing for the out-of-sample period**, as described in **Section 3.4, equations (3) and (4)**.
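
The exact test can be sketched with `math.comb` alone. As with the Monte Carlo variant, the two-tailed formula is elided above, so doubling the smaller tail and capping at 1 is an assumption. The function name is hypothetical.

```python
from math import comb


def exact_binomial_pvalues(wins, n):
    """Exact one- and two-tailed p-values under W ~ Binom(n, 0.5)."""
    # Probability mass function of Binom(n, 0.5) for k = 0..n.
    pmf = [comb(n, k) / 2 ** n for k in range(n + 1)]
    upper = sum(pmf[wins:])        # Pr{W >= wins}
    lower = sum(pmf[:wins + 1])    # Pr{W <= wins}
    # Assumed two-tailed rule: 2 * min(upper tail, lower tail), capped at 1.
    return upper, min(1.0, 2 * min(upper, lower))
```

For example, 4 wins out of 4 non-tied matches gives a one-tailed p-value of $1/16 = 0.0625$.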



#### **Task 23: `run_full_replication_pipeline`**

*   **Inputs:** All raw data paths and the config.
*   **Processes:** Orchestrates the execution of all the above tasks (1-22) in the correct sequence, managing the flow of data and artifacts.
*   **Outputs:** A comprehensive dictionary of all results and reports, and saves all final tables to disk.
*   **Role in Research Pipeline:** This is the **master orchestrator** for the entire replication study.



#### **Task 24: `run_ablation_paired_ttest`**

*   **Inputs:** The `aligned_df`.
*   **Processes:**
    1.  Prepares a paired dataset of absolute errors for the persona and no-persona arms.
    2.  Performs a paired t-test on these errors to test if the mean difference is zero. Implements the standard paired t-test formula: $t = \frac{\bar{d}}{s_d / \sqrt{J}}$.
*   **Outputs:**
    *   A report DataFrame with the t-statistic, p-value, and interpretation.
*   **Role in Research Pipeline:** This is the **first statistical test of the ablation study**, as described in **Section 4.1 ("Persona ablation effect")**.
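
The test statistic can be written out directly from the formula, with $s_d$ taken as the sample standard deviation of the paired differences. The sketch computes only the statistic, not the p-value, and is undefined when all differences are identical ($s_d = 0$). The function name is hypothetical.

```python
from math import sqrt
from statistics import mean, stdev


def paired_t_statistic(errors_persona, errors_baseline):
    """t = mean(d) / (s_d / sqrt(J)) for paired differences d."""
    diffs = [p - b for p, b in zip(errors_persona, errors_baseline)]
    # stdev uses the J - 1 denominator (sample standard deviation).
    return mean(diffs) / (stdev(diffs) / sqrt(len(diffs)))
```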



#### **Task 25: `run_ablation_ks_test`**

*   **Inputs:** The `aligned_df`.
*   **Processes:**
    1.  Prepares two samples: the distribution of absolute errors for the persona arm and for the no-persona arm.
    2.  Performs a two-sample Kolmogorov-Smirnov test to check if the two samples are drawn from the same distribution. Implements: $D = \sup_{x} \left| \hat{F}_{\mathrm{P}}(x) - \hat{F}_{\mathrm{NP}}(x) \right|$.
*   **Outputs:**
    *   A report DataFrame with the D-statistic, p-value, and interpretation.
*   **Role in Research Pipeline:** This is the **second statistical test of the ablation study**, providing converging evidence for the findings in **Section 4.1**.
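
The D-statistic is the largest gap between the two empirical CDFs, which only needs to be checked at the observed data points. The sketch below computes the statistic only, not the p-value; the function name is hypothetical.

```python
from bisect import bisect_right


def ks_statistic(sample_p, sample_np):
    """Two-sample Kolmogorov-Smirnov D statistic."""
    xs, ys = sorted(sample_p), sorted(sample_np)
    d = 0.0
    # The supremum is attained at one of the observed values.
    for x in xs + ys:
        f_p = bisect_right(xs, x) / len(xs)    # empirical CDF, persona arm
        f_np = bisect_right(ys, x) / len(ys)   # empirical CDF, no-persona arm
        d = max(d, abs(f_p - f_np))
    return d
```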



#### **Top-Level Orchestrator: `run_synthetic_economist_study`**

*   **Inputs:** All raw data paths and the config.
*   **Processes:** A top-level wrapper that first calls `run_full_replication_pipeline` (Tasks 1-22) and then, using the artifacts from that run, calls `run_ablation_paired_ttest` (Task 24) and `run_ablation_ks_test` (Task 25).
*   **Outputs:** A final, comprehensive dictionary containing all results from the entire study.
*   **Role in Research Pipeline:** This is the **single entry point** to execute the entire study, including the main replication and the final ablation tests.

<br><br>

### **Usage Example**

Below is an end-to-end example of how to run the main orchestrator function, `run_synthetic_economist_study`. The example consists of three main parts: *setting up the environment, generating realistic synthetic input data, and then executing the main function call*. It assumes that all orchestrator functions documented above are already in scope:


```python
# ==============================================================================
# Task: High-Fidelity Example of End-to-End Pipeline Execution
# ==============================================================================
# This script provides a complete, runnable example of how to use the
# `run_synthetic_economist_study` function. It includes three main parts:
# 1. Environment Setup: Prepares directories, loads configuration, and sets up logging.
# 2. Synthetic Data Generation: Creates high-fidelity, realistic mock data files
#    that conform to the required schemas and business rules.
# 3. Pipeline Execution: Calls the top-level orchestrator with the prepared
#    data and configuration to run the full study.
#
# NOTE: This example makes REAL API calls to OpenAI and will incur costs.
# Ensure your API key is correctly configured.

# ------------------------------------------------------------------------------
# Step 1: Environment Setup
# ------------------------------------------------------------------------------

# --- Import all necessary libraries and functions ---
# (This assumes all previously defined orchestrator functions are in scope)
import asyncio
import hashlib
import json
import logging
import os
import random
import re
import unicodedata
from collections import Counter
from pathlib import Path
from typing import Any, Coroutine, Dict, Generator, List, Optional, Set, Tuple, Union

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import yaml
from faker import Faker
from scipy.stats import binom, ks_2samp, ttest_rel

# --- Configure Logging ---
# Set up a basic logger for clear output.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True # Override any existing handlers
)
logger = logging.getLogger(__name__)

# --- Create Directory Structure ---
# Define a root directory for this example run.
run_dir = Path("./synthetic_economist_run")
# Define subdirectories for raw data and all pipeline outputs.
raw_data_dir = run_dir / "raw_data"
output_dir = run_dir / "output"
# Create the directories. `exist_ok=True` prevents errors if they already exist.
raw_data_dir.mkdir(parents=True, exist_ok=True)
output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Created directory structure in '{run_dir.resolve()}'.")

# --- Load Configuration ---
# The config.yaml file should be in the same directory as this script.
config_path = Path("config.yaml")
if not config_path.exists():
    raise FileNotFoundError("config.yaml not found. Please create it from the provided template.")

with open(config_path, 'r') as f:
    config = yaml.safe_load(f)
logger.info("Successfully loaded configuration from config.yaml.")

# --- API Key Configuration ---
# IMPORTANT: Set your OpenAI API key as an environment variable.
# For example, in your terminal: export OPENAI_API_KEY='your-key-here'
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise ValueError("OPENAI_API_KEY environment variable not set. The pipeline cannot run.")
# Add the key to the config for easy access by the API functions.
config['openai_api_key'] = api_key

# ------------------------------------------------------------------------------
# Step 2: High-Fidelity Synthetic Data Generation
# ------------------------------------------------------------------------------
# We will now generate realistic mock data files.

# Initialize Faker for generating random text.
fake = Faker()
Faker.seed(42)
random.seed(42)
np.random.seed(42)

def generate_synthetic_persona_hub(path: Path, num_rows: int, config: Dict[str, Any]):
    """Generates a small, representative mock of the PersonaHub dataset."""
    logger.info(f"Generating {num_rows} synthetic persona records...")
    lexicon = config['phase_1_parameters']['filtering']['keyword_filtering']['lexicon']
    data = []
    for i in range(num_rows):
        # Create a mix of relevant and irrelevant descriptions.
        if i % 5 == 0: # 20% of descriptions will be highly relevant
            desc_core = ' '.join(random.sample(lexicon, k=random.randint(2, 5)))
            desc = f"A leading economist focused on {desc_core}. {fake.paragraph(nb_sentences=2)}"
            tags = ['economics', 'finance']
        elif i % 5 == 1: # 20% will mention a person to test the NER filter
            desc = f"An analyst working with {fake.name()} on European trade policy. {fake.paragraph(nb_sentences=2)}"
            tags = ['policy']
        else: # 60% are irrelevant
            desc = fake.paragraph(nb_sentences=3)
            tags = ['general', fake.word()]
            
        record = {
            'id': f'persona_{i:05d}',
            'description': desc,
            'domain_tags': tags,
            'language_code': 'en',
            'token_count': len(desc.split()),
            'sha256_description_hash': hashlib.sha256(desc.encode('utf-8')).hexdigest(),
            'shard_id': i % 10
        }
        data.append(record)
    
    df = pd.DataFrame(data)
    df.to_parquet(path, engine='pyarrow')
    logger.info(f"Synthetic persona hub saved to '{path}'.")
    return num_rows

def generate_synthetic_contextual_data(path: Path, config: Dict[str, Any]):
    """Generates a mock contextual_data.csv file for all 50 rounds."""
    logger.info("Generating synthetic contextual data...")
    rounds = pd.period_range(start="2013Q1", end="2025Q2", freq='Q').strftime('%YQ%q')
    data = []
    hicpx_start = config['phase_2_parameters']['oos_and_availability']['hicpx_available_from_round']
    
    for r in rounds:
        year = int(r[:4])
        quarter = int(r[-1])
        survey_date = pd.to_datetime(f'{year}-{(quarter-1)*3+1}-15')
        
        # Generate plausible nested data
        realized = {
            'hicp': {'period': f'{year-1}Dec', 'value': round(random.uniform(0.5, 3.0), 2)},
            'hicpx': {'period': f'{year-1}Dec', 'value': round(random.uniform(0.5, 2.5), 2)} if r >= hicpx_start else {'period': None, 'value': np.nan},
            'unr': {'period': f'{year-1}Nov', 'value': round(random.uniform(6.0, 12.0), 2)},
            'rgdp': {'period': f'{year-1}Q4', 'value': round(random.uniform(-1.0, 2.0), 2)}
        }
        
        lt_year = year + 4
        medians = {
            'hicp': {y: round(random.uniform(1.5, 2.2), 2) for y in [year, year+1, year+2, lt_year]},
            'hicpx': {y: round(random.uniform(1.2, 2.0), 2) for y in [year, year+1, year+2, lt_year]} if r >= hicpx_start else {},
            'unr': {y: round(random.uniform(7.0, 10.0), 2) for y in [year, year+1, year+2, lt_year]},
            'rgdp': {y: round(random.uniform(0.5, 2.5), 2) for y in [year, year+1, year+2, lt_year]}
        }
        
        record = {
            'survey_round': r,
            'survey_date': survey_date.strftime('%d/%m/%Y'),
            'ecb_meeting_date': (survey_date - pd.Timedelta(days=7)).strftime('%d/%m/%Y'),
            'ecb_communication_text': f"ECB statement for {r}. {fake.paragraph(nb_sentences=10)}",
            'ecb_communication_token_count': 250,
            'ecb_communication_language_code': 'en',
            'latest_realized_data': json.dumps(realized),
            'previous_spf_medians': json.dumps(medians),
            'lt_year': lt_year,
            'variable_horizon_availability': json.dumps({'hicp': True, 'hicpx': r >= hicpx_start, 'unr': True, 'rgdp': True}),
            'data_vintage_timestamp': pd.Timestamp.now().isoformat()
        }
        data.append(record)
        
    df = pd.DataFrame(data)
    df.to_csv(path, index=False)
    logger.info(f"Synthetic contextual data saved to '{path}'.")

def generate_synthetic_human_data(contextual_path: Path, micro_path: Path, benchmark_path: Path):
    """Generates mock human micro and benchmark forecast data."""
    logger.info("Generating synthetic human forecast data...")
    context_df = pd.read_csv(contextual_path)
    variables = ['HICP', 'HICPX', 'RGDP', 'UNR']
    horizons = ['CY', 'CY+1', 'CY+2', 'LT']
    forecaster_ids = [f'human_{i:02d}' for i in range(10)]
    
    micro_data = []
    for _, row in context_df.iterrows():
        for var in variables:
            for hor in horizons:
                base_val = random.uniform(0, 10)
                for fid in forecaster_ids:
                    micro_data.append({
                        'round': row['survey_round'],
                        'variable': var,
                        'horizon': hor,
                        'forecaster_id': fid,
                        'value': round(base_val + random.normalvariate(0, 0.2), 2),
                        'target_year': 0 # Placeholder, will be computed by pipeline
                    })
    
    micro_df = pd.DataFrame(micro_data)
    # Create benchmark by taking the median
    benchmark_df = micro_df.groupby(['round', 'variable', 'horizon'])['value'].median().reset_index()
    benchmark_df['panel_size'] = 10
    benchmark_df['target_year'] = 0 # Placeholder
    
    micro_df.to_csv(micro_path, index=False)
    benchmark_df.to_csv(benchmark_path, index=False)
    logger.info("Synthetic human micro and benchmark data saved.")

def generate_synthetic_realized_data(path: Path):
    """Generates mock realized outcomes data."""
    logger.info("Generating synthetic realized outcomes...")
    years = range(2013, 2028)
    variables = ['HICP', 'HICPX', 'RGDP', 'UNR']
    methods = {
        'HICP': 'simple average of 12 monthly YoY rates',
        'HICPX': 'simple average of 12 monthly YoY rates',
        'RGDP': 'annual growth percent',
        'UNR': 'simple average of 12 monthly rates'
    }
    data = []
    for year in years:
        for var in variables:
            data.append({
                'reference_year': year,
                'variable': var,
                'value': round(random.uniform(0, 10), 2),
                'source_series_id': f'FAKE_{var}_{year}',
                'first_release_date': pd.to_datetime(f'{year+1}-01-25').isoformat(),
                'retrieval_date': pd.Timestamp.now().isoformat(),
                'revision_status': 'first',
                'aggregation_method': methods[var]
            })
    df = pd.DataFrame(data)
    df.to_csv(path, index=False)
    logger.info(f"Synthetic realized outcomes saved to '{path}'.")

# --- Generate all synthetic data files ---
data_paths = {
    'persona_hub': str(raw_data_dir / "persona_hub_raw.parquet"),
    'contextual_data': str(raw_data_dir / "contextual_data.csv"),
    'human_benchmark': str(raw_data_dir / "human_benchmark.csv"),
    'human_micro': str(raw_data_dir / "human_micro.csv"),
    'realized_outcomes': str(raw_data_dir / "realized_outcomes.csv"),
    'human_annotations': str(raw_data_dir / "human_annotations.csv") # Dummy file written below for the kappa validation step
}

# For the large persona hub, we generate a small but representative sample.
TOTAL_SYNTHETIC_PERSONAS = 10000
generate_synthetic_persona_hub(data_paths['persona_hub'], TOTAL_SYNTHETIC_PERSONAS, config)
generate_synthetic_contextual_data(data_paths['contextual_data'], config)
generate_synthetic_human_data(data_paths['contextual_data'], data_paths['human_micro'], data_paths['human_benchmark'])
generate_synthetic_realized_data(data_paths['realized_outcomes'])
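# Create a dummy human annotations file for the kappa validation step.
# Each persona is rated by both annotators, with mixed judgments, so that an
# inter-annotator agreement statistic such as Cohen's kappa is well defined.
annotated_ids = [f'persona_{i:05d}' for i in range(25)]
pd.DataFrame({
    'persona_id': annotated_ids * 2,
    'annotator_id': ['A1']*25 + ['A2']*25,
    'criterion': ['eu_centrality']*50,
    'judgment': [i % 2 == 0 for i in range(25)] + [i % 3 != 0 for i in range(25)]
}).to_csv(data_paths['human_annotations'], index=False)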


# ------------------------------------------------------------------------------
# Step 3: Pipeline Execution
# ------------------------------------------------------------------------------
logger.info("="*80)
logger.info("Starting the full end-to-end pipeline execution with synthetic data.")
logger.info("="*80)

# Call the top-level orchestrator function with the prepared paths and config.
# This will run the entire study from start to finish.
final_artifacts = run_synthetic_economist_study(
    data_paths=data_paths,
    config=config,
    output_dir=str(output_dir),
    total_persona_rows=TOTAL_SYNTHETIC_PERSONAS,
    run_kappa_validation=True
)

logger.info("="*80)
logger.info("Pipeline execution finished.")
logger.info("="*80)

# --- Inspect the final results ---
logger.info("Final reports and artifacts have been generated in the output directory.")
logger.info("Displaying the final ablation study reports:")

# Display the t-test report
print("\n--- Ablation Paired T-Test Report ---")
print(final_artifacts['ablation_ttest_report'].to_markdown(index=False))

# Display the KS-test report
print("\n--- Ablation Kolmogorov-Smirnov Test Report ---")
print(final_artifacts['ablation_ks_test_report'].to_markdown(index=False))

# Display the main reports summary
print("\n--- Full Pipeline Report Summary ---")
with open(output_dir / "results" / "full_pipeline_report.json", 'r') as f:
    full_report = json.load(f)
print(f"Overall Status: {full_report.get('pipeline_status')}")
if full_report.get('pipeline_status') == 'FAILURE':
    print(f"Failure Reason: {full_report.get('failure_reason')}")

```
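
The synthetic generator above serializes the nested dictionaries with `json.dumps` before writing the CSV. JSON object keys are always strings, so the integer year keys of `previous_spf_medians` come back as strings on load, while the Task 2 validator checks for integer keys. The sketch below shows one way to restore them. It assumes a hypothetical helper, `restore_year_keys`, which is not part of the repository; the actual pipeline may parse this column differently.

```python
import json
from typing import Any, Dict


def restore_year_keys(serialized: str) -> Dict[str, Dict[int, Any]]:
    """Parse a JSON string and convert the inner year keys back to int.

    Hypothetical helper for illustration only.
    """
    parsed = json.loads(serialized)
    return {
        var: {int(year): value for year, value in by_year.items()}
        for var, by_year in parsed.items()
    }


# Same shape as one 'previous_spf_medians' cell written by the generator.
medians = {"hicp": {2013: 1.8, 2014: 1.9}, "hicpx": {}}
cell = json.dumps(medians)

# A plain json.loads returns string keys for the years.
assert list(json.loads(cell)["hicp"].keys()) == ["2013", "2014"]

# The helper restores the integer years expected by the validator.
restored = restore_year_keys(cell)
print(restored)
```

An empty dictionary, as written for `hicpx` before its availability round, passes through unchanged.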

In [None]:
# Task 1 — Validate persona_hub_raw_df schema and quality

# ==============================================================================
# Task 1: Validate persona_hub_raw_df schema and quality
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 1, Step 1 Helper: Schema and dtype enforcement
# ------------------------------------------------------------------------------
def _validate_chunk_schema(
    chunk: pd.DataFrame,
    expected_schema: Dict[str, Any],
    chunk_number: int
) -> List[str]:
    """
    Validates the schema and dtypes of a single DataFrame chunk.

    Args:
        chunk: The pandas DataFrame chunk to validate.
        expected_schema: A dictionary defining expected column names and their
                         corresponding pandas dtypes or type-checking functions.
        chunk_number: The sequential number of the chunk for error reporting.

    Returns:
        A list of error messages. An empty list indicates success.
    """
    # Initialize a list to collect validation error messages for this chunk.
    errors = []

    # --- Column Presence Check ---
    # Get the set of columns present in the current chunk.
    actual_columns = set(chunk.columns)
    # Get the set of expected column names from the schema definition.
    expected_columns = set(expected_schema.keys())

    # Check for any columns that are expected but missing in the chunk.
    missing_columns = expected_columns - actual_columns
    # If there are missing columns, format an error message.
    if missing_columns:
        errors.append(
            f"[Chunk {chunk_number}] Missing expected columns: {sorted(list(missing_columns))}"
        )

    # Check for any columns that are present in the chunk but not expected.
    extra_columns = actual_columns - expected_columns
    # If there are extra columns, format an error message.
    if extra_columns:
        errors.append(
            f"[Chunk {chunk_number}] Found unexpected columns: {sorted(list(extra_columns))}"
        )

    # If there are any column presence/absence errors, return immediately
    # as subsequent dtype checks on missing/extra columns would fail.
    if errors:
        return errors

    # --- Dtype Enforcement Check ---
    # Iterate through each expected column and its specified dtype to validate.
    for col, expected_dtype in expected_schema.items():
        # Get the actual dtype of the column in the chunk.
        actual_dtype = chunk[col].dtype
        # Check if the actual dtype matches the expected dtype.
        if str(actual_dtype) != str(expected_dtype):
            errors.append(
                f"[Chunk {chunk_number}] Column '{col}' has incorrect dtype. "
                f"Expected: {expected_dtype}, Actual: {actual_dtype}"
            )

    # Return the list of all collected error messages.
    return errors

# ------------------------------------------------------------------------------
# Task 1, Step 2 Helper: Content and distribution sanity checks
# ------------------------------------------------------------------------------
def _update_content_stats(
    chunk: pd.DataFrame,
    stats: Dict[str, Any],
    config: Dict[str, Any]
) -> None:
    """
    Updates running statistics for content validation from a single chunk.

    Args:
        chunk: The pandas DataFrame chunk to process.
        stats: A dictionary holding the accumulated statistics across all chunks.
        config: The study configuration dictionary.
    """
    # --- Description Null Check ---
    # Count the number of null values in the 'description' column for this chunk.
    stats['null_description_count'] += chunk['description'].isna().sum()

    # --- Token Count Distribution ---
    # Count records where 'token_count' is less than 1.
    stats['token_count_lt_1'] += (chunk['token_count'] < 1).sum()

    # --- Language Code Validation ---
    # Get the list of allowed language codes from the configuration.
    language_allowlist = set(
        config["phase_1_parameters"]["filtering"]["keyword_filtering"]["language_allowlist"]
    )
    # Find unique language codes in the chunk that are not in the allowlist.
    disallowed_languages = set(chunk['language_code'].unique()) - language_allowlist
    # Update the set of all disallowed languages found so far.
    stats['disallowed_languages'].update(disallowed_languages)

    # --- Domain Tags Distribution ---
    # Process the 'domain_tags' column, which may contain lists.
    # Drop nulls, then flatten the list of lists into a single sequence of tags.
    valid_tags = chunk['domain_tags'].dropna()
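    # Use a generator expression for memory-efficient iteration over tags.
    # Parquet list columns are typically materialized as NumPy arrays, so accept those as well as lists.
    flattened_tags = (
        tag
        for tag_list in valid_tags
        if isinstance(tag_list, (list, tuple, np.ndarray))
        for tag in tag_list
    )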
    # Update the frequency counter with the tags from this chunk.
    stats['domain_tags_counter'].update(flattened_tags)

# ------------------------------------------------------------------------------
# Task 1, Step 3 Helper: Hash integrity and shard coverage
# ------------------------------------------------------------------------------
def _update_integrity_stats(
    chunk: pd.DataFrame,
    stats: Dict[str, Any],
    total_rows: int,
    sample_size: int = 10000
) -> None:
    """
    Updates running statistics for hash integrity and shard coverage from a chunk.

    This function performs two main operations on a given DataFrame chunk:
    1.  **Hash Integrity Check (Sample-based):** It draws a probabilistic,
        reproducible random sample of rows. For each sampled row, it re-computes
        the SHA-256 hash of the 'description' and compares it to the stored hash.
        It explicitly handles null descriptions to ensure deterministic hashing.
    2.  **Shard Coverage:** It counts the occurrences of each 'shard_id' in the
        chunk and updates a global counter to track the size of each shard.

    This function is designed to be called iteratively in a streaming/chunked
    data processing pipeline.

    Args:
        chunk: The pandas DataFrame chunk to process.
        stats: A dictionary holding the accumulated statistics across all chunks.
               This dictionary is modified in place.
        total_rows: The total number of rows in the entire dataset, required for
                    accurate probabilistic sampling.
        sample_size: The desired size of the random sample for the hash check.

    Raises:
        KeyError: If required columns are missing from the chunk or stats dict.
    """
    # --- Input Validation ---
    # Ensure required keys exist in the stats dictionary to prevent runtime errors.
    required_stats_keys = [
        'mismatched_hash_ids',
        'rows_sampled_for_hash_check',
        'shard_coverage_counter'
    ]
    if not all(key in stats for key in required_stats_keys):
        raise KeyError(f"Stats dictionary is missing one of the required keys: {required_stats_keys}")

    # --- Hash Integrity Check (Sample-based) ---
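    # Calculate the probability for selecting any given row to be part of the final sample.
    # This yields a uniform random sample of roughly `sample_size` rows across all chunks.
    # Guard against an empty dataset and cap the probability at 1.0 for small datasets.
    sampling_prob = min(1.0, sample_size / total_rows) if total_rows > 0 else 0.0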

    # Generate a random boolean mask to select rows for the sample based on the calculated probability.
    sample_mask = np.random.rand(len(chunk)) < sampling_prob

    # Apply the mask to the chunk to extract the sample for this chunk.
    sample = chunk[sample_mask]

    # Proceed only if the sample for this chunk is not empty.
    if not sample.empty:
        # Explicitly handle potential nulls in the description by replacing them with an empty string.
        # This is a critical step for ensuring deterministic and correct hash re-computation.
        descriptions_to_hash = sample['description'].fillna('').astype(str)

        # Recompute the SHA-256 hash for the 'description' of each sampled row.
        # The lambda function encodes the string to UTF-8 bytes, computes the hash, and gets the hex digest.
        recomputed_hashes = descriptions_to_hash.apply(
            lambda x: hashlib.sha256(x.encode('utf-8')).hexdigest()
        )

        # Identify rows where the newly computed hash does not match the stored hash.
        mismatched_mask = recomputed_hashes != sample['sha256_description_hash']

        # Extract the subset of the sample where mismatches occurred.
        mismatched_rows = sample[mismatched_mask]

        # If any mismatches were found, record their unique IDs for the final report.
        if not mismatched_rows.empty:
            stats['mismatched_hash_ids'].extend(mismatched_rows['id'].tolist())

        # Update the counter for the total number of rows sampled so far.
        stats['rows_sampled_for_hash_check'] += len(sample)

    # --- Shard Coverage ---
    # Calculate the frequency of each 'shard_id' within the current chunk.
    shard_counts_in_chunk = chunk['shard_id'].value_counts()

    # Update the global shard coverage counter with the counts from this chunk.
    # The Counter's update method efficiently adds the new counts.
    stats['shard_coverage_counter'].update(shard_counts_in_chunk.to_dict())

# ------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# ------------------------------------------------------------------------------
def validate_persona_hub_df(
    df_path: str,
    config: Dict[str, Any],
    total_rows: int,
    chunksize: int = 1_000_000
) -> Dict[str, Any]:
    """
    Performs a comprehensive, streaming validation of the raw persona hub dataset.

    This orchestrator function processes a very large dataset in chunks to maintain
    a low memory footprint. It performs a battery of checks to ensure data
    integrity before any costly processing begins. The validation includes:
    1.  **Schema Enforcement**: Verifies that all columns are present with the
        correct data types in every chunk.
    2.  **Global ID Uniqueness**: Employs a robust, memory-aware method to ensure
        every `id` across the ~370M records is unique.
    3.  **Content Quality**: Checks for null descriptions, valid token counts,
        and adherence to language allowlists.
    4.  **Data Integrity**: Performs a sample-based SHA-256 hash check and
        verifies complete shard coverage.

    Args:
        df_path: The file path to the Parquet dataset.
        config: The study configuration dictionary containing validation parameters.
        total_rows: The total number of rows in the dataset, which is essential
                    for accurate probabilistic sampling and progress tracking.
        chunksize: The number of rows to process in each memory-efficient chunk.

    Returns:
        A dictionary containing a detailed validation report, including an
        overall status, a list of specific errors, and computed statistics.
    """
    # Define the expected schema for the persona_hub_raw_df DataFrame.
    # This serves as the ground truth for schema validation.
    expected_schema = {
        'id': 'object', # Read as object initially for flexibility, then cast to string
        'description': 'object',
        'domain_tags': 'object',
        'language_code': 'object',
        'token_count': 'int64',
        'sha256_description_hash': 'object',
        'shard_id': 'int64'
    }

    # Initialize a dictionary to hold all accumulated statistics and validation results.
    stats = {
        'total_rows_processed': 0,
        'id_uniqueness': {'is_unique': True, 'duplicate_ids': set()},
        'null_description_count': 0,
        'token_count_lt_1': 0,
        'disallowed_languages': set(),
        'domain_tags_counter': Counter(),
        'mismatched_hash_ids': [],
        'rows_sampled_for_hash_check': 0,
        'shard_coverage_counter': Counter(),
    }

    # Initialize a set to track all unique IDs encountered for global uniqueness check.
    # This is memory-intensive but necessary for a single-pass guarantee.
    seen_ids: set[str] = set()
    # Initialize a list to store any validation errors found.
    validation_errors: List[str] = []
    # Flag to ensure schema is checked only on the first chunk.
    is_first_chunk = True

    # Set a fixed random seed for reproducible sampling for the hash check.
    np.random.seed(42)

    # Log the start of the validation process.
    logger.info(f"Starting validation for '{df_path}' with {total_rows} total rows.")

    try:
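        # pandas.read_parquet does not accept a `chunksize` argument, so stream the
        # file as record batches with pyarrow and convert each batch to a DataFrame.
        import pyarrow.parquet as pq  # Local import; pyarrow is already required as the Parquet engine.
        parquet_file = pq.ParquetFile(df_path)
        chunk_iterator = (
            batch.to_pandas()
            for batch in parquet_file.iter_batches(batch_size=chunksize)
        )

        # Loop through each chunk from the iterator.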
        for i, chunk in enumerate(chunk_iterator):
            # --- Schema Validation (Step 1) ---
            # On the first chunk, perform a thorough schema validation.
            if is_first_chunk:
                # Call the schema validation helper.
                schema_errors = _validate_chunk_schema(chunk, expected_schema, i + 1)
                # If any schema errors are found, add them to the main error list.
                if schema_errors:
                    # Add all found schema errors to the main list.
                    validation_errors.extend(schema_errors)
                    # If the schema is fundamentally wrong, stop processing immediately.
                    logger.error("Fatal schema error on first chunk. Aborting validation.")
                    # Break the loop as further processing is unreliable.
                    break
                # Mark that the first chunk's schema has been successfully validated.
                is_first_chunk = False

            # --- ID Uniqueness Check ---
            # Ensure the 'id' column is treated as a string for consistency.
            chunk_ids_series = chunk['id'].astype(str)

            # 1. Check for duplicates *within* the current chunk.
            if chunk_ids_series.nunique() != len(chunk_ids_series):
                # Mark global uniqueness as failed.
                stats['id_uniqueness']['is_unique'] = False
                # Find and store the specific duplicate IDs within this chunk.
                duplicates_in_chunk = chunk_ids_series[chunk_ids_series.duplicated()].unique()
                stats['id_uniqueness']['duplicate_ids'].update(duplicates_in_chunk)

            # 2. Check for duplicates *across* chunks.
            # Get the set of unique IDs in the current chunk.
            chunk_ids_set = set(chunk_ids_series.unique())
            # Find the intersection with IDs seen in all previous chunks.
            cross_chunk_duplicates = seen_ids.intersection(chunk_ids_set)
            # If the intersection is not empty, a cross-chunk duplicate exists.
            if cross_chunk_duplicates:
                # Mark global uniqueness as failed.
                stats['id_uniqueness']['is_unique'] = False
                # Add these duplicates to our tracking set.
                stats['id_uniqueness']['duplicate_ids'].update(cross_chunk_duplicates)

            # 3. Update the global set of seen IDs with the unique IDs from this chunk.
            seen_ids.update(chunk_ids_set)

            # --- Content & Integrity Updates (Steps 2 & 3) ---
            # Update content statistics (nulls, token counts, etc.) by calling the helper.
            _update_content_stats(chunk, stats, config)
            # Update integrity statistics (hash check sample, shard coverage) by calling the helper.
            _update_integrity_stats(chunk, stats, total_rows)

            # Update the total number of rows processed so far.
            stats['total_rows_processed'] += len(chunk)
            # Log progress periodically to provide feedback on the long-running process.
            if (i + 1) % 5 == 0:
                logger.info(f"Processed chunk {i + 1}... ({stats['total_rows_processed']}/{total_rows} rows)")

    # Handle potential errors during file processing, such as the file not being found.
    except FileNotFoundError:
        validation_errors.append(f"File not found at path: {df_path}")
    # Catch any other unexpected exceptions during the process.
    except Exception as e:
        validation_errors.append(f"An unexpected error occurred during processing: {e}")

    # --- Final Report Generation ---
    # Log the completion of the chunk processing phase.
    logger.info("Chunk processing complete. Generating final validation report.")

    # Final check on token count requirement (>=99.9% of rows must have token_count >= 1),
    # i.e. at most 0.1% of rows may have token_count < 1.
    token_count_pass = (stats['token_count_lt_1'] / stats['total_rows_processed']) <= 0.001 if stats['total_rows_processed'] > 0 else True

    # Construct the final, detailed validation report dictionary.
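    report = {
        "file_path": df_path,
        "total_rows_validated": stats['total_rows_processed'],
        # The overall status also reflects the token-count and hash-integrity checks reported below.
        "validation_status": "SUCCESS" if (
            not validation_errors
            and stats['id_uniqueness']['is_unique']
            and token_count_pass
            and not stats['mismatched_hash_ids']
        ) else "FAILURE",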
        "errors": validation_errors,
        "id_uniqueness": {
            "status": "SUCCESS" if stats['id_uniqueness']['is_unique'] else "FAILURE",
            "is_unique": stats['id_uniqueness']['is_unique'],
            "duplicate_count": len(stats['id_uniqueness']['duplicate_ids']),
            "duplicate_ids_sample": sorted(list(stats['id_uniqueness']['duplicate_ids']))[:10],
        },
        "content_quality": {
            "description_null_ratio": stats['null_description_count'] / stats['total_rows_processed'] if stats['total_rows_processed'] > 0 else 0,
            "token_count_ge_1_ratio": 1.0 - (stats['token_count_lt_1'] / stats['total_rows_processed']) if stats['total_rows_processed'] > 0 else 0,
            "token_count_check_status": "SUCCESS" if token_count_pass else "FAILURE (>=99.9% requirement not met)",
            "disallowed_languages_found": sorted(list(stats['disallowed_languages'])),
            "top_10_domain_tags": stats['domain_tags_counter'].most_common(10),
        },
        "integrity_checks": {
            "hash_check_sample_size": stats['rows_sampled_for_hash_check'],
            "hash_mismatch_count": len(stats['mismatched_hash_ids']),
            "hash_mismatch_ids_sample": stats['mismatched_hash_ids'][:10],
            "hash_check_status": "SUCCESS" if not stats['mismatched_hash_ids'] else "FAILURE",
            "shard_count": len(stats['shard_coverage_counter']),
            "shard_size_summary": {
                "min": min(stats['shard_coverage_counter'].values()) if stats['shard_coverage_counter'] else 0,
                "max": max(stats['shard_coverage_counter'].values()) if stats['shard_coverage_counter'] else 0,
                "mean": np.mean(list(stats['shard_coverage_counter'].values())) if stats['shard_coverage_counter'] else 0,
            }
        }
    }

    # Log the final status of the validation.
    if report["validation_status"] == "SUCCESS":
        logger.info("Validation successful. All checks passed.")
    else:
        logger.warning("Validation failed. See report for details.")

    # Return the comprehensive report.
    return report
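
The ID uniqueness check in `validate_persona_hub_df` works in two steps per chunk: it first looks for repeats inside the chunk, then intersects the chunk's IDs with everything seen in earlier chunks. The same idea can be sketched with the standard library alone. The function name `find_duplicate_ids` and the toy chunks are hypothetical and use plain lists instead of DataFrames.

```python
from collections import Counter
from typing import Iterable, List, Set


def find_duplicate_ids(chunks: Iterable[List[str]]) -> Set[str]:
    """Return every ID that occurs more than once across all chunks."""
    seen: Set[str] = set()
    duplicates: Set[str] = set()
    for chunk in chunks:
        # Step 1: duplicates inside the current chunk.
        counts = Counter(chunk)
        duplicates.update(id_ for id_, n in counts.items() if n > 1)
        # Step 2: IDs that already appeared in an earlier chunk.
        chunk_ids = set(counts)
        duplicates.update(seen & chunk_ids)
        # Step 3: remember this chunk's IDs for later chunks.
        seen.update(chunk_ids)
    return duplicates


chunks = [["a", "b", "b"], ["c", "a"], ["d"]]
dups = find_duplicate_ids(chunks)
print(sorted(dups))  # ['a', 'b']
```

As in the validator, the `seen` set grows with the number of distinct IDs, so memory use scales with the size of the dataset rather than the size of a chunk.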


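The hash integrity check in `_update_integrity_stats` keeps each row independently with probability `sample_size / total_rows`, so the chunks together yield roughly `sample_size` rows without holding the dataset in memory. Below is a minimal standard-library sketch of that idea. The names `sample_and_verify` and `rows` are hypothetical, `random.Random` stands in for NumPy, and the cap at 1.0 is this sketch's own choice.

```python
import hashlib
import random
from typing import Any, Dict, Iterable, List


def sha256_hex(text: str) -> str:
    """Return the SHA-256 hex digest of a UTF-8 encoded string."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def sample_and_verify(
    chunks: Iterable[List[Dict[str, Any]]],
    total_rows: int,
    sample_size: int,
    seed: int = 42,
) -> Dict[str, Any]:
    """Bernoulli-sample rows across chunks and re-check their stored hashes."""
    rng = random.Random(seed)  # Fixed seed for a reproducible sample.
    prob = min(1.0, sample_size / total_rows) if total_rows > 0 else 0.0
    sampled = 0
    mismatched: List[str] = []
    for chunk in chunks:
        for row in chunk:
            if rng.random() >= prob:
                continue  # Row not selected for the sample.
            sampled += 1
            # Treat a missing description as an empty string, as the validator does.
            if sha256_hex(row["description"] or "") != row["sha256_description_hash"]:
                mismatched.append(row["id"])
    return {"sampled": sampled, "mismatched_ids": mismatched}


# Toy dataset with one deliberately corrupted hash.
rows = [
    {
        "id": f"persona_{i:05d}",
        "description": f"text {i}",
        "sha256_description_hash": sha256_hex(f"text {i}"),
    }
    for i in range(1000)
]
rows[7]["sha256_description_hash"] = "0" * 64
chunks = [rows[i:i + 250] for i in range(0, 1000, 250)]

# With sample_size == total_rows every row is checked, so the corruption is found.
full = sample_and_verify(chunks, total_rows=1000, sample_size=1000)
# With a 10% sample the corrupted row is found only if it happens to be drawn.
partial = sample_and_verify(chunks, total_rows=1000, sample_size=100)
print(full["mismatched_ids"], partial["sampled"])
```

Because each row is drawn independently, the realized sample size varies around `sample_size` rather than matching it exactly.
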
In [None]:
# Task 2 — Validate contextual_data_df, human benchmarks, realized outcomes, and config

# ===================================================================================
# Task 2: Validate contextual_data_df, human benchmarks, realized outcomes, & config
# ===================================================================================

# ------------------------------------------------------------------------------
# Task 2, Step 1 Helper: Validate contextual_data_df
# ------------------------------------------------------------------------------
def _validate_contextual_df(
    contextual_data_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates the schema, content, and integrity of contextual_data_df.

    Args:
        contextual_data_df: The DataFrame containing contextual data for prompts.
        config: The study configuration dictionary.

    Returns:
        A list of error messages. An empty list indicates success.
    """
    # Initialize a list to collect validation error messages.
    errors = []
    # Define the expected number of survey rounds.
    EXPECTED_ROUNDS = 50
    # Define the start and end rounds for the survey period.
    START_ROUND, END_ROUND = "2013Q1", "2025Q2"

    # --- Row Count and Round Coverage Validation ---
    # Check if the DataFrame contains the expected number of rows.
    if len(contextual_data_df) != EXPECTED_ROUNDS:
        errors.append(f"Expected {EXPECTED_ROUNDS} rows, but found {len(contextual_data_df)}.")

    # Generate the full set of expected survey rounds.
    expected_rounds = set(pd.period_range(start=START_ROUND, end=END_ROUND, freq='Q').strftime('%YQ%q'))
    # Get the set of actual survey rounds from the DataFrame.
    actual_rounds = set(contextual_data_df['survey_round'])
    # Check for any missing or extra survey rounds.
    if expected_rounds != actual_rounds:
        missing = sorted(list(expected_rounds - actual_rounds))
        extra = sorted(list(actual_rounds - expected_rounds))
        errors.append(f"Mismatch in survey rounds. Missing: {missing}. Extra: {extra}.")

    # --- Date and Text Field Validation ---
    # Iterate through each row to perform detailed content validation.
    for _, row in contextual_data_df.iterrows():
        round_id = row['survey_round']
        # Validate the survey_round format using a regular expression.
        if not re.match(r'^\d{4}Q[1-4]$', round_id):
            errors.append(f"Round '{round_id}': Invalid format for 'survey_round'.")

        # Validate temporal consistency: ECB meeting must not be after the survey date.
        if pd.notna(row['ecb_meeting_date']) and pd.notna(row['survey_date']):
            if row['ecb_meeting_date'] > row['survey_date']:
                errors.append(f"Round '{round_id}': 'ecb_meeting_date' is after 'survey_date'.")

        # Validate ECB communication text content.
        if not isinstance(row['ecb_communication_text'], str) or not row['ecb_communication_text'].strip():
            errors.append(f"Round '{round_id}': 'ecb_communication_text' is empty or not a string.")

        # Validate token count is a positive integer.
        if not isinstance(row['ecb_communication_token_count'], (int, np.integer)) or row['ecb_communication_token_count'] <= 0:
            errors.append(f"Round '{round_id}': 'ecb_communication_token_count' is not a positive integer.")

    # --- Nested Dictionary Validation ---
    # Define the required keys for the nested dictionaries.
    MACRO_VARS = {'hicp', 'hicpx', 'unr', 'rgdp'}
    # Retrieve the round from which HICPX data is expected to be available.
    hicpx_start_round = config['phase_2_parameters']['oos_and_availability']['hicpx_available_from_round']

    # Iterate through each row again for nested structure validation.
    for _, row in contextual_data_df.iterrows():
        round_id = row['survey_round']

        # Validate 'latest_realized_data' structure.
        realized_data = row['latest_realized_data']
        if not isinstance(realized_data, dict) or set(realized_data.keys()) != MACRO_VARS:
            errors.append(f"Round '{round_id}': 'latest_realized_data' has incorrect keys.")
        else:
            # Check sub-dictionary structure and HICPX availability logic.
            for var, data in realized_data.items():
                if not isinstance(data, dict) or set(data.keys()) != {'period', 'value'}:
                    errors.append(f"Round '{round_id}': 'latest_realized_data' for '{var}' has incorrect structure.")
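                # If the round is before HICPX availability, its value must be NaN.
                # The isinstance guard avoids an AttributeError when the entry is not a dict.
                if var == 'hicpx' and round_id < hicpx_start_round and isinstance(data, dict) and pd.notna(data.get('value')):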
                    errors.append(f"Round '{round_id}': HICPX value should be NaN before {hicpx_start_round}.")

        # Validate 'previous_spf_medians' structure.
        prev_medians = row['previous_spf_medians']
        if not isinstance(prev_medians, dict) or set(prev_medians.keys()) != MACRO_VARS:
            errors.append(f"Round '{round_id}': 'previous_spf_medians' has incorrect keys.")
        else:
            # Check that each entry is a dict whose keys are integers (years).
            for var, data in prev_medians.items():
                if not isinstance(data, dict) or not all(isinstance(k, int) for k in data.keys()):
                    errors.append(f"Round '{round_id}': 'previous_spf_medians' for '{var}' is not a dict or has non-integer keys.")

    # Return the list of all collected error messages.
    return errors

# ------------------------------------------------------------------------------
# Task 2, Step 2 Helper: Validate human survey data
# ------------------------------------------------------------------------------
def _validate_human_surveys(
    benchmark_df: pd.DataFrame,
    micro_df: pd.DataFrame,
    contextual_data_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates human_survey_predictions_benchmark_df and human_survey_micro_df.

    Args:
        benchmark_df: DataFrame with human median forecasts.
        micro_df: DataFrame with individual human forecasts.
        contextual_data_df: DataFrame with contextual data for cross-validation.
        config: The study configuration dictionary.

    Returns:
        A list of error messages. An empty list indicates success.
    """
    # Initialize a list to collect validation error messages.
    errors = []
    # Define expected categorical values.
    EXPECTED_VARS = {'HICP', 'HICPX', 'RGDP', 'UNR'}
    EXPECTED_HORIZONS = {'CY', 'CY+1', 'CY+2', 'LT'}

    # Retrieve configuration parameters for OOS and HICPX rules.
    oos_rounds = set(config['phase_2_parameters']['oos_and_availability']['oos_rounds'])
    oos_horizons = set(config['phase_2_parameters']['oos_and_availability']['oos_scored_horizons'])
    hicpx_start_round = config['phase_2_parameters']['oos_and_availability']['hicpx_available_from_round']

    # --- Loop through both human forecast DataFrames for consistent validation ---
    for df_name, df in [("benchmark", benchmark_df), ("micro", micro_df)]:
        # Validate categorical columns.
        if not set(df['variable'].unique()).issubset(EXPECTED_VARS):
            errors.append(f"[{df_name}] Contains unexpected values in 'variable' column.")
        if not set(df['horizon'].unique()).issubset(EXPECTED_HORIZONS):
            errors.append(f"[{df_name}] Contains unexpected values in 'horizon' column.")

        # --- Target Year Integrity Check ---
        # Recompute target_year to validate against the provided column.
        df_copy = df.copy()
        df_copy['base_year'] = df_copy['round'].str[:4].astype(int)
        horizon_map = {'CY': 0, 'CY+1': 1, 'CY+2': 2}
        df_copy['computed_target_year'] = df_copy['base_year'] + df_copy['horizon'].map(horizon_map)

        # Handle the 'LT' horizon by joining with contextual data.
        lt_map = contextual_data_df.set_index('survey_round')['lt_year']
        lt_mask = df_copy['horizon'] == 'LT'
        df_copy.loc[lt_mask, 'computed_target_year'] = df_copy.loc[lt_mask, 'round'].map(lt_map)

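        # Check if the computed target year matches the one in the DataFrame.
        # Compare values as floats: Series.equals() also requires identical dtypes, so an
        # integer 'target_year' would never equal the float 'computed_target_year'.
        provided_year = pd.to_numeric(df_copy['target_year'], errors='coerce').astype('float64')
        computed_year = pd.to_numeric(df_copy['computed_target_year'], errors='coerce').astype('float64')
        years_match = (provided_year == computed_year) | (provided_year.isna() & computed_year.isna())
        if not years_match.all():
            errors.append(f"[{df_name}] 'target_year' column is inconsistent with 'round' and 'horizon'.")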

        # --- HICPX and OOS Business Rule Enforcement ---
        # Check for HICPX data appearing before its official start round.
        if not df.query("variable == 'HICPX' and round < @hicpx_start_round").empty:
            errors.append(f"[{df_name}] Found HICPX data before {hicpx_start_round}.")

        # Check for disallowed horizons in out-of-sample rounds.
        oos_df = df[df['round'].isin(oos_rounds)]
        if not oos_df[~oos_df['horizon'].isin(oos_horizons)].empty:
            errors.append(f"[{df_name}] Found disallowed horizons in out-of-sample rounds.")

    # --- Uniqueness Check for Micro Data ---
    # Verify that each forecaster has only one prediction per round/variable/horizon.
    if micro_df.duplicated(subset=['round', 'variable', 'horizon', 'forecaster_id']).any():
        errors.append("[micro] Found duplicate entries for the same forecaster, round, variable, and horizon.")

    # Return the list of all collected error messages.
    return errors

# ------------------------------------------------------------------------------
# Task 2, Step 3 Helper: Validate realized outcomes and config
# ------------------------------------------------------------------------------
def _validate_realized_and_config(
    realized_macro_data_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Validates realized_macro_data_df and the study configuration dictionary.

    Args:
        realized_macro_data_df: DataFrame with realized macroeconomic outcomes.
        config: The study configuration dictionary.

    Returns:
        A list of error messages. An empty list indicates success.
    """
    # Initialize a list to collect validation error messages.
    errors = []

    # --- Realized Data Validation ---
    # Check for duplicate entries for the same year and variable.
    if realized_macro_data_df.duplicated(subset=['reference_year', 'variable']).any():
        errors.append("Found duplicate entries in 'realized_macro_data_df'.")

    # Validate the 'first_release_date' to prevent look-ahead bias.
    # The first release of an annual figure cannot precede the end of that year,
    # so it must fall on or after January 1 of the following year.
    df_copy = realized_macro_data_df.copy()
    df_copy['min_release_date'] = pd.to_datetime(df_copy['reference_year'] + 1, format='%Y')
    if (df_copy['first_release_date'] < df_copy['min_release_date']).any():
        errors.append("Found 'first_release_date' values that imply look-ahead bias.")

    # --- Configuration Dictionary Validation ---
    # This is a critical check of key parameters that define the experiment.
    try:
        # Phase 1: Persona Generation Parameters
        p1 = config['phase_1_parameters']
        if p1['llm_as_judge']['model_name'] != 'gpt-4o-mini':
            errors.append("Config: Judge LLM must be 'gpt-4o-mini'.")
        if p1['llm_as_judge']['api_settings']['temperature'] != 1.0:
            errors.append("Config: Judge LLM temperature must be 1.0.")

        # Phase 2: Forecasting and Evaluation Parameters
        p2 = config['phase_2_parameters']
        if p2['forecasting_llm']['model_name'] != 'gpt-4o-2024-11-20':
            errors.append("Config: Forecasting LLM must be 'gpt-4o-2024-11-20'.")
        if p2['forecasting_llm']['api_settings']['temperature'] != 1.0:
            errors.append("Config: Forecasting LLM temperature must be 1.0.")
        if p2['scoring_and_inference']['monte_carlo_simulations'] != 10000:
            errors.append("Config: Monte Carlo simulations must be 10000.")
        if p2['scoring_and_inference']['monte_carlo_seed'] != 42:
            errors.append("Config: Monte Carlo seed must be 42.")
        if p2['ablation_study']['baseline_runs_per_round'] != 100:
            errors.append("Config: Baseline runs per round must be 100.")

        # Check OOS rounds and HICPX start date for consistency.
        expected_oos = ["2024Q1", "2024Q2", "2024Q3", "2024Q4", "2025Q1", "2025Q2"]
        if p2['oos_and_availability']['oos_rounds'] != expected_oos:
            errors.append("Config: 'oos_rounds' list is incorrect.")
        if p2['oos_and_availability']['hicpx_available_from_round'] != '2016Q4':
            errors.append("Config: 'hicpx_available_from_round' must be '2016Q4'.")

    except KeyError as e:
        errors.append(f"Config is missing a required key: {e}")
    except Exception as e:
        errors.append(f"An unexpected error occurred during config validation: {e}")

    # Return the list of all collected error messages.
    return errors

# ------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# ------------------------------------------------------------------------------
def validate_analytical_inputs(
    contextual_data_df: pd.DataFrame,
    human_survey_predictions_benchmark_df: pd.DataFrame,
    human_survey_micro_df: pd.DataFrame,
    realized_macro_data_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the validation of all analytical inputs for the study.

    This function serves as a master validator for the core datasets and the
    configuration dictionary that drive the forecasting and analysis phases.
    It ensures data integrity, alignment between tables, and adherence to the
    study's methodological parameters before proceeding with computation.

    Args:
        contextual_data_df: The DataFrame with contextual data for prompts.
        human_survey_predictions_benchmark_df: DataFrame with human median forecasts.
        human_survey_micro_df: DataFrame with individual human forecasts.
        realized_macro_data_df: DataFrame with realized macroeconomic outcomes.
        config: The study configuration dictionary.

    Returns:
        A dictionary containing a detailed validation report.

    Raises:
        ValueError: If any critical validation fails.
    """
    # Log the start of the analytical input validation process.
    logger.info("Starting validation of all analytical inputs...")

    # Initialize a dictionary to hold all validation errors.
    all_errors = {}

    # --- Step 1: Validate contextual_data_df ---
    # This DataFrame is central, so its validation is critical.
    logger.info("Validating 'contextual_data_df'...")
    contextual_errors = _validate_contextual_df(contextual_data_df, config)
    if contextual_errors:
        all_errors['contextual_data_df'] = contextual_errors

    # --- Step 2: Validate human survey data ---
    # These DataFrames must be consistent with each other and with contextual data.
    logger.info("Validating human survey DataFrames...")
    human_errors = _validate_human_surveys(
        human_survey_predictions_benchmark_df,
        human_survey_micro_df,
        contextual_data_df,
        config
    )
    if human_errors:
        all_errors['human_surveys'] = human_errors

    # --- Step 3: Validate realized outcomes and config ---
    # These components define the ground truth for scoring and the experiment's rules.
    logger.info("Validating realized outcomes and configuration...")
    realized_config_errors = _validate_realized_and_config(
        realized_macro_data_df,
        config
    )
    if realized_config_errors:
        all_errors['realized_and_config'] = realized_config_errors

    # --- Final Report Generation ---
    # Determine the overall validation status based on whether any errors were found.
    validation_status = "SUCCESS" if not all_errors else "FAILURE"

    # Construct the final, comprehensive validation report.
    report = {
        "overall_status": validation_status,
        "validation_details": {
            "contextual_data": "SUCCESS" if 'contextual_data_df' not in all_errors else "FAILURE",
            "human_surveys": "SUCCESS" if 'human_surveys' not in all_errors else "FAILURE",
            "realized_and_config": "SUCCESS" if 'realized_and_config' not in all_errors else "FAILURE",
        },
        "error_messages": all_errors if all_errors else "All checks passed."
    }

    # If validation failed, log the details and raise a critical error.
    if validation_status == "FAILURE":
        logger.error("Analytical input validation failed. See report for details.")
        logger.error(f"Error details: {all_errors}")
        raise ValueError("Critical validation of analytical inputs failed.")

    # If validation succeeded, log the success message.
    logger.info("Validation of all analytical inputs completed successfully.")

    # Return the comprehensive report.
    return report
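
The target-year integrity check in `_validate_human_surveys` compares a stored column against a recomputed one. A minimal sketch of why a dtype-strict comparison can report a mismatch even when the years agree, and of a value-based alternative, follows. The helper name `years_match` and the sample values are illustrative assumptions, not part of the pipeline, and both Series are assumed to share the same index.

```python
import pandas as pd

# Illustrative data: the stored column is integer, while the recomputed
# column is float (as happens when NaN placeholders are filled in later).
provided = pd.Series([2024, 2025, 2028], dtype="int64")
computed = pd.Series([2024.0, 2025.0, 2028.0], dtype="float64")

# Series.equals() requires matching dtypes, so this is False
# even though every year agrees.
strict_result = provided.equals(computed)


def years_match(left: pd.Series, right: pd.Series) -> bool:
    """Hypothetical helper: compare by value, treating NaN pairs as equal."""
    a = pd.to_numeric(left, errors="coerce").astype("float64")
    b = pd.to_numeric(right, errors="coerce").astype("float64")
    same = (a == b) | (a.isna() & b.isna())
    return bool(same.all())


value_result = years_match(provided, computed)
```

Here `strict_result` is `False` and `value_result` is `True`, which is why a value-based comparison is the safer choice when one side may have been upcast to float.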


In [None]:
# Task 3 — Cleanse persona_hub_raw_df (minimal, lossless)

# ==============================================================================
# Task 3: Cleanse persona_hub_raw_df (minimal, lossless)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 3, Step 1 Helper: Text normalization (description)
# ------------------------------------------------------------------------------
def _normalize_descriptions(description_series: pd.Series) -> pd.Series:
    """
    Applies a series of normalization steps to the description text.

    Args:
        description_series: A pandas Series containing the raw description strings.

    Returns:
        A pandas Series with the normalized description strings.
    """
    # Ensure the input is treated as string type, converting NaNs to empty strings for processing.
    normalized = description_series.fillna('').astype(str)

    # 1. Replace consecutive whitespace characters (including tabs and newlines)
    #    with a single space.
    normalized = normalized.str.replace(r'\s+', ' ', regex=True)

    # 2. Remove C0 and C1 control characters (U+0000–U+001F, U+007F–U+009F).
    #    Tab, line feed, and carriage return are excluded from the pattern, but
    #    step 1 has already converted them to spaces.
    control_char_regex = r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]'
    normalized = normalized.str.replace(control_char_regex, '', regex=True)

    # 3. Collapse any double spaces exposed by step 2, then strip leading/trailing
    #    whitespace. Stripping last ensures the hashed text has no stray padding.
    normalized = normalized.str.replace(r' {2,}', ' ', regex=True).str.strip()

    # 4. Normalize Unicode to NFC form for consistent representation,
    #    using the vectorized pandas string accessor.
    normalized = normalized.str.normalize('NFC')

    # Return the fully normalized series.
    return normalized

# ------------------------------------------------------------------------------
# Task 3, Step 3 Helper: Validate and clean auxiliary fields
# ------------------------------------------------------------------------------
def _normalize_domain_tags(tag_data: Union[None, str, List[str]]) -> Optional[List[str]]:
    """
    Normalizes a single entry from the 'domain_tags' column.

    Handles JSON strings, lists, and nulls, returning a list of lowercase strings
    or None if the input is invalid or results in an empty list.

    Args:
        tag_data: The input data from a single cell in the 'domain_tags' column.

    Returns:
        A list of lowercase strings or None.
    """
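    # If the input is already list-like, process it. NumPy arrays are accepted
    # because pyarrow's to_pandas() returns list columns as arrays, not lists.
    if isinstance(tag_data, (list, tuple, np.ndarray)):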
        # Create a new list containing only lowercase string elements.
        processed_tags = [str(tag).lower() for tag in tag_data if isinstance(tag, (str, int, float))]
        # Return the list if it's not empty, otherwise return None.
        return processed_tags if processed_tags else None

    # If the input is a string, attempt to parse it as JSON.
    if isinstance(tag_data, str):
        try:
            # Decode the JSON string into a Python object.
            data = json.loads(tag_data)
            # If the decoded object is a list, recursively call this function.
            if isinstance(data, list):
                return _normalize_domain_tags(data)
        except json.JSONDecodeError:
            # If JSON parsing fails, the string is considered invalid tag data.
            return None

    # For any other type (e.g., None, NaN, int), return None.
    return None

# ------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# ------------------------------------------------------------------------------
def cleanse_persona_hub_df(
    raw_df_path: str,
    cleansed_df_path: str,
    config: Dict[str, Any],
    chunksize: int = 1_000_000
) -> Dict[str, Any]:
    """
    Cleanses the raw persona hub dataset and saves it to a new Parquet file.

    This function processes the raw dataset in a streaming fashion to handle its
    large size efficiently. The cleansing pipeline includes:
    1.  Normalizing the 'description' text (whitespace, Unicode, control chars).
    2.  Filtering out records with empty descriptions, disallowed language codes,
        or invalid token counts.
    3.  Normalizing the 'domain_tags' field into a consistent list format.
    4.  Re-computing the 'sha256_description_hash' from the cleansed description.
    5.  Setting the unique 'id' column as the DataFrame index and preserving it
        in the output Parquet file for efficient downstream lookups.

    Args:
        raw_df_path: Path to the raw input Parquet file.
        cleansed_df_path: Path to save the cleansed output Parquet file.
        config: The study configuration dictionary.
        chunksize: The number of rows to process in each memory-efficient chunk.

    Returns:
        A dictionary containing a detailed report of the cleansing process.
    """
    # Log the start of the cleansing process.
    logger.info(f"Starting cleansing of '{raw_df_path}'. Output will be saved to '{cleansed_df_path}'.")

    # Initialize a report dictionary to track statistics across all chunks.
    report = {
        'rows_read': 0,
        'rows_written': 0,
        'rows_dropped': {
            'empty_description': 0,
            'disallowed_language': 0,
            'invalid_token_count': 0,
        }
    }

    # Retrieve the language allowlist from the configuration for filtering.
    language_allowlist = set(
        config["phase_1_parameters"]["filtering"]["keyword_filtering"]["language_allowlist"]
    )

    # Open the raw Parquet file for reading.
    parquet_file = pq.ParquetFile(raw_df_path)
    # Initialize the Parquet writer, which will be created with the schema of the first valid chunk.
    writer: Optional[pq.ParquetWriter] = None

    try:
        # Create an iterator to read the file in batches (chunks).
        batch_iterator = parquet_file.iter_batches(batch_size=chunksize, use_pandas_metadata=True)

        # Process each chunk from the iterator.
        for i, chunk_batch in enumerate(batch_iterator):
            # Convert the Arrow Batch to a pandas DataFrame for processing.
            chunk_df = chunk_batch.to_pandas()
            # Get the number of rows in the current chunk.
            rows_in_chunk = len(chunk_df)
            # Update the total number of rows read from the source file.
            report['rows_read'] += rows_in_chunk

            # --- Step 1: Text Normalization ---
            # Apply the normalization function to the 'description' column.
            chunk_df['description'] = _normalize_descriptions(chunk_df['description'])

            # --- Step 3 (Filtering Part): Create validity masks ---
            # Mask 1: Identify rows with empty descriptions after normalization.
            mask_empty_desc = chunk_df['description'].str.strip() == ''
            # Update the count of rows dropped for this reason.
            report['rows_dropped']['empty_description'] += mask_empty_desc.sum()

            # Mask 2: Identify rows with language codes not in the allowlist.
            mask_lang = ~chunk_df['language_code'].isin(language_allowlist)
            # Update the count of rows dropped for this reason.
            report['rows_dropped']['disallowed_language'] += mask_lang.sum()

            # Mask 3: Identify rows with invalid token counts (e.g., less than 1).
            mask_token = chunk_df['token_count'] < 1
            # Update the count of rows dropped for this reason.
            report['rows_dropped']['invalid_token_count'] += mask_token.sum()

            # Combine all filtering criteria into a single validity mask.
            # A row is kept if it is NOT in any of the drop masks.
            valid_mask = ~(mask_empty_desc | mask_lang | mask_token)

            # Apply the mask to filter the chunk, creating a new DataFrame with only valid rows.
            cleansed_chunk = chunk_df[valid_mask].copy()

            # If no rows in this chunk are valid, skip to the next chunk.
            if cleansed_chunk.empty:
                logger.info(f"Chunk {i+1}: No valid rows after filtering.")
                continue

            # --- Step 2 & 3 (Transformation Part) ---
            # Cast the 'id' column to the pandas 'string' dtype for consistency.
            cleansed_chunk['id'] = cleansed_chunk['id'].astype("string")

            # Re-compute the SHA-256 hash based on the *normalized* description text.
            cleansed_chunk['sha256_description_hash'] = cleansed_chunk['description'].apply(
                lambda x: hashlib.sha256(x.encode('utf-8')).hexdigest()
            )

            # Normalize the 'domain_tags' column using the dedicated helper function.
            cleansed_chunk['domain_tags'] = cleansed_chunk['domain_tags'].apply(_normalize_domain_tags)

            # --- Set 'id' as index before writing ---
            # Set the 'id' column as the DataFrame index. This is the primary key.
            # `verify_integrity=True` raises on duplicate IDs within this chunk only;
            # duplicates that span chunks are not detected here.
            cleansed_chunk.set_index('id', inplace=True, verify_integrity=True)

            # --- Streaming Write ---
            # Convert the cleansed pandas DataFrame (with its index) to an Arrow Table.
            # `preserve_index=True` stores the 'id' index as a column in the output file.
            table = pa.Table.from_pandas(cleansed_chunk, preserve_index=True)

            # If this is the first chunk being written, initialize the ParquetWriter with the table's schema.
            if writer is None:
                # The schema will now correctly include the index.
                writer = pq.ParquetWriter(cleansed_df_path, table.schema)

            # Write the current cleansed chunk (as an Arrow Table) to the output Parquet file.
            writer.write_table(table)

            # Update the total count of rows successfully written to the new file.
            report['rows_written'] += len(cleansed_chunk)
            # Log progress for the current chunk.
            logger.info(f"Processed chunk {i+1}: Read {rows_in_chunk}, Wrote {len(cleansed_chunk)}")

    finally:
        # This block ensures that the writer is always closed properly, finalizing the file.
        if writer:
            # Close the Parquet writer to complete the file writing process.
            writer.close()
            logger.info(f"Parquet writer closed. Cleansed file saved to '{cleansed_df_path}'.")

    # --- Final Report ---
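    # Calculate the total number of rows dropped. The per-reason counts can overlap
    # (one row may fail several checks), so derive the total from rows read and written.
    total_dropped = report['rows_read'] - report['rows_written']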
    # Calculate the overall rejection rate.
    rejection_rate = 1 - (report['rows_written'] / report['rows_read']) if report['rows_read'] > 0 else 0
    # Add the rejection rate to the report, formatted as a percentage.
    report['rejection_rate'] = f"{rejection_rate:.4%}"

    # Log the final summary of the entire cleansing process.
    logger.info(f"Cleansing complete. Total rows read: {report['rows_read']}.")
    logger.info(f"Total rows written: {report['rows_written']}. Total rows dropped: {total_dropped}.")

    # Return the final, comprehensive report.
    return report
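
The hash is recomputed after normalization so that descriptions differing only in whitespace, control characters, or Unicode composition map to the same digest. The sketch below illustrates that idea on plain strings. It assumes one particular ordering of the steps (collapse whitespace, remove control characters, trim, then NFC), and the names `normalize_text` and `description_hash` are hypothetical stand-ins rather than functions from this repository.

```python
import hashlib
import re
import unicodedata

# C0/C1 control characters, excluding tab, line feed, and carriage return.
CONTROL_CHARS = re.compile(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]")


def normalize_text(text: str) -> str:
    """Collapse whitespace, drop control characters, trim, and apply NFC."""
    text = re.sub(r"\s+", " ", text)            # tabs/newlines become spaces
    text = CONTROL_CHARS.sub("", text)          # remove remaining control chars
    text = re.sub(r" {2,}", " ", text).strip()  # tidy spaces exposed by removal
    return unicodedata.normalize("NFC", text)


def description_hash(text: str) -> str:
    """SHA-256 hex digest of the normalized, UTF-8 encoded text."""
    return hashlib.sha256(normalize_text(text).encode("utf-8")).hexdigest()


# The same description written two ways: precomposed 'é' versus
# 'e' + combining acute accent, with stray padding and a NUL byte.
composed = "Caf\u00e9 economist"
decomposed = "  Cafe\u0301\x00  economist\t\n"

same_text = normalize_text(decomposed) == composed
same_hash = description_hash(decomposed) == description_hash(composed)
```

Both `same_text` and `same_hash` evaluate to `True`, so downstream deduplication by hash would treat the two inputs as one persona description.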


In [None]:
# Task 4 — Cleanse contextual_data_df, human panels, and realized outcomes

# ==============================================================================
# Task 4: Cleanse contextual_data_df, human panels, and realized outcomes
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 4, Step 1 Helper: Clean contextual_data_df text and dates
# ------------------------------------------------------------------------------
def _cleanse_contextual_df(
    contextual_data_df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Cleanses the contextual_data_df by normalizing text, parsing dates,
    and re-computing token counts.

    Args:
        contextual_data_df: The raw (but validated) contextual data DataFrame.
        config: The study configuration dictionary.

    Returns:
        A cleansed copy of the contextual data DataFrame.
    """
    # Create a deep copy to avoid modifying the original DataFrame.
    df = contextual_data_df.copy()

    # --- Text Normalization (ecb_communication_text) ---
    # Define the regex for C0/C1 control characters.
    control_char_regex = r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]'
    # Collapse whitespace (tabs and newlines become spaces), remove control
    # characters, tidy any double spaces this exposes, then trim. The vectorized
    # `.str.normalize` skips missing values, whereas calling
    # `unicodedata.normalize` directly on a NaN raises a TypeError.
    df['ecb_communication_text'] = (
        df['ecb_communication_text']
        .str.replace(r'\s+', ' ', regex=True)
        .str.replace(control_char_regex, '', regex=True)
        .str.replace(r' {2,}', ' ', regex=True)
        .str.strip()
        .str.normalize('NFC')
    )

    # --- Date Parsing ---
    # Convert date columns to datetime objects, assuming European date format.
    df['survey_date'] = pd.to_datetime(df['survey_date'], dayfirst=True, errors='coerce')
    df['ecb_meeting_date'] = pd.to_datetime(df['ecb_meeting_date'], dayfirst=True, errors='coerce')
    # Post-parsing validation for temporal consistency.
    if (df['ecb_meeting_date'] > df['survey_date']).any():
        raise ValueError("Cleansing failed: Found 'ecb_meeting_date' after 'survey_date'.")

    # --- Token Count Recomputation ---
    # Re-compute token counts to ensure accuracy for the specified model.
    # Read the model name outside the try block so it is always bound when the
    # error message below is formatted.
    model_name = config['phase_2_parameters']['forecasting_llm']['model_name']
    try:
        # Get the tokenizer for the specified model.
        encoding = tiktoken.encoding_for_model(model_name)
    except Exception as e:
        logger.error(f"Could not get tiktoken encoding for model '{model_name}'. Error: {e}")
        raise

    # Apply the tokenizer to re-calculate the token count for each communication text.
    df['ecb_communication_token_count'] = df['ecb_communication_text'].apply(
        lambda x: len(encoding.encode(x))
    )

    # Return the cleansed DataFrame.
    return df

# ------------------------------------------------------------------------------
# Task 4, Step 2 Helper: Normalize nested dicts and compute target_year
# ------------------------------------------------------------------------------
def _normalize_nested_dicts_and_compute_target_year(
    contextual_df: pd.DataFrame,
    benchmark_df: pd.DataFrame,
    micro_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Normalizes nested dictionaries in contextual_df and computes the canonical
    'target_year' for human survey DataFrames.

    Args:
        contextual_df: The cleansed contextual data DataFrame.
        benchmark_df: The raw benchmark survey DataFrame.
        micro_df: The raw micro survey DataFrame.
        config: The study configuration dictionary.

    Returns:
        A tuple containing the cleansed versions of (contextual_df,
        benchmark_df, micro_df).
    """
    # Create copies to avoid side effects.
    contextual_df_c = contextual_df.copy()
    benchmark_df_c = benchmark_df.copy()
    micro_df_c = micro_df.copy()

    # --- Normalize Nested Dictionaries in contextual_df ---
    # Get the HICPX availability start round from the config.
    hicpx_start_round = config['phase_2_parameters']['oos_and_availability']['hicpx_available_from_round']

    # Rebuild the nested dictionaries column by column. New dict objects are
    # created instead of mutating in place: DataFrame.copy() does not copy Python
    # objects stored in cells, so in-place edits would leak into the caller's data.
    # Standardize 'latest_realized_data' for HICPX before its availability.
    contextual_df_c['latest_realized_data'] = [
        {**realized, 'hicpx': {'period': None, 'value': np.nan}}
        if survey_round < hicpx_start_round else dict(realized)
        for survey_round, realized in zip(
            contextual_df_c['survey_round'], contextual_df_c['latest_realized_data']
        )
    ]
    # Ensure keys in 'previous_spf_medians' are integers.
    contextual_df_c['previous_spf_medians'] = [
        {var: {int(k): v for k, v in medians.items()} for var, medians in prev.items()}
        for prev in contextual_df_c['previous_spf_medians']
    ]

    # --- Compute 'target_year' for Human Survey DataFrames ---
    # Create a mapping from survey round to the long-term target year.
    lt_year_map = contextual_df_c.set_index('survey_round')['lt_year']

    # Define a reusable function to compute 'target_year'.
    def compute_target_year(df: pd.DataFrame) -> pd.DataFrame:
        # Extract the base year from the 'round' string.
        df['base_year'] = df['round'].str[:4].astype(int)
        # Define the offset for standard horizons.
        horizon_offset_map = {'CY': 0, 'CY+1': 1, 'CY+2': 2}
        # Apply the offset to calculate the target year for standard horizons.
        df['target_year'] = df['base_year'] + df['horizon'].map(horizon_offset_map)

        # Identify rows with the 'LT' horizon.
        lt_mask = df['horizon'] == 'LT'
        # Use the pre-built map to fill in the target year for 'LT' horizons.
        df.loc[lt_mask, 'target_year'] = df.loc[lt_mask, 'round'].map(lt_year_map)

        # Convert the final 'target_year' to a nullable integer type and drop helper column.
        df['target_year'] = df['target_year'].astype('Int64')
        df = df.drop(columns=['base_year'])
        return df

    # Apply the function to both human survey DataFrames.
    benchmark_df_c = compute_target_year(benchmark_df_c)
    micro_df_c = compute_target_year(micro_df_c)

    # Return the three cleansed DataFrames.
    return contextual_df_c, benchmark_df_c, micro_df_c

# ------------------------------------------------------------------------------
# Task 4, Step 3 Helper: Clean realized_macro_data_df
# ------------------------------------------------------------------------------
def _cleanse_realized_df(realized_macro_data_df: pd.DataFrame) -> pd.DataFrame:
    """
    Cleanses the realized_macro_data_df by deduplicating and validating
    aggregation methods.

    Args:
        realized_macro_data_df: The raw realized outcomes DataFrame.

    Returns:
        A cleansed copy of the realized outcomes DataFrame.
    """
    # Create a copy to avoid modifying the original.
    df = realized_macro_data_df.copy()

    # --- Deduplication ---
    # Check for and count duplicates based on the composite key.
    duplicates = df.duplicated(subset=['reference_year', 'variable'])
    if duplicates.any():
        logger.warning(f"Found and dropped {duplicates.sum()} duplicate rows in realized_macro_data_df.")
        # Drop duplicates, keeping the first occurrence.
        df = df.drop_duplicates(subset=['reference_year', 'variable'], keep='first')

    # --- Aggregation Method Validation ---
    # Define the canonical aggregation methods required by the study.
    EXPECTED_AGGREGATION_METHODS = {
        'HICP': 'simple average of 12 monthly YoY rates',
        'HICPX': 'simple average of 12 monthly YoY rates',
        'RGDP': 'annual growth percent',
        'UNR': 'simple average of 12 monthly rates'
    }
    # Create a mapping from variable to its expected method.
    expected_method_series = df['variable'].map(EXPECTED_AGGREGATION_METHODS)

    # Normalize the actual method strings for robust comparison.
    actual_method_series = df['aggregation_method'].str.strip().str.lower()
    # Check for mismatches.
    mismatches = actual_method_series != expected_method_series.str.lower()
    if mismatches.any():
        mismatch_details = df[mismatches][['reference_year', 'variable', 'aggregation_method']]
        raise ValueError(f"Found mismatched aggregation methods:\n{mismatch_details}")

    # Return the cleansed DataFrame.
    return df

# ------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# ------------------------------------------------------------------------------
def cleanse_analytical_inputs(
    contextual_data_df: pd.DataFrame,
    human_survey_predictions_benchmark_df: pd.DataFrame,
    human_survey_micro_df: pd.DataFrame,
    realized_macro_data_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Orchestrates the cleansing of all analytical input DataFrames.

    This function applies a series of cleansing and transformation steps to
    prepare the core analytical datasets for the forecasting pipeline. It ensures
    data types are correct, text and dates are normalized, nested structures
    are standardized, and critical relational keys like 'target_year' are
    computed consistently.

    Args:
        contextual_data_df: The raw (but validated) contextual data DataFrame.
        human_survey_predictions_benchmark_df: The raw benchmark survey DataFrame.
        human_survey_micro_df: The raw micro survey DataFrame.
        realized_macro_data_df: The raw realized outcomes DataFrame.
        config: The study configuration dictionary.

    Returns:
        A tuple containing the four cleansed and prepared DataFrames:
        (contextual_df, benchmark_df, micro_df, realized_df).
    """
    # Log the start of the analytical data cleansing process.
    logger.info("Starting cleansing of analytical input DataFrames...")

    # --- Step 1: Clean contextual_data_df ---
    # This step is performed first as its outputs are needed by Step 2.
    logger.info("Cleansing 'contextual_data_df'...")
    cleansed_contextual_df = _cleanse_contextual_df(contextual_data_df, config)

    # --- Step 2: Normalize dicts and compute target_year ---
    # This step uses the cleansed contextual data to process the human survey data.
    logger.info("Normalizing nested data and computing 'target_year' for human surveys...")
    cleansed_contextual_df, cleansed_benchmark_df, cleansed_micro_df = \
        _normalize_nested_dicts_and_compute_target_year(
            cleansed_contextual_df,
            human_survey_predictions_benchmark_df,
            human_survey_micro_df,
            config
        )

    # --- Step 3: Clean realized_macro_data_df ---
    # This step is independent of the others.
    logger.info("Cleansing 'realized_macro_data_df'...")
    cleansed_realized_df = _cleanse_realized_df(realized_macro_data_df)

    # Log the successful completion of the process.
    logger.info("Cleansing of all analytical inputs completed successfully.")

    # Return the tuple of cleansed DataFrames.
    return (
        cleansed_contextual_df,
        cleansed_benchmark_df,
        cleansed_micro_df,
        cleansed_realized_df
    )
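
The aggregation-method check applied to the realized data can be restated without pandas. The block below is a minimal sketch, not the pipeline's own code: `find_method_mismatches` and the sample rows are hypothetical names and values introduced only for illustration. It mirrors the normalization used above (strip and lowercase before comparing) and treats an unknown variable as a mismatch, which is also what the pandas version does when the mapping yields a missing value.

```python
from typing import Dict, List

# Canonical methods, copied from EXPECTED_AGGREGATION_METHODS above.
EXPECTED_AGGREGATION_METHODS: Dict[str, str] = {
    "HICP": "simple average of 12 monthly YoY rates",
    "HICPX": "simple average of 12 monthly YoY rates",
    "RGDP": "annual growth percent",
    "UNR": "simple average of 12 monthly rates",
}


def find_method_mismatches(rows: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Return the rows whose aggregation method differs from the canonical one."""
    mismatches = []
    for row in rows:
        expected = EXPECTED_AGGREGATION_METHODS.get(row["variable"])
        # Normalize the recorded method the same way as the pandas code.
        actual = row["aggregation_method"].strip().lower()
        # Variables without a canonical method count as mismatches.
        if expected is None or actual != expected.lower():
            mismatches.append(row)
    return mismatches


# Hypothetical sample rows (illustrative only, not real data).
sample_rows = [
    {"variable": "RGDP", "aggregation_method": "  Annual Growth Percent "},
    {"variable": "UNR", "aggregation_method": "end-of-year rate"},
]
bad_rows = find_method_mismatches(sample_rows)
print([row["variable"] for row in bad_rows])
```

Only the `UNR` row is reported here, because the `RGDP` string differs from the canonical method only in case and surrounding whitespace.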


In [None]:
# Task 5 — Keyword and domain pre-screen (persona filter step 1)

# ==============================================================================
# Task 5: Keyword and domain pre-screen (persona filter step 1)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 5, Step 1 Helper: Count lexicon hits in descriptions
# ------------------------------------------------------------------------------
def _compute_keyword_filter_flags(
    description_series: pd.Series,
    lexicon: List[str],
    min_token_count: int
) -> Tuple[pd.Series, pd.Series]:
    """
    Computes keyword hit counts and filter flags for a series of descriptions.

    This function uses a single compiled regex to efficiently find all occurrences
    of lexicon phrases within the descriptions. It counts the number of *unique*
    phrases found in each description and creates a boolean flag indicating
    if the count meets the minimum threshold.

    Args:
        description_series: A pandas Series of cleansed persona descriptions.
        lexicon: A list of keyword phrases to search for.
        min_token_count: The minimum number of unique lexicon phrases required to pass.

    Returns:
        A tuple containing:
        - A pandas Series of integers (keyword_hit_count).
        - A pandas Series of booleans (keyword_pass).
    """
    # Input validation: Ensure lexicon is not empty.
    if not lexicon:
        raise ValueError("Lexicon cannot be empty for keyword filtering.")

    # Prepare the descriptions for matching: treat missing values as empty strings
    # and convert everything to lowercase.
    descriptions_lower = description_series.fillna('').astype(str).str.lower()

    # Lowercase and de-duplicate the lexicon so it matches the lowercased descriptions.
    # Longer phrases come first so that, for example, 'monetary policy' is preferred
    # over 'monetary' when both are in the lexicon.
    terms = sorted({term.lower() for term in lexicon}, key=lambda t: (-len(t), t))
    # Escape special regex characters in lexicon terms and join them with the OR operator '|'.
    # Word boundaries (\b) ensure whole-phrase matching.
    regex_pattern = r'\b(?:' + '|'.join(re.escape(term) for term in terms) + r')\b'
    # Compile the regex once so it is reused for every description.
    compiled_regex = re.compile(regex_pattern)

    # Apply the regex to find all non-overlapping matches in each description.
    # The result is a Series of lists, where each list contains all phrases found.
    matches = descriptions_lower.str.findall(compiled_regex)

    # For each list of matches, count the number of unique phrases.
    # Equation: c_i = |{unique phrases found in description_i}|
    keyword_hit_count = matches.apply(lambda found_list: len(set(found_list)))

    # Create a boolean flag based on the minimum token count threshold.
    # Equation: I_i^kw = 1{c_i >= min_token_count}
    keyword_pass = keyword_hit_count >= min_token_count

    # Return the hit counts and the pass flags.
    return keyword_hit_count, keyword_pass

# ------------------------------------------------------------------------------
# Task 5, Step 2 Helper: Apply domain_tags filter
# ------------------------------------------------------------------------------
def _compute_domain_filter_flags(
    domain_tags_series: pd.Series,
    allowed_domains: Set[str]
) -> pd.Series:
    """
    Computes domain filter flags based on set intersection.

    Args:
        domain_tags_series: A pandas Series where each element is a list of
                            domain tags or None.
        allowed_domains: A set of allowed domain strings.

    Returns:
        A pandas Series of booleans (domain_pass).
    """
    # Define a helper function to apply to each row.
    def check_intersection(tags: Any) -> bool:
        # List columns read back from Parquet arrive as numpy arrays rather than
        # Python lists, so accept the common sequence containers.
        # Anything else (e.g., None or NaN) fails the check.
        if not isinstance(tags, (list, tuple, set, np.ndarray)):
            return False
        # Check if the intersection between the persona's tags and allowed domains is non-empty.
        # Equation: I_i^dom = 1{domain_tags_i ∩ Dom_allow ≠ ∅}
        return not allowed_domains.isdisjoint(tags)

    # Apply the function to the Series to generate the boolean flags.
    domain_pass = domain_tags_series.apply(check_intersection)

    # Return the pass flags.
    return domain_pass

# ------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# ------------------------------------------------------------------------------
def apply_keyword_domain_filter(
    cleansed_df_path: str,
    filtered_df_path: str,
    config: Dict[str, Any],
    chunksize: int = 1_000_000
) -> Dict[str, Any]:
    """
    Applies keyword and domain filters to the cleansed persona dataset.

    This function reads the cleansed persona data in chunks, applies two independent
    filters, and writes the records that pass both filters to a new Parquet file.
    1. Keyword Filter: Checks if a description contains at least a minimum number
       of unique phrases from a predefined lexicon.
    2. Domain Filter: Checks if the persona's domain tags intersect with a set
       of allowed domains.

    Args:
        cleansed_df_path: Path to the cleansed input Parquet file.
        filtered_df_path: Path to save the filtered output Parquet file.
        config: The study configuration dictionary.
        chunksize: The number of rows to process in each chunk.

    Returns:
        A dictionary containing a report of the filtering process.
    """
    # Log the start of the filtering process.
    logger.info(f"Starting keyword and domain filtering for '{cleansed_df_path}'.")

    # --- Configuration Extraction ---
    # Extract relevant parameters from the config dictionary.
    filter_config = config['phase_1_parameters']['filtering']['keyword_filtering']
    lexicon = filter_config['lexicon']
    min_token_count = filter_config['min_token_count']
    # Define the set of allowed domains for the domain filter.
    allowed_domains = frozenset({"economics", "finance", "banking", "monetary", "central bank"})

    # --- Initialization ---
    # Initialize a report to track statistics across all chunks.
    report = {'rows_read': 0, 'rows_written': 0, 'passed_keyword_only': 0, 'passed_domain_only': 0}
    # Open the input Parquet file for chunked reading.
    parquet_file = pq.ParquetFile(cleansed_df_path)
    # Initialize the Parquet writer for streaming output.
    writer = None

    try:
        # Iterate over the input file in batches (chunks).
        for i, chunk in enumerate(parquet_file.iter_batches(batch_size=chunksize, use_pandas_metadata=True)):
            # Convert the Arrow Batch to a pandas DataFrame.
            chunk_df = chunk.to_pandas()
            # Update the total number of rows read.
            report['rows_read'] += len(chunk_df)

            # --- Step 1: Compute Keyword Filter Flags ---
            # Call the helper to get hit counts and pass flags for the keyword filter.
            keyword_hit_count, keyword_pass = _compute_keyword_filter_flags(
                chunk_df['description'], lexicon, min_token_count
            )

            # --- Step 2: Compute Domain Filter Flags ---
            # Call the helper to get pass flags for the domain filter.
            domain_pass = _compute_domain_filter_flags(chunk_df['domain_tags'], allowed_domains)

            # --- Step 3: Combine Filters and Reduce Dataset ---
            # Combine the two boolean masks using a logical AND.
            # Equation: pass_{1,i} = I_i^kw · I_i^dom
            final_pass_mask = keyword_pass & domain_pass

            # Create the filtered DataFrame containing only rows that passed both checks.
            filtered_chunk = chunk_df[final_pass_mask].copy()

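            # Add an audit column to the filtered chunk for provenance.
            filtered_chunk['keyword_hit_count'] = keyword_hit_count[final_pass_mask]

            # Update report statistics. The two 'passed_*_only' counters record every row
            # that passes the respective filter, including rows that pass both.
            # Cast to int so the report stays JSON-serializable.
            report['passed_keyword_only'] += int(keyword_pass.sum())
            report['passed_domain_only'] += int(domain_pass.sum())
            report['rows_written'] += len(filtered_chunk)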

            # If there are no rows that passed, skip to the next chunk.
            if filtered_chunk.empty:
                logger.info(f"Processed chunk {i+1}: No rows passed the filter.")
                continue

            # --- Streaming Write ---
            # Convert the filtered pandas DataFrame to an Arrow Table.
            table = pa.Table.from_pandas(filtered_chunk, preserve_index=True)

            # Initialize the Parquet writer with the schema from the first non-empty table.
            if writer is None:
                writer = pq.ParquetWriter(filtered_df_path, table.schema)

            # Write the current filtered table to the output file.
            writer.write_table(table)

            # Log progress.
            logger.info(f"Processed chunk {i+1}: Read {len(chunk_df)}, Wrote {len(filtered_chunk)}")

    finally:
        # Ensure the writer is closed to finalize the file. If no row passed in any
        # chunk, no writer was created and no output file exists.
        if writer is not None:
            writer.close()

    # --- Final Report ---
    # Calculate the final rejection rate.
    rejection_rate = 1 - (report['rows_written'] / report['rows_read']) if report['rows_read'] > 0 else 0
    report['rejection_rate'] = f"{rejection_rate:.4%}"

    # Log the final summary of the filtering process.
    logger.info("Keyword and domain filtering complete.")
    logger.info(f"Total rows read: {report['rows_read']}")
    logger.info(f"Total rows written: {report['rows_written']} ({report['rejection_rate']} rejection rate)")

    # Return the final report.
    return report
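
The two pre-screen indicators and their combination can be sketched in plain Python on a handful of records. This is a hedged illustration rather than the pipeline's implementation: `count_unique_hits`, `passes_prescreen`, the lexicon, and the sample personas are all hypothetical. Unlike the single alternation regex above, this sketch searches for each phrase separately, so overlapping phrases such as "monetary" and "monetary policy" would each be counted.

```python
import re
from typing import FrozenSet, List, Optional


def count_unique_hits(description: str, lexicon: List[str]) -> int:
    """Count distinct lexicon phrases that occur as whole phrases (c_i)."""
    text = description.lower()
    unique_terms = {term.lower() for term in lexicon}
    return sum(
        1
        for term in unique_terms
        if re.search(r"\b" + re.escape(term) + r"\b", text)
    )


def passes_prescreen(
    description: str,
    domain_tags: Optional[List[str]],
    lexicon: List[str],
    allowed_domains: FrozenSet[str],
    min_token_count: int,
) -> bool:
    """Combine the keyword and domain indicators with a logical AND."""
    # I_i^kw = 1{c_i >= min_token_count}
    keyword_pass = count_unique_hits(description, lexicon) >= min_token_count
    # I_i^dom = 1{domain_tags_i intersects the allowed domains}
    domain_pass = domain_tags is not None and not allowed_domains.isdisjoint(domain_tags)
    return keyword_pass and domain_pass


# Hypothetical lexicon, domains, and personas (illustrative only).
lexicon = ["inflation", "monetary policy", "interest rate"]
allowed = frozenset({"economics", "finance"})
personas = [
    ("An economist studying inflation and monetary policy in Europe.", ["economics"]),
    ("A chef who worries about inflation.", ["cooking"]),
    ("A trader tracking the interest rate and inflation outlook.", None),
]
results = [passes_prescreen(text, tags, lexicon, allowed, 2) for text, tags in personas]
print(results)
```

Only the first persona passes. The second fails both indicators, and the third meets the keyword threshold but has no domain tags.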


In [None]:
# Task 6 — NER PERSON exclusion (persona filter step 2)

# ==============================================================================
# Task 6: NER PERSON exclusion (persona filter step 2)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 6, Step 1 Helper: Load and configure NER model
# ------------------------------------------------------------------------------
def _load_ner_model(model_name: str) -> Language:
    """
    Loads and configures the spaCy NER model for efficient processing.

    This function loads the specified spaCy model and disables all unnecessary
    pipeline components (e.g., parser, lemmatizer) to optimize for speed and
    memory usage, as only the Named Entity Recognizer (NER) is required.

    Args:
        model_name: The name of the spaCy model to load (e.g., 'en_core_web_trf').

    Returns:
        The loaded and configured spaCy Language object.

    Raises:
        IOError: If the specified spaCy model is not found.
        ValueError: If the model name is not provided.
    """
    # Input validation: Ensure a model name is provided.
    if not model_name:
        raise ValueError("A spaCy model name must be provided.")

    # Log the model loading attempt.
    logger.info(f"Loading spaCy NER model: '{model_name}'...")

    try:
        # Define components to disable for performance optimization.
        # We only need the 'ner' component and whatever it depends on
        # (e.g., 'tok2vec', or 'transformer' for the '_trf' models).
        disable_pipes = ["parser", "lemmatizer", "tagger", "attribute_ruler"]
        # Load the spaCy model with unnecessary components disabled.
        nlp = spacy.load(model_name, disable=disable_pipes)
        # Log successful loading.
        logger.info(f"Successfully loaded model '{model_name}'.")
        # Return the loaded model object.
        return nlp
    except OSError as exc:
        # If the model is not found, provide a helpful error message.
        error_msg = (
            f"spaCy model '{model_name}' not found. Please download it by running:\n"
            f"python -m spacy download {model_name}"
        )
        logger.error(error_msg)
        # Raise an IOError to indicate a file/resource-related problem, chaining the
        # original exception so its traceback is preserved.
        raise IOError(error_msg) from exc

# ------------------------------------------------------------------------------
# Task 6, Step 2 Helper: Apply NER and construct exclusion indicator
# ------------------------------------------------------------------------------
def _generate_ner_pass_flags(
    df: pd.DataFrame,
    nlp: Language,
    batch_size: int = 256
) -> pd.Series:
    """
    Applies the NER model to descriptions and generates pass/fail flags.

    This function processes the 'description' column in batches using spaCy's
    efficient `nlp.pipe()` method. It creates a boolean flag for each persona,
    marking it as `False` (fail) if any entity with the label 'PERSON' is
    detected, and `True` (pass) otherwise.

    Args:
        df: The input DataFrame containing the 'description' column.
        nlp: The loaded spaCy Language object.
        batch_size: The number of documents to process in each batch.

    Returns:
        A pandas Series of booleans ('ner_pass') aligned with the input DataFrame's index.
    """
    # Log the start of the NER processing.
    logger.info(f"Applying NER to {len(df)} descriptions to detect 'PERSON' entities...")

    # Extract the descriptions as plain Python strings; spaCy expects str inputs,
    # so missing values are replaced with empty strings.
    texts = df['description'].fillna('').astype(str).tolist()

    # Process the texts in batches using nlp.pipe for high efficiency.
    # nlp.pipe yields documents in input order, so results can be collected
    # positionally and aligned with the DataFrame index afterwards.
    doc_iterator = nlp.pipe(texts, batch_size=batch_size)

    # Collect one pass/fail flag per document; tqdm shows a progress bar.
    flags = []
    for doc in tqdm(doc_iterator, total=len(texts), desc="NER Processing"):
        # Equation: Check if ∃ e ∈ E_i such that label(e) = 'PERSON'
        has_person = any(ent.label_ == 'PERSON' for ent in doc.ents)
        # Equation: I_i^NER = 1{no 'PERSON' entity found}
        flags.append(not has_person)

    # Build the result Series positionally. This avoids one label-based
    # assignment per row and stays correct if index labels repeat.
    ner_pass = pd.Series(flags, index=df.index, dtype=bool)

    # Log the completion of the NER processing.
    logger.info("NER processing complete.")
    # Return the final boolean Series of pass/fail flags.
    return ner_pass

# ------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# ------------------------------------------------------------------------------
def apply_ner_person_filter(
    persona_step1_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Applies a Named Entity Recognition (NER) filter to exclude personas
    mentioning specific individuals.

    This function orchestrates the second step of the persona filtering pipeline.
    It loads a spaCy NER model, processes all persona descriptions to detect
    entities labeled as 'PERSON', and filters out any personas where such
    an entity is found.

    Args:
        persona_step1_df: The DataFrame of personas that passed the initial
                          keyword and domain filters.
        config: The study configuration dictionary, containing the NER model name.

    Returns:
        A tuple containing:
        - persona_step2_df (pd.DataFrame): The filtered DataFrame containing only
          personas that passed the NER check.
        - report (Dict[str, Any]): A dictionary summarizing the filtering process.
    """
    # Log the start of Task 6.
    logger.info("Starting Task 6: NER PERSON exclusion filter.")

    # --- Input Validation ---
    # Ensure the input DataFrame is not empty.
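    if persona_step1_df.empty:
        logger.warning("Input DataFrame for NER filtering is empty. Skipping.")
        report = {'rows_read': 0, 'rows_written': 0, 'rows_rejected': 0, 'rejection_rate': 'N/A'}
        # Return an empty copy of the input so the column schema is preserved.
        return persona_step1_df.copy(), report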

    # --- Step 1: Load and Configure NER Model ---
    # Extract the model name from the configuration.
    model_name = config['phase_1_parameters']['filtering']['ner_filtering']['model_name']
    # Load the configured spaCy model using the helper function.
    nlp_model = _load_ner_model(model_name)

    # --- Step 2: Apply NER and Construct Exclusion Indicator ---
    # Generate the pass/fail flags for each persona using the NER model.
    ner_pass_flags = _generate_ner_pass_flags(persona_step1_df, nlp_model)

    # --- Step 3: Apply NER Flags and Reduce Dataset ---
    # Keep only the rows that passed the NER check. The flags are aligned with the
    # input index, so a positional boolean mask is safe even if index labels repeat.
    persona_step2_df = persona_step1_df[ner_pass_flags.to_numpy()].copy()

    # --- Report Generation ---
    # Compile a report summarizing the results of the filtering step.
    rows_read = len(persona_step1_df)
    rows_written = len(persona_step2_df)
    rejection_rate = 1 - (rows_written / rows_read) if rows_read > 0 else 0

    report = {
        'rows_read': rows_read,
        'rows_written': rows_written,
        'rows_rejected': rows_read - rows_written,
        'rejection_rate': f"{rejection_rate:.4%}"
    }

    # Log the final summary.
    logger.info("NER filtering complete.")
    logger.info(f"Rows read: {report['rows_read']}, Rows written: {report['rows_written']} ({report['rejection_rate']} rejection rate).")

    # Return the filtered DataFrame and the summary report.
    return persona_step2_df, report
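
The exclusion rule and the summary report can be illustrated without loading a spaCy model. The sketch below assumes the entity labels of each description have already been extracted; the label lists are hypothetical stand-ins for `doc.ents`, and `ner_pass_flag` and `summarize_filter` are illustrative names, not part of the pipeline.

```python
from typing import Dict, List, Union


def ner_pass_flag(entity_labels: List[str]) -> bool:
    """I_i^NER = 1{no 'PERSON' entity found}."""
    return "PERSON" not in entity_labels


def summarize_filter(flags: List[bool]) -> Dict[str, Union[int, str]]:
    """Build a report with the same fields as the Task 6 orchestrator."""
    rows_read = len(flags)
    rows_written = sum(flags)
    rejection_rate = 1 - (rows_written / rows_read) if rows_read > 0 else 0
    return {
        "rows_read": rows_read,
        "rows_written": rows_written,
        "rows_rejected": rows_read - rows_written,
        "rejection_rate": f"{rejection_rate:.4%}",
    }


# Hypothetical entity labels for four persona descriptions.
labels_per_persona = [
    ["ORG", "GPE"],     # institution and place only: passes
    ["PERSON", "ORG"],  # mentions a named individual: fails
    [],                 # no entities at all: passes
    ["DATE"],           # passes
]
flags = [ner_pass_flag(labels) for labels in labels_per_persona]
report = summarize_filter(flags)
print(flags)
print(report)
```

One of the four hypothetical personas is rejected, so the report shows a rejection rate of `25.0000%`.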


In [None]:
# Task 7 — Embedding-based deduplication (persona filter step 3)

# ==============================================================================
# Task 7: Embedding-based deduplication (persona filter step 3)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 7, Step 1 Helper: Compute and normalize embeddings
# ------------------------------------------------------------------------------
def _generate_normalized_embeddings(
    descriptions: List[str],
    model_name: str,
    batch_size: int = 32
) -> np.ndarray:
    """
    Generates L2-normalized embeddings for a list of text descriptions.

    Args:
        descriptions: A list of strings to be encoded.
        model_name: The name of the sentence-transformer model to use.
        batch_size: The batch size for the encoding process.

    Returns:
        A numpy array of shape (n_descriptions, embedding_dim) containing
        the L2-normalized embeddings.
    """
    # Log the start of the embedding generation process.
    logger.info(f"Loading sentence-transformer model: '{model_name}'...")
    # Load the specified pre-trained sentence-transformer model.
    model = SentenceTransformer(model_name)

    # Log the start of the encoding process.
    logger.info(f"Generating embeddings for {len(descriptions)} descriptions...")
    # Generate embeddings for all descriptions in batches. A progress bar is shown.
    embeddings = model.encode(
        descriptions,
        batch_size=batch_size,
        show_progress_bar=True,
        normalize_embeddings=True # Use built-in normalization for efficiency
    )

    # The model.encode with normalize_embeddings=True already performs L2 normalization.
    # Equation: e_i <- e_i / ||e_i||_2
    # The output embeddings are already unit vectors.

    # Convert to float32 to save memory, which is sufficient for this task.
    embeddings = embeddings.astype(np.float32)
    logger.info(f"Generated and normalized embeddings of shape {embeddings.shape}.")

    # Return the final matrix of normalized embeddings.
    return embeddings

# ------------------------------------------------------------------------------
# Task 7, Step 2 Helper: Find similar pairs using HNSW
# ------------------------------------------------------------------------------
def _find_similar_pairs_hnsw(
    embeddings: np.ndarray,
    threshold: float,
    ef_construction: int = 200,
    M: int = 16,
    ef_search: int = 50
) -> List[Tuple[int, int]]:
    """
    Finds pairs of embeddings with cosine similarity above a threshold using HNSW.

    Args:
        embeddings: A numpy array of L2-normalized embeddings.
        threshold: The cosine similarity threshold (e.g., 0.90).
        ef_construction: HNSW build-time parameter.
        M: HNSW build-time parameter.
        ef_search: HNSW search-time parameter.

    Returns:
        A list of tuples, where each tuple contains the integer indices of a
        similar pair.
    """
    # Get the number of items and the embedding dimension.
    num_items, dim = embeddings.shape
    # Log the start of the index building process.
    logger.info(f"Building HNSW index for {num_items} items...")

    # Initialize the HNSW index.
    # 'ip' (inner product) is equivalent to cosine similarity for L2-normalized vectors.
    p = hnswlib.Index(space='ip', dim=dim)
    # Initialize the index structure.
    p.init_index(max_elements=num_items, ef_construction=ef_construction, M=M)
    # Add the embeddings to the index.
    p.add_items(embeddings, np.arange(num_items))
    # Number of neighbors to retrieve per query. It is capped at num_items because
    # hnswlib cannot return k results when k exceeds the number of indexed elements.
    # If a duplicate cluster is larger than k, some pairs may be missed, so k is generous.
    k = min(50, num_items)
    # Set the search-time effort parameter. Higher is more accurate but slower.
    # hnswlib expects ef to be at least k.
    p.set_ef(max(ef_search, k))

    # Log the start of the similarity search.
    logger.info(f"Querying index to find pairs with similarity >= {threshold}...")
    # Initialize a set to store the found pairs to avoid duplicates.
    similar_pairs = set()

    # Query the index for each item to find its neighbors.
    for i in tqdm(range(num_items), desc="Finding Similar Pairs"):
        labels, distances = p.knn_query(embeddings[i], k=k)

        # With the 'ip' space, hnswlib reports distance = 1 - inner product, which equals
        # 1 - cosine_similarity for unit vectors. A pair qualifies when
        # 1 - distance >= threshold, i.e. distance <= 1 - threshold.
        for j_label, dist in zip(labels[0], distances[0]):
            # Convert the numpy label to a plain Python int.
            j = int(j_label)
            # The query normally returns the item itself with distance ~0. Skip it.
            if i == j:
                continue
            # If the similarity meets the threshold...
            if (1 - dist) >= threshold:
                # Store each pair once, ordered as (min, max).
                similar_pairs.add((min(i, j), max(i, j)))

    # Log the number of unique similar pairs found.
    logger.info(f"Found {len(similar_pairs)} unique similar pairs.")
    # Return the pairs as a list.
    return list(similar_pairs)

# ------------------------------------------------------------------------------
# Task 7, Step 3 Helper: Cluster pairs and select representatives
# ------------------------------------------------------------------------------
def _get_deduplication_mask(
    num_items: int,
    item_ids: pd.Index,
    similar_pairs: List[Tuple[int, int]]
) -> pd.Series:
    """
    Identifies duplicate clusters from similar pairs and selects a representative for each.

    Args:
        num_items: The total number of items (personas).
        item_ids: The pandas Index of persona IDs, corresponding to the item indices.
        similar_pairs: A list of tuples representing similar pairs by integer index.

    Returns:
        A pandas Series of booleans (keep_mask) aligned with item_ids, where
        `True` indicates the item is a representative to be kept.
    """
    # Log the start of the clustering process.
    logger.info("Building graph and finding duplicate clusters...")
    # Create a graph from the list of similar pairs (edges).
    G = nx.Graph()
    G.add_edges_from(similar_pairs)

    # Find all connected components in the graph. Each component is a duplicate cluster.
    clusters = list(nx.connected_components(G))

    # Initialize a set to store the integer indices of the representatives to keep.
    representatives_to_keep = set()

    # Process each duplicate cluster. Components built from edges always have size > 1.
    for cluster in clusters:
        if len(cluster) > 1:
            # Select the representative as the member with the smallest persona ID
            # (lexicographic order for string IDs). Working with positions avoids a
            # label lookup, which is ambiguous if IDs repeat.
            representative_index = int(min(cluster, key=lambda idx: item_ids[idx]))
            # Add this index to the set of items to keep.
            representatives_to_keep.add(representative_index)

    # Process singletons (items not in any similar pair). They are unique and kept by default.
    all_nodes_in_clusters = set.union(*clusters) if clusters else set()
    singletons = set(range(num_items)) - all_nodes_in_clusters
    representatives_to_keep.update(singletons)

    # Create the final boolean mask positionally, so it stays correct even if IDs repeat.
    logger.info(f"Identified {len(representatives_to_keep)} unique representatives.")
    keep_positions = np.array(
        sorted(int(idx) for idx in representatives_to_keep), dtype=np.intp
    )
    keep_flags = np.zeros(num_items, dtype=bool)
    keep_flags[keep_positions] = True
    keep_mask = pd.Series(keep_flags, index=item_ids)

    # Return the final mask.
    return keep_mask

# ------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# ------------------------------------------------------------------------------
def apply_embedding_deduplication(
    persona_step2_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Applies embedding-based semantic deduplication to the persona dataset.

    This function orchestrates the third step of the filtering pipeline:
    1.  Generates dense vector embeddings for each persona's description.
    2.  Uses an efficient Approximate Nearest Neighbor (ANN) search to find
        pairs of personas with high semantic similarity.
    3.  Clusters these pairs into groups of duplicates and selects a single,
        deterministic representative from each group.

    Args:
        persona_step2_df: The DataFrame of personas that passed the NER filter.
        config: The study configuration dictionary.

    Returns:
        A tuple containing:
        - persona_step3_df (pd.DataFrame): The deduplicated DataFrame.
        - report (Dict[str, Any]): A dictionary summarizing the process.
    """
    # Log the start of Task 7.
    logger.info("Starting Task 7: Embedding-based deduplication.")

    # --- Input Validation ---
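    if persona_step2_df.empty:
        logger.warning("Input DataFrame for deduplication is empty. Skipping.")
        report = {'rows_read': 0, 'rows_written': 0, 'rows_rejected': 0, 'rejection_rate': 'N/A'}
        # Return an empty copy of the input so the column schema is preserved.
        return persona_step2_df.copy(), report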

    # --- Configuration Extraction ---
    dedup_config = config['phase_1_parameters']['filtering']['deduplication']
    model_name = dedup_config['embedding_model_name']
    threshold = dedup_config['cosine_similarity_threshold']

    # --- Step 1: Compute Embeddings ---
    # The descriptions must be in a list for the encoder.
    descriptions = persona_step2_df['description'].tolist()
    embeddings = _generate_normalized_embeddings(descriptions, model_name)

    # --- Step 2: Find Similar Pairs ---
    # Use the HNSW helper to find all pairs of indices with similarity >= threshold.
    similar_pairs = _find_similar_pairs_hnsw(embeddings, threshold)

    # --- Step 3: Cluster and Select Representatives ---
    # Get the boolean mask indicating which personas to keep.
    keep_mask = _get_deduplication_mask(
        num_items=len(persona_step2_df),
        item_ids=persona_step2_df.index,
        similar_pairs=similar_pairs
    )

    # Apply the mask to filter the DataFrame.
    persona_step3_df = persona_step2_df[keep_mask].copy()

    # --- Report Generation ---
    rows_read = len(persona_step2_df)
    rows_written = len(persona_step3_df)
    rejection_rate = 1 - (rows_written / rows_read) if rows_read > 0 else 0

    report = {
        'rows_read': rows_read,
        'rows_written': rows_written,
        'rows_rejected': rows_read - rows_written,
        'rejection_rate': f"{rejection_rate:.4%}",
        'num_similar_pairs_found': len(similar_pairs),
    }

    # Log the final summary.
    logger.info("Embedding-based deduplication complete.")
    logger.info(f"Rows read: {report['rows_read']}, Rows written: {report['rows_written']} ({report['rejection_rate']} rejection rate).")

    # Return the deduplicated DataFrame and the summary report.
    return persona_step3_df, report
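
The deduplication logic (threshold on cosine similarity, cluster by connectivity, keep the smallest ID per cluster) can be sketched on toy vectors in plain Python. This is an illustration under stated assumptions, not the pipeline's code: it swaps the HNSW approximate search for an exact brute-force comparison, which is only practical for small inputs, and uses a small union-find in place of `networkx.connected_components`. The function names, IDs, and two-dimensional vectors are hypothetical.

```python
import math
from typing import Dict, List, Tuple


def l2_normalize(vector: List[float]) -> List[float]:
    """e_i <- e_i / ||e_i||_2"""
    norm = math.sqrt(sum(x * x for x in vector))
    return [x / norm for x in vector]


def find_similar_pairs(embeddings: List[List[float]], threshold: float) -> List[Tuple[int, int]]:
    """Exact O(n^2) search; the inner product of unit vectors is their cosine similarity."""
    pairs = []
    for i in range(len(embeddings)):
        for j in range(i + 1, len(embeddings)):
            cosine = sum(a * b for a, b in zip(embeddings[i], embeddings[j]))
            if cosine >= threshold:
                pairs.append((i, j))
    return pairs


def pick_representatives(ids: List[str], pairs: List[Tuple[int, int]]) -> List[str]:
    """Group items linked by similar pairs and keep the smallest ID per group."""
    parent = list(range(len(ids)))

    def find(x: int) -> int:
        # Follow parent links to the root, compressing the path on the way.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for i, j in pairs:
        parent[find(i)] = find(j)

    clusters: Dict[int, List[int]] = {}
    for index in range(len(ids)):
        clusters.setdefault(find(index), []).append(index)
    # Singletons form their own cluster and are therefore kept.
    return sorted(min(ids[i] for i in members) for members in clusters.values())


# Hypothetical persona IDs and toy 2-D embeddings.
ids = ["p3", "p1", "p2", "p4", "p0"]
raw_vectors = [[1.0, 0.0], [0.99, 0.1], [0.0, 1.0], [0.1, 0.99], [-1.0, 0.0]]
embeddings = [l2_normalize(v) for v in raw_vectors]
pairs = find_similar_pairs(embeddings, threshold=0.90)
representatives = pick_representatives(ids, pairs)
print(pairs)
print(representatives)
```

In this toy input the first two vectors and the next two form two duplicate clusters, while the last vector is a singleton, so three IDs survive.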


In [None]:
# Task 8 — LLM-as-judge triage with majority vote (persona filter step 4)

# ==============================================================================
# Task 8: LLM-as-judge triage with majority vote (persona filter step 4)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 8, Step 1 & 2 Helper: Asynchronous API call and parsing
# ------------------------------------------------------------------------------
async def _fetch_and_parse_judgment(
    client: AsyncOpenAI,
    persona_id: str,
    description: str,
    run_number: int,
    system_prompt: str,
    model_name: str,
    temperature: float,
    max_retries: int,
    semaphore: asyncio.Semaphore
) -> Dict[str, Any]:
    """
    Asynchronously calls the LLM API to get a judgment for a single persona
    and run, with retry logic and robust parsing.

    Args:
        client: The asynchronous OpenAI API client.
        persona_id: The unique identifier for the persona.
        description: The persona description text.
        run_number: The judgment run number (1, 2, or 3).
        system_prompt: The verbatim system prompt for the judge.
        model_name: The name of the LLM to use (e.g., 'gpt-4o-mini').
        temperature: The sampling temperature for the LLM.
        max_retries: The maximum number of retries on API failure.
        semaphore: An asyncio.Semaphore to control concurrency.

    Returns:
        A dictionary containing the parsed judgment results for the run.
    """
    # Acquire the semaphore to limit concurrent requests.
    async with semaphore:
        # Make one initial attempt plus up to max_retries retries.
        for attempt in range(max_retries + 1):
            try:
                # Make the asynchronous API call.
                response = await client.chat.completions.create(
                    model=model_name,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": description},
                    ],
                    temperature=temperature,
                    response_format={"type": "json_object"},
                )
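                # Extract the JSON content from the response.
                # The content can be None (e.g., on a refusal); fall back to an
                # empty string so that json.loads raises a caught JSONDecodeError.
                raw_json = response.choices[0].message.content or ""
                # Parse the JSON string into a Python dictionary.
                parsed_data = json.loads(raw_json)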

                # --- Schema Validation ---
                # Define the expected keys for the JSON response.
                expected_keys = {"euro_area_centrality", "monetary_policy_depth", "neutrality", "notes"}
                # Check that the payload is a JSON object with all expected keys.
                if not isinstance(parsed_data, dict) or not expected_keys.issubset(parsed_data):
                    raise ValueError("Response JSON is missing required keys.")

                # Extract pass/fail status for each criterion.
                # Convert "pass" to True, and anything else (e.g., "fail") to False.
                # str() guards against non-string values such as booleans or null.
                eu_pass = str(parsed_data["euro_area_centrality"]).strip().lower() == "pass"
                mp_pass = str(parsed_data["monetary_policy_depth"]).strip().lower() == "pass"
                neu_pass = str(parsed_data["neutrality"]).strip().lower() == "pass"

                # Return a structured dictionary with the results.
                return {
                    "persona_id": persona_id,
                    "run_number": run_number,
                    "status": "SUCCESS",
                    "eu_centrality_pass": eu_pass,
                    "monetary_policy_depth_pass": mp_pass,
                    "neutrality_pass": neu_pass,
                    "raw_response": raw_json,
                }

            except (APIError, RateLimitError, json.JSONDecodeError, ValueError) as e:
                # Log the error with details about the attempt.
                logger.warning(f"Run {run_number} for persona {persona_id} failed on attempt {attempt + 1}: {e}")
                # If this was the last attempt, break the loop to return a failure.
                if attempt >= max_retries:
                    break
                # Wait before retrying, doubling the delay on each attempt (capped at 30 s).
                await asyncio.sleep(min(2.0 * (2 ** attempt), 30.0))

    # If all attempts fail, return a failure record.
    return {
        "persona_id": persona_id,
        "run_number": run_number,
        "status": "FAILURE",
        "eu_centrality_pass": False,
        "monetary_policy_depth_pass": False,
        "neutrality_pass": False,
        "raw_response": None,
    }

# ------------------------------------------------------------------------------
# Task 8, Step 3 Helper: Apply majority vote and filter
# ------------------------------------------------------------------------------
def _apply_majority_vote_and_filter(
    persona_df: pd.DataFrame,
    judgments_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Applies the majority vote rule to the judgments and filters the persona DataFrame.

    Args:
        persona_df: The DataFrame of candidate personas.
        judgments_df: A DataFrame containing the results of all judgment runs.

    Returns:
        A tuple containing:
        - The final, filtered DataFrame of personas (P*).
        - A dictionary with statistics from the voting process.
    """
    # Log the start of the majority vote process.
    logger.info("Applying majority vote rule to judgment results...")

    # Group the judgments by persona_id to aggregate the votes.
    # For each criterion, sum the boolean 'pass' flags (True=1, False=0).
    # Equation: vote_c(i) = Σ_{r=1 to 3} 1{run r returned pass for c}
    vote_counts = judgments_df.groupby("persona_id")[
        ["eu_centrality_pass", "monetary_policy_depth_pass", "neutrality_pass"]
    ].sum()

    # Apply the majority vote rule: a criterion passes if it received 2 or more 'pass' votes.
    # Equation: pass_c(i) = 1{vote_c(i) >= 2}
    vote_counts["eu_pass_maj"] = vote_counts["eu_centrality_pass"] >= 2
    vote_counts["mp_pass_maj"] = vote_counts["monetary_policy_depth_pass"] >= 2
    vote_counts["neu_pass_maj"] = vote_counts["neutrality_pass"] >= 2

    # Determine the final decision: a persona is kept only if it passes all three criteria.
    # Equation: keep_i = pass_EU(i) * pass_MP(i) * pass_NEU(i)
    vote_counts["final_pass"] = (
        vote_counts["eu_pass_maj"] &
        vote_counts["mp_pass_maj"] &
        vote_counts["neu_pass_maj"]
    )

    # Get the list of persona IDs that passed the final check.
    passed_ids = vote_counts[vote_counts["final_pass"]].index

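    # Filter the original persona DataFrame to keep only the passing personas.
    # Judgment records store persona_id as a string, so compare on the string
    # form of the index to avoid a KeyError when the index is not string-typed.
    final_persona_df = persona_df[persona_df.index.astype(str).isin(passed_ids)].copy()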

    # --- Generate Statistics ---
    # Calculate pass rates for each criterion and the final pass rate.
    stats = {
        "pass_rate_eu_centrality": vote_counts["eu_pass_maj"].mean(),
        "pass_rate_monetary_policy": vote_counts["mp_pass_maj"].mean(),
        "pass_rate_neutrality": vote_counts["neu_pass_maj"].mean(),
        "final_pass_rate": vote_counts["final_pass"].mean(),
        "final_count": len(final_persona_df),
    }

    # Return the final DataFrame and the statistics.
    return final_persona_df, stats

# ------------------------------------------------------------------------------
# Task 8, Orchestrator Function
# ------------------------------------------------------------------------------
async def apply_llm_judge_filter(
    persona_step3_df: pd.DataFrame,
    config: Dict[str, Any],
    checkpoint_path: str
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrates the LLM-as-judge filtering pipeline.

    This function manages the entire process of using an LLM to evaluate personas:
    1.  Sets up an asynchronous API client and concurrency controls.
    2.  Executes three independent judgment calls for each persona.
    3.  Handles API errors, retries, and resumability via checkpointing.
    4.  Applies a majority vote rule to the collected judgments.
    5.  Filters the dataset to produce the final set of personas (P*).

    Args:
        persona_step3_df: The deduplicated DataFrame of candidate personas.
        config: The study configuration dictionary.
        checkpoint_path: Path to a file for saving/resuming raw judgment results.

    Returns:
        A tuple containing:
        - persona_final_df (pd.DataFrame): The final, filtered DataFrame.
        - report (Dict[str, Any]): A dictionary summarizing the process.
    """
    # Log the start of the task.
    logger.info("Starting Task 8: LLM-as-judge triage.")

    # --- Configuration Extraction ---
    judge_config = config['phase_1_parameters']['llm_as_judge']
    api_config = judge_config['api_settings']
    model_name = judge_config['model_name']
    num_runs = judge_config['judging_runs']

    # --- System Prompt ---
    # This must be the exact prompt from the paper's Appendix A.
    system_prompt = """You are assessing expert personas for their suitability in euro-area monetary-policy research. Return one JSON object only-no additional text.
                    TASK
                    - Read the biography supplied by the user.
                    - Evaluate it against the three pass-fail criteria below.
                    - Provide a concise one-sentence reason for each decision.
                    CRITERIA
                    1. Euro-area centrality
                      Fail: Focus is non-EU or purely global with no euro-area anchor.
                      Pass: The euro area or an ECB institution is mentioned - this includes references to EU countries, central banks or Europe in general, - references to other contexts are allowed as long as the euro-area context is mentioned.
                    2. Monetary-policy depth
                      Fail: Monetary policy is not mentioned at all, or only mentioned in passing with none of the above signals present.
                      Pass: The biography engages substantively with monetary policy by satisfying at least one of:
                            - names an operational tool (e.g., deposit rate, APP/PEPP, LSAP),
                            - discusses a recognised policy rule or doctrine (e.g., Taylor rule, money - growth targeting, rules vs. discretion),
                            - analyses a transmission channel or macro outcome (inflation, output, employment, exchange rate, asset prices),
                            - references an empirical method used to evaluate policy (event study, VAR, DSGE, natural experiment).
                    3. Neutrality
                      Fail: The biography expresses opinion, advocacy or bias. Look for:
                            - Emotive or value-laden terms ("reckless", "dangerous", "unsustainable").
                            - Framing of personal advocacy or judgment ("skeptical of...", "a critic of...", "optimistic about...").
                            - Any implicit stance that goes beyond analysis.
                      Pass: Tone is descriptive, analytical, or exploratory, without any judgment, prescription, or stance.
                    OUTPUT SCHEMA
                    {
                      "euro_area_centrality": "pass" | "fail",
                      "monetary_policy_depth": "pass" | "fail",
                      "neutrality": "pass" | "fail",
                      "notes": {
                        "euro_area_centrality": "<one-sentence reason>",
                        "monetary_policy_depth": "<one-sentence reason>",
                        "neutrality": "<one-sentence reason>"
                      }
                    }"""

    # --- Resumability and Task Preparation ---
    # Load existing results from the checkpoint file if it exists.
    try:
        with open(checkpoint_path, 'r') as f:
            completed_results = [json.loads(line) for line in f]
        completed_tasks = {(res['persona_id'], res['run_number']) for res in completed_results}
        logger.info(f"Resuming from checkpoint. Found {len(completed_results)} completed judgments.")
    except FileNotFoundError:
        completed_results = []
        completed_tasks = set()
        logger.info("No checkpoint file found. Starting from scratch.")

    # --- Create Asynchronous Tasks ---
    # Initialize the async OpenAI client.
    client = AsyncOpenAI()
    # Create a semaphore to limit concurrency.
    semaphore = asyncio.Semaphore(config['phase_1_parameters']['orchestration']['max_concurrency'])
    # Prepare the list of asynchronous tasks to run.
    tasks: List[Coroutine] = []
    for persona_id, row in persona_step3_df.iterrows():
        for run in range(1, num_runs + 1):
            # If this task has already been completed, skip it.
            # Checkpointed IDs are stored as strings, so compare on the string form.
            if (str(persona_id), run) in completed_tasks:
                continue
            # Create a coroutine for the API call.
            task = _fetch_and_parse_judgment(
                client, str(persona_id), row['description'], run, system_prompt,
                model_name, api_config['temperature'], api_config['max_retries'], semaphore
            )
            tasks.append(task)

    # --- Execute Tasks and Save Results ---
    # If there are tasks to run...
    if tasks:
        logger.info(f"Executing {len(tasks)} new judgment API calls...")
        # Open the checkpoint file in append mode.
        with open(checkpoint_path, 'a') as f:
            # Use tqdm_asyncio for a progress bar over the async tasks.
            for result_coro in tqdm_asyncio.as_completed(tasks, total=len(tasks), desc="LLM Judging"):
                # Await the result of the next completed task.
                result = await result_coro
                # Write the result as a new line in the JSONL checkpoint file and
                # flush immediately so completed judgments survive an interrupted run.
                f.write(json.dumps(result) + '\n')
                f.flush()
                # Add the completed task to our set to avoid re-running it if resumed.
                completed_tasks.add((result['persona_id'], result['run_number']))
        # Reload the full checkpoint so the in-memory list holds both old and new results.
        with open(checkpoint_path, 'r') as f:
            completed_results = [json.loads(line) for line in f]
    else:
        logger.info("All judgment tasks were already complete.")

    # Convert the full list of results to a DataFrame.
    judgments_df = pd.DataFrame(completed_results)

    # --- Step 3: Apply Majority Vote and Filter ---
    # Call the helper to perform the final filtering step.
    persona_final_df, vote_stats = _apply_majority_vote_and_filter(
        persona_step3_df, judgments_df
    )

    # --- Report Generation ---
    # Guard against an empty input to avoid a ZeroDivisionError.
    rows_read = len(persona_step3_df)
    rows_written = len(persona_final_df)
    rejection_rate = 1 - (rows_written / rows_read) if rows_read > 0 else 0

    # Compile the final report.
    report = {
        'rows_read': rows_read,
        'rows_written': rows_written,
        'rows_rejected': rows_read - rows_written,
        'rejection_rate': f"{rejection_rate:.4%}",
        'total_api_calls': len(judgments_df),
        # Cast to a plain int so the report stays JSON-serializable.
        'failed_api_calls': int((judgments_df['status'] == 'FAILURE').sum()),
        'vote_statistics': vote_stats
    }

    # Check if the final count matches the paper's expected count.
    EXPECTED_FINAL_COUNT = 2368
    if report['vote_statistics']['final_count'] != EXPECTED_FINAL_COUNT:
        logger.warning(
            f"Final persona count is {report['vote_statistics']['final_count']}, "
            f"which does not match the paper's expected count of {EXPECTED_FINAL_COUNT}."
        )

    # Log the final summary.
    logger.info("LLM-as-judge filtering complete.")
    logger.info(f"Final persona count: {report['rows_written']}.")

    # Return the final DataFrame and the report.
    return persona_final_df, report
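
The majority-vote rule used above can be traced on a toy example. The sketch below is a standard-library restatement of the same logic (sum the pass flags per criterion, require at least two passes, keep a persona only if all three criteria pass); it is not the pandas implementation used in the pipeline. The function name `majority_keep` and the sample records are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List

CRITERIA = ("eu_centrality_pass", "monetary_policy_depth_pass", "neutrality_pass")


def majority_keep(judgments: List[dict], min_votes: int = 2) -> Dict[str, bool]:
    """Map each persona_id to True if every criterion got >= min_votes passes."""
    votes: Dict[str, Dict[str, int]] = defaultdict(lambda: dict.fromkeys(CRITERIA, 0))
    for record in judgments:
        for criterion in CRITERIA:
            # True counts as 1 and False as 0, as in the groupby-sum above.
            votes[record["persona_id"]][criterion] += int(record[criterion])
    return {
        pid: all(count >= min_votes for count in counts.values())
        for pid, counts in votes.items()
    }


def run(pid: str, eu: bool, mp: bool, neu: bool) -> dict:
    """Build one illustrative judgment record."""
    return {
        "persona_id": pid,
        "eu_centrality_pass": eu,
        "monetary_policy_depth_pass": mp,
        "neutrality_pass": neu,
    }


toy_judgments = [
    run("p1", True, True, True), run("p1", True, False, True), run("p1", True, True, False),
    run("p2", True, True, True), run("p2", True, True, False), run("p2", True, False, False),
]
decisions = majority_keep(toy_judgments)
print(decisions)  # {'p1': True, 'p2': False}
```

Here `p2` is dropped because neutrality receives only one pass vote. Note that a run recorded with status `FAILURE` carries `False` for all three criteria, so under this rule it counts as a fail vote rather than as a missing vote.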


In [None]:
# Task 9 — Validate LLM-as-judge reliability (Cohen's kappa)

# ==============================================================================
# Task 9: Validate LLM-as-judge reliability (Cohen's kappa)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 9, Step 1 Helper: Sample personas and align judgments
# ------------------------------------------------------------------------------
def _prepare_kappa_validation_data(
    persona_step3_df: pd.DataFrame,
    all_judgments_df: pd.DataFrame,
    human_annotations_df: pd.DataFrame,
    sample_size: int,
    random_seed: int
) -> pd.DataFrame:
    """
    Samples personas and aligns judgments from the LLM and human annotators.

    Args:
        persona_step3_df: The deduplicated DataFrame of candidate personas.
        all_judgments_df: DataFrame with all LLM judgment runs.
        human_annotations_df: DataFrame with human annotations, in tidy format.
        sample_size: The number of personas to sample for validation.
        random_seed: A random seed for reproducible sampling.

    Returns:
        A DataFrame where each row is a persona and columns contain the
        judgments from each rater for each criterion.
    """
    # --- Sample Personas ---
    # Draw a reproducible random sample of persona IDs.
    logger.info(f"Sampling {sample_size} personas for kappa validation...")
    sample_ids = persona_step3_df.sample(n=sample_size, random_state=random_seed).index

    # --- Process LLM Judgments ---
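    # Filter the full judgment set to only include the sampled personas.
    # Judgment records store persona_id as a string, so compare on string IDs.
    llm_judgments_sample = all_judgments_df[
        all_judgments_df['persona_id'].astype(str).isin(sample_ids.astype(str))
    ]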
    # Group by persona and calculate the majority vote for each criterion.
    llm_vote_counts = llm_judgments_sample.groupby("persona_id")[
        ["eu_centrality_pass", "monetary_policy_depth_pass", "neutrality_pass"]
    ].sum()
    # The LLM's final judgment is 'pass' (True) if it got >= 2 votes.
    llm_majority_judgments = (llm_vote_counts >= 2).reset_index()
    llm_majority_judgments = llm_majority_judgments.rename(columns={
        "eu_centrality_pass": "llm_eu",
        "monetary_policy_depth_pass": "llm_mp",
        "neutrality_pass": "llm_neu"
    })

    # --- Process Human Annotations ---
    # Filter human annotations to the sampled personas.
    human_annotations_sample = human_annotations_df[human_annotations_df['persona_id'].isin(sample_ids)]
    # Pivot the tidy data into a wide format for easier comparison.
    # One row per persona, with columns for each annotator's judgment on each criterion.
    human_judgments_wide = human_annotations_sample.pivot_table(
        index='persona_id',
        columns=['annotator_id', 'criterion'],
        values='judgment'
    ).reset_index()
    # Flatten the multi-level column index.
    human_judgments_wide.columns = ['_'.join(col).strip() if isinstance(col, tuple) and col[1] else col[0] for col in human_judgments_wide.columns.values]

    # --- Align All Judgments ---
    # Merge the LLM and human judgments into a single DataFrame.
    aligned_df = pd.merge(llm_majority_judgments, human_judgments_wide, on='persona_id')

    logger.info(f"Successfully aligned judgments for {len(aligned_df)} personas.")
    return aligned_df

# ------------------------------------------------------------------------------
# Task 9, Step 2 & 3 Helper: Compute kappa scores and generate report
# ------------------------------------------------------------------------------
def _compute_and_report_kappa(aligned_judgments_df: pd.DataFrame) -> pd.DataFrame:
    """
    Computes Cohen's kappa scores and confusion matrices for all rater pairs.

    Args:
        aligned_judgments_df: A DataFrame with aligned judgments from all raters.

    Returns:
        A DataFrame summarizing the reliability analysis results.
    """
    # Define the criteria and the raters to be compared.
    criteria = {"eu": "EU Centrality", "mp": "MP Depth", "neu": "Neutrality"}
    # Assuming annotator IDs are 'A1' and 'A2' from the pivoted columns.
    raters = {"A1": "Human 1", "A2": "Human 2", "llm": "LLM Majority"}
    rater_pairs = [("A1", "A2"), ("A1", "llm"), ("A2", "llm")]

    # Initialize a list to store the results.
    results = []

    # Iterate through each criterion to compute kappa scores.
    for crit_key, crit_name in criteria.items():
        # Iterate through each pair of raters.
        for rater1_key, rater2_key in rater_pairs:
            # Construct the column names for the current criterion and raters.
            col1 = f"{rater1_key}_{crit_key}"
            col2 = f"{rater2_key}_{crit_key}"

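            # Extract the judgment vectors for the two raters.
            # Cast to bool so both vectors share the label set [False, True];
            # this assumes boolean-like judgments with no missing values.
            y1 = aligned_judgments_df[col1].astype(bool)
            y2 = aligned_judgments_df[col2].astype(bool)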

            # --- Compute Cohen's Kappa ---
            # Equation: κ = (p_o - p_e) / (1 - p_e)
            kappa_score = cohen_kappa_score(y1, y2)

            # --- Compute Confusion Matrix ---
            # tn, fp, fn, tp
            cm = confusion_matrix(y1, y2, labels=[False, True]).ravel()

            # Store the results in a dictionary.
            results.append({
                "Criterion": crit_name,
                "Rater 1": raters[rater1_key],
                "Rater 2": raters[rater2_key],
                "Cohen's Kappa": kappa_score,
                "Confusion Matrix (tn, fp, fn, tp)": tuple(cm)
            })

    # Convert the list of results into a DataFrame.
    report_df = pd.DataFrame(results)

    # --- Assess Reliability ---
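    # Add a qualitative interpretation of the kappa score, using the
    # Landis & Koch (1977) bands. Upper bounds are inclusive, so a score of
    # exactly 0.6 is "Moderate", consistent with the strict κ > 0.6 check below.
    def interpret_kappa(score: float) -> str:
        if pd.isna(score): return "N/A"
        if score < 0.0: return "Poor"
        if score <= 0.2: return "Slight"
        if score <= 0.4: return "Fair"
        if score <= 0.6: return "Moderate"
        if score <= 0.8: return "Substantial"
        return "Almost Perfect"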

    report_df["Interpretation"] = report_df["Cohen's Kappa"].apply(interpret_kappa)

    # Check if the results meet the paper's reported reliability standard.
    # The paper reports kappa values in the range [0.61, 0.81].
    report_df["Meets Paper Standard (κ > 0.6)"] = report_df["Cohen's Kappa"] > 0.6

    return report_df

# ------------------------------------------------------------------------------
# Task 9, Orchestrator Function
# ------------------------------------------------------------------------------
def validate_llm_judge_reliability(
    persona_step3_df: pd.DataFrame,
    all_judgments_df: pd.DataFrame,
    human_annotations_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrates the validation of the LLM-as-judge's reliability.

    This function performs a full inter-rater reliability analysis:
    1.  Draws a reproducible random sample of personas.
    2.  Aligns the LLM's majority-vote judgments with judgments from two
        human annotators for the sampled personas.
    3.  Computes Cohen's kappa and confusion matrices for all pairs of raters
        (Human-Human, Human-LLM) across all three judgment criteria.
    4.  Generates a comprehensive report assessing if the LLM's reliability
        meets the "substantial agreement" standard reported in the paper.

    Args:
        persona_step3_df: The deduplicated DataFrame of candidate personas.
        all_judgments_df: DataFrame containing all raw LLM judgment runs.
        human_annotations_df: A tidy DataFrame with human annotations, containing
                              columns ['persona_id', 'annotator_id', 'criterion', 'judgment'].
        config: The study configuration dictionary.

    Returns:
        A tuple containing:
        - A pandas DataFrame summarizing the kappa analysis results.
        - A dictionary containing a high-level summary and overall assessment.
    """
    # Log the start of the task.
    logger.info("Starting Task 9: Validate LLM-as-judge reliability.")

    # --- Configuration Extraction ---
    kappa_config = config['phase_1_parameters']['llm_as_judge']['kappa_validation']
    sample_size = kappa_config['human_annotated_sample_size']
    # Use the main Monte Carlo seed for reproducibility in sampling.
    random_seed = config['phase_2_parameters']['scoring_and_inference']['monte_carlo_seed']

    # --- Step 1: Sample and Align Data ---
    # Prepare the analysis-ready DataFrame with judgments from all sources.
    aligned_judgments_df = _prepare_kappa_validation_data(
        persona_step3_df, all_judgments_df, human_annotations_df, sample_size, random_seed
    )

    # --- Step 2 & 3: Compute Kappa and Generate Report ---
    # Compute all statistics and generate the detailed report DataFrame.
    kappa_report_df = _compute_and_report_kappa(aligned_judgments_df)

    # --- Final Assessment ---
    # Determine if the overall process meets the paper's standard.
    # The validation passes if all computed kappa scores are > 0.6.
    overall_assessment = "SUCCESS" if kappa_report_df["Meets Paper Standard (κ > 0.6)"].all() else "FAILURE"

    # Create a high-level summary dictionary.
    summary = {
        "overall_assessment": overall_assessment,
        "assessment_details": (
            "All computed Cohen's kappa scores exceed 0.6, indicating 'Substantial Agreement' "
            "and successfully replicating the paper's reliability findings."
            if overall_assessment == "SUCCESS"
            else "One or more kappa scores did not exceed the 0.6 threshold for 'Substantial Agreement'."
        ),
        "sample_size": sample_size,
    }

    # Log the final assessment.
    logger.info(f"LLM judge reliability validation complete. Overall Assessment: {overall_assessment}")

    # Return the detailed report and the high-level summary.
    return kappa_report_df, summary
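
The formula quoted in the helper above, κ = (p_o - p_e) / (1 - p_e), can be checked by hand for two binary raters. The sketch below is a standard-library illustration and is not the `cohen_kappa_score` call used in the pipeline. The function name `cohen_kappa_binary`, the sample vectors, and the choice to return NaN when chance agreement is 1 are assumptions made for this example.

```python
from typing import Sequence


def cohen_kappa_binary(y1: Sequence[bool], y2: Sequence[bool]) -> float:
    """Cohen's kappa for two raters assigning binary labels to the same items."""
    if len(y1) != len(y2) or len(y1) == 0:
        raise ValueError("Both raters must label the same non-empty set of items.")
    n = len(y1)

    # Observed agreement: share of items on which the raters agree.
    p_o = sum(bool(a) == bool(b) for a, b in zip(y1, y2)) / n

    # Chance agreement from each rater's marginal pass rate.
    p1 = sum(bool(a) for a in y1) / n
    p2 = sum(bool(b) for b in y2) / n
    p_e = p1 * p2 + (1 - p1) * (1 - p2)

    if p_e == 1.0:
        # Both raters used a single identical label; kappa is undefined.
        return float("nan")
    return (p_o - p_e) / (1 - p_e)


human = [True, True, True, True, False, False, False, False, True, False]
llm = [True, True, True, False, False, False, False, True, True, False]
kappa = cohen_kappa_binary(human, llm)
print(round(kappa, 3))  # 0.6
```

In this example the raters agree on 8 of 10 items (p_o = 0.8) and each passes half of them (p_e = 0.5), so κ = 0.3 / 0.5 = 0.6. A raw agreement rate of 80% therefore corresponds only to the boundary between "Moderate" and "Substantial" agreement once chance agreement is removed.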


In [None]:
# Task 10 — Assemble forecasting prompts (persona and no-persona arms)

# ==============================================================================
# Task 10: Assemble forecasting prompts (persona and no-persona arms)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 10, Step 1 & 2 Helper: Prompt Templates
# ------------------------------------------------------------------------------

# Define the verbatim system prompt template for the persona arm.
_SYSTEM_PROMPT_PERSONA = """You are participating in the European Central Bank's Survey of Professional Forecasters (ECB-SPF) for round: {survey_round}.
Today's date is {survey_date_str}.
You will be asked to provide point forecasts for a set of key macroeconomic indicators (inflation, core inflation, GDP growth and unemployment) for the euro area at different time horizons.

You are: {persona_blurb}"""

# Define the verbatim system prompt template for the no-persona (ablation) arm.
_SYSTEM_PROMPT_NO_PERSONA = """You are participating in the European Central Bank's Survey of Professional Forecasters (ECB-SPF) for round: {survey_round}.
Today's date is {survey_date_str}.
You will be asked to provide point forecasts for a set of key macroeconomic indicators (inflation, core inflation, GDP growth and unemployment) for the euro area at different time horizons."""

# Define the verbatim user prompt template, shared by both arms.
_USER_PROMPT_TEMPLATE = """ECONOMIC CONTEXT:
- Latest realized data:
Variable  Period       Value
HICP      {hicp_period:<9} {hicp_value}
HICPX     {hicpx_period:<9} {hicpx_value}
UNR       {unr_period:<9} {unr_value}
RGDP      {rgdp_period:<9} {rgdp_value}

- Median forecasts from the previous SPF round:
Variable  Horizon  Median forecast
HICP      {year_cy:<7}  {prev_spf_hicp_cy}
HICP      {year_cy1:<7}  {prev_spf_hicp_cy1}
HICP      {year_cy2:<7}  {prev_spf_hicp_cy2}
HICP      {year_lt:<7}  {prev_spf_hicp_lt}
HICPX     {year_cy:<7}  {prev_spf_hicpx_cy}
HICPX     {year_cy1:<7}  {prev_spf_hicpx_cy1}
HICPX     {year_cy2:<7}  {prev_spf_hicpx_cy2}
HICPX     {year_lt:<7}  {prev_spf_hicpx_lt}
UNR       {year_cy:<7}  {prev_spf_unr_cy}
UNR       {year_cy1:<7}  {prev_spf_unr_cy1}
UNR       {year_cy2:<7}  {prev_spf_unr_cy2}
UNR       {year_lt:<7}  {prev_spf_unr_lt}
RGDP      {year_cy:<7}  {prev_spf_rgdp_cy}
RGDP      {year_cy1:<7}  {prev_spf_rgdp_cy1}
RGDP      {year_cy2:<7}  {prev_spf_rgdp_cy2}
RGDP      {year_lt:<7}  {prev_spf_rgdp_lt}

- ECB monetary policy communication from the latest Governing Council meeting on {ecb_meeting_date_str}:
{ecb_communication_text}

TASK:
You are asked to provide one numeric point forecast for each target macroeconomic variable listed below, at multiple time horizons.
Do not use ranges or confidence intervals.
All forecasts should be expressed in percent (%), do not include units in the answer.

TARGETS:
For each of the following variables, provide a point forecast:
- HICP: HICP inflation
- HICPX: HICP inflation excluding food and energy
- RGDP: Real GDP growth
- UNR: Unemployment rate

Each variable should be forecast at the following horizons:
- t0: current calendar year ({year_cy})
- t1: next year ({year_cy1})
- t2: year after next ({year_cy2})
- lt: long-term ({year_lt})

OUTPUT SCHEMA:
{{
  "forecasts": [
    {{ "variable": "hicp",  "horizon": "t0", "value": "<<numeric>>" }},
    {{ "variable": "hicp",  "horizon": "t1", "value": "<<numeric>>" }},
    {{ "variable": "hicp",  "horizon": "t2", "value": "<<numeric>>" }},
    {{ "variable": "hicp",  "horizon": "lt", "value": "<<numeric>>" }},
    {{ "variable": "hicpx", "horizon": "t0", "value": "<<numeric>>" }},
    {{ "variable": "hicpx", "horizon": "t1", "value": "<<numeric>>" }},
    {{ "variable": "hicpx", "horizon": "t2", "value": "<<numeric>>" }},
    {{ "variable": "hicpx", "horizon": "lt", "value": "<<numeric>>" }},
    {{ "variable": "rgdp",  "horizon": "t0", "value": "<<numeric>>" }},
    {{ "variable": "rgdp",  "horizon": "t1", "value": "<<numeric>>" }},
    {{ "variable": "rgdp",  "horizon": "t2", "value": "<<numeric>>" }},
    {{ "variable": "rgdp",  "horizon": "lt", "value": "<<numeric>>" }},
    {{ "variable": "unr",   "horizon": "t0", "value": "<<numeric>>" }},
    {{ "variable": "unr",   "horizon": "t1", "value": "<<numeric>>" }},
    {{ "variable": "unr",   "horizon": "t2", "value": "<<numeric>>" }},
    {{ "variable": "unr",   "horizon": "lt", "value": "<<numeric>>" }}
  ]
}}

Reply only with the JSON, no additional text."""

# ------------------------------------------------------------------------------
# Task 10, Step 3 Helper: Populate placeholders for a single round
# ------------------------------------------------------------------------------
def _prepare_context_placeholders(
    context_row: pd.Series
) -> Dict[str, Union[str, int, float]]:
    """
    Prepares a dictionary of placeholder values from a row of contextual_data_df.

    Args:
        context_row: A pandas Series representing one row (one survey round).

    Returns:
        A dictionary where keys are placeholder names and values are the
        formatted data to be injected into the prompt templates.
    """
    # Helper to format values, converting NaN to "N/A".
    def fmt(val: Any, precision: int = 2) -> str:
        if pd.isna(val):
            return "N/A"
        if isinstance(val, (float, np.floating)):
            return f"{val:.{precision}f}"
        return str(val)

    # --- Date and Year Placeholders ---
    base_year = context_row['survey_date'].year
    placeholders = {
        "survey_round": context_row['survey_round'],
        "survey_date_str": context_row['survey_date'].strftime('%d/%m/%Y'),
        "ecb_meeting_date_str": context_row['ecb_meeting_date'].strftime('%Y-%m-%d'),
        "year_cy": base_year,
        "year_cy1": base_year + 1,
        "year_cy2": base_year + 2,
        "year_lt": context_row['lt_year'],
    }

    # --- Latest Realized Data Placeholders ---
    realized = context_row['latest_realized_data']
    placeholders.update({
        "hicp_period": realized['hicp']['period'] or "N/A",
        "hicp_value": fmt(realized['hicp']['value']),
        "hicpx_period": realized['hicpx']['period'] or "N/A",
        "hicpx_value": fmt(realized['hicpx']['value']),
        "unr_period": realized['unr']['period'] or "N/A",
        "unr_value": fmt(realized['unr']['value']),
        "rgdp_period": realized['rgdp']['period'] or "N/A",
        "rgdp_value": fmt(realized['rgdp']['value']),
    })

    # --- Previous SPF Medians Placeholders ---
    medians = context_row['previous_spf_medians']
    placeholders.update({
        "prev_spf_hicp_cy": fmt(medians['hicp'].get(base_year)),
        "prev_spf_hicp_cy1": fmt(medians['hicp'].get(base_year + 1)),
        "prev_spf_hicp_cy2": fmt(medians['hicp'].get(base_year + 2)),
        "prev_spf_hicp_lt": fmt(medians['hicp'].get(context_row['lt_year'])),
        "prev_spf_hicpx_cy": fmt(medians['hicpx'].get(base_year)),
        "prev_spf_hicpx_cy1": fmt(medians['hicpx'].get(base_year + 1)),
        "prev_spf_hicpx_cy2": fmt(medians['hicpx'].get(base_year + 2)),
        "prev_spf_hicpx_lt": fmt(medians['hicpx'].get(context_row['lt_year'])),
        "prev_spf_unr_cy": fmt(medians['unr'].get(base_year)),
        "prev_spf_unr_cy1": fmt(medians['unr'].get(base_year + 1)),
        "prev_spf_unr_cy2": fmt(medians['unr'].get(base_year + 2)),
        "prev_spf_unr_lt": fmt(medians['unr'].get(context_row['lt_year'])),
        "prev_spf_rgdp_cy": fmt(medians['rgdp'].get(base_year)),
        "prev_spf_rgdp_cy1": fmt(medians['rgdp'].get(base_year + 1)),
        "prev_spf_rgdp_cy2": fmt(medians['rgdp'].get(base_year + 2)),
        "prev_spf_rgdp_lt": fmt(medians['rgdp'].get(context_row['lt_year'])),
    })

    # --- ECB Communication Text ---
    placeholders["ecb_communication_text"] = context_row['ecb_communication_text']

    return placeholders

# ------------------------------------------------------------------------------
# Task 10, Orchestrator Function
# ------------------------------------------------------------------------------
def assemble_forecasting_prompts(
    persona_final_df: pd.DataFrame,
    contextual_data_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Generator[Dict[str, Any], None, None]:
    """
    Assembles and yields all forecasting prompts for both experimental arms.

    This function iterates through each survey round and generates the full set
    of prompts required for the experiment:
    1.  Persona Arm: One prompt for each of the 2,368 final personas for each
        of the 50 survey rounds.
    2.  No-Persona Arm: 100 baseline prompts for each of the 50 survey rounds.

    The function is implemented as a generator to be memory-efficient, yielding
    one prompt object at a time.

    Args:
        persona_final_df: The final, filtered DataFrame of personas (P*).
        contextual_data_df: The cleansed DataFrame of contextual data.
        config: The study configuration dictionary.

    Yields:
        A dictionary representing a single, fully-formed prompt object,
        containing API messages and metadata.
    """
    # Log the start of the prompt assembly process.
    logger.info("Assembling forecasting prompts for both persona and no-persona arms...")

    # Extract the number of baseline runs from the configuration.
    baseline_runs = config['phase_2_parameters']['ablation_study']['baseline_runs_per_round']

    # Iterate through each survey round in the contextual data.
    for _, context_row in contextual_data_df.iterrows():
        # Prepare the dictionary of placeholder values for the current round.
        placeholders = _prepare_context_placeholders(context_row)
        # Format the shared user message for this round.
        user_message = _USER_PROMPT_TEMPLATE.format(**placeholders)

        # --- Persona Arm Prompt Generation ---
        # Iterate through each persona in the final set.
        for persona_id, persona_row in persona_final_df.iterrows():
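            # Format the persona-specific system message. The persona blurb is passed
            # as an extra keyword so the shared placeholders dict is not mutated and
            # cannot carry the last persona's text into the no-persona arm.
            system_message = _SYSTEM_PROMPT_PERSONA.format(
                **placeholders, persona_blurb=persona_row['description']
            )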

            # Yield a structured prompt object.
            yield {
                "metadata": {
                    "arm": "persona",
                    "id": str(persona_id),
                    "round": context_row['survey_round'],
                },
                "messages": [
                    {"role": "system", "content": system_message},
                    {"role": "user", "content": user_message},
                ],
            }

        # --- No-Persona Arm Prompt Generation ---
        # Iterate to create the specified number of baseline prompts.
        for i in range(baseline_runs):
            # Format the no-persona system message.
            system_message = _SYSTEM_PROMPT_NO_PERSONA.format(**placeholders)

            # Yield a structured prompt object.
            yield {
                "metadata": {
                    "arm": "baseline",
                    "id": i + 1, # Use a 1-based index for baseline runs.
                    "round": context_row['survey_round'],
                },
                "messages": [
                    {"role": "system", "content": system_message},
                    {"role": "user", "content": user_message},
                ],
            }

    # Log the completion of the generator.
    logger.info("Finished yielding all forecasting prompts.")

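Because `assemble_forecasting_prompts` is a generator, it can be iterated only once. The sketch below illustrates this with a toy stand-in and shows one possible way to give each experimental arm its own fresh generator. The names `toy_prompts` and `prompts_for_arm`, and the round labels, are hypothetical and not part of the pipeline.

```python
from typing import Any, Dict, Iterator, List


def toy_prompts(rounds: List[str], n_personas: int, n_baseline: int) -> Iterator[Dict[str, Any]]:
    """Hypothetical stand-in that mimics the metadata of the yielded prompt objects."""
    for rnd in rounds:
        for pid in range(n_personas):
            yield {"metadata": {"arm": "persona", "id": str(pid), "round": rnd}}
        for i in range(n_baseline):
            yield {"metadata": {"arm": "baseline", "id": i + 1, "round": rnd}}


def prompts_for_arm(arm: str, **kwargs: Any) -> Iterator[Dict[str, Any]]:
    """Builds a fresh generator on every call and lazily filters it by arm."""
    return (p for p in toy_prompts(**kwargs) if p["metadata"]["arm"] == arm)


settings = {"rounds": ["2020Q1", "2020Q2"], "n_personas": 3, "n_baseline": 2}

# Reusing one generator object: the first pass exhausts it, the second sees nothing.
shared = toy_prompts(**settings)
first_pass = [p for p in shared if p["metadata"]["arm"] == "persona"]
second_pass = [p for p in shared if p["metadata"]["arm"] == "baseline"]

# Creating one generator per arm: both arms receive their prompts.
persona_prompts = list(prompts_for_arm("persona", **settings))
baseline_prompts = list(prompts_for_arm("baseline", **settings))

print(len(first_pass), len(second_pass))            # 6 0
print(len(persona_prompts), len(baseline_prompts))  # 6 4
```

In the actual pipeline the equivalent would be to call `assemble_forecasting_prompts(...)` once per arm rather than passing the same generator object to both orchestrators.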

In [None]:
# Task 11 — Generate forecasts (persona arm)

# ==============================================================================
# Task 11: Generate forecasts (persona arm)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 11, Step 1 Helper: Configure LLM forecaster
# ------------------------------------------------------------------------------
def _initialize_openai_client(config: Dict[str, Any]) -> AsyncOpenAI:
    """
    Initializes the asynchronous OpenAI client.

    Args:
        config: The study configuration dictionary.

    Returns:
        An initialized AsyncOpenAI client.
    """
    # Input validation: ensure an API key is present in the configuration.
    if not config.get("openai_api_key"):
        raise ValueError("OpenAI API key not found in config (expected key 'openai_api_key').")

    # Initialize the asynchronous OpenAI client.
    client = AsyncOpenAI(api_key=config["openai_api_key"])

    # Log the successful initialization.
    logger.info("OpenAI client initialized successfully.")

    # Return the client.
    return client

# ------------------------------------------------------------------------------
# Task 11, Step 2 Helpers: Concurrent API call execution and response capture
# ------------------------------------------------------------------------------
async def _fetch_forecast(
    client: AsyncOpenAI,
    prompt_obj: Dict[str, Any],
    api_config: Dict[str, Any],
    semaphore: asyncio.Semaphore
) -> Dict[str, Any]:
    """
    Asynchronously calls the LLM API for a single forecasting prompt.

    This is a dedicated helper function for the forecasting task. It correctly
    uses the `messages` list from the provided prompt object. It includes robust
    error handling and a retry mechanism.

    Args:
        client: The asynchronous OpenAI API client.
        prompt_obj: The prompt object, containing `metadata` and `messages`.
        api_config: The API configuration dictionary for the forecasting LLM.
        semaphore: An asyncio.Semaphore to control concurrency.

    Returns:
        A dictionary containing the original metadata and the API response,
        or a failure record.
    """
    # Acquire the semaphore to ensure we do not exceed the max concurrency limit.
    async with semaphore:
        # Extract metadata from the prompt object for precise logging and error reporting.
        metadata = prompt_obj["metadata"]

        # Loop to implement the retry mechanism. It will run for `max_retries` + 1 attempts.
        for attempt in range(api_config['api_settings']['max_retries'] + 1):
            try:
                # Make the asynchronous API call to the chat completions endpoint.
                response = await client.chat.completions.create(
                    # The model to use for the completion.
                    model=api_config['model_name'],
                    # The list of messages comprising the conversation, passed directly from the prompt object.
                    messages=prompt_obj["messages"],
                    # The sampling temperature. 1.0 is used for stochasticity.
                    temperature=api_config['api_settings']['temperature'],
                    # Enable JSON mode so the response content is a JSON object; the prompt itself must also ask for JSON.
                    response_format={"type": "json_object"},
                )
                # If the call is successful, return a structured success record.
                return {
                    "metadata": metadata,
                    "status": "SUCCESS",
                    "raw_response": response.choices[0].message.content,
                }
            # Catch specific, retryable API errors (e.g., rate limits, server errors).
            except (APIError, RateLimitError) as e:
                # Log the failure of the current attempt.
                logger.warning(f"API call for {metadata['id']} round {metadata['round']} failed on attempt {attempt + 1}: {e}")
                # If this was the final attempt, break the loop to return a failure.
                if attempt >= api_config['api_settings']['max_retries']:
                    break
                # Wait for the specified backoff period before the next attempt.
                await asyncio.sleep(api_config['api_settings']['retry_backoff_sec'])
            # Catch any other unexpected exceptions (e.g., a malformed prompt object or missing config key).
            except Exception as e:
                # Log the unexpected error. These are typically not retried.
                logger.error(f"Unexpected error for {metadata['id']} round {metadata['round']} on attempt {attempt + 1}: {e}")
                # Break the loop immediately as the error is likely not transient.
                break

    # If all attempts fail, return a structured failure record.
    return {"metadata": metadata, "status": "FAILURE", "raw_response": None}

# ------------------------------------------------------------------------------
# API Call Orchestrator (previously _execute_api_calls)
# ------------------------------------------------------------------------------
async def _run_forecast_generation(
    prompt_generator: Generator[Dict[str, Any], None, None],
    arm_name: str,
    config: Dict[str, Any],
    checkpoint_path: str
) -> List[Dict[str, Any]]:
    """
    Orchestrates the concurrent generation of forecasts for a specific arm.

    This function manages the entire asynchronous workflow for making a high
    volume of API calls. It is designed for resilience and efficiency:
    1.  Filters prompts for the specified experimental arm ('persona' or 'baseline').
    2.  Implements resumability by loading completed tasks from a checkpoint file.
    3.  Uses a semaphore to control concurrency and respect rate limits.
    4.  Calls a dedicated helper (`_fetch_forecast`) for each API request.
    5.  Writes results immediately to the checkpoint file as they complete.

    Args:
        prompt_generator: The generator yielding all prompt objects for all arms.
        arm_name: The name of the experimental arm to run ('persona' or 'baseline').
        config: The study configuration dictionary.
        checkpoint_path: Path to the JSONL file for saving/resuming raw results.

    Returns:
        A list of all raw result dictionaries (both resumed and newly fetched).
    """
    # --- Configuration and Initialization ---
    # Extract the API configuration for the forecasting LLM.
    api_config = config['phase_2_parameters']['forecasting_llm']
    # Extract the concurrency limit from the orchestration config.
    concurrency = config['phase_1_parameters']['orchestration']['max_concurrency']

    # Filter the prompts for the specified arm. This materializes them in a list and exhausts the generator.
    prompts_for_arm = [p for p in prompt_generator if p["metadata"]["arm"] == arm_name]

    # --- Resumability Logic ---
    # Initialize a list to hold results from the checkpoint file.
    completed_results: List[Dict[str, Any]] = []
    # Initialize a set to track the unique identifiers of completed tasks.
    completed_tasks: set[tuple] = set()
    try:
        # Open and read the checkpoint file line by line.
        with open(checkpoint_path, 'r') as f:
            # Each line is a JSON object representing a completed result.
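            for line in f:
                # Skip blank lines (e.g., a stray trailing newline).
                if not line.strip():
                    continue
                # Parse the JSON line.
                res = json.loads(line)
                # Only successful records count as completed, so failed calls are retried on resume.
                if res.get("status") != "SUCCESS":
                    continue
                # Add the result to our list.
                completed_results.append(res)
                # Add the task's unique ID to the set of completed tasks.
                completed_tasks.add((str(res['metadata']['id']), res['metadata']['round']))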
        # Log how many results were successfully resumed.
        logger.info(f"Resuming from checkpoint '{checkpoint_path}'. Found {len(completed_tasks)} completed forecasts for arm '{arm_name}'.")
    except FileNotFoundError:
        # If the file doesn't exist, simply log that we are starting fresh.
        logger.info(f"No checkpoint file found at '{checkpoint_path}'. Starting from scratch for arm '{arm_name}'.")

    # --- Asynchronous Task Preparation ---
    # Initialize the asynchronous OpenAI client.
    client = AsyncOpenAI()
    # Initialize a semaphore to control the maximum number of concurrent API calls.
    semaphore = asyncio.Semaphore(concurrency)

    # Create a list of coroutine tasks for the prompts that have not yet been completed.
    tasks_to_run: List[Coroutine] = []
    # Iterate through all prompts designated for this arm.
    for prompt in prompts_for_arm:
        # Extract the unique identifier for the current prompt.
        metadata = prompt["metadata"]
        task_id = (str(metadata["id"]), metadata["round"])
        # If this task's ID is not in the set of completed tasks, create a new task for it.
        if task_id not in completed_tasks:
            # Create the coroutine by calling the dedicated `_fetch_forecast` helper.
            tasks_to_run.append(
                _fetch_forecast(client, prompt, api_config, semaphore)
            )

    # --- Asynchronous Task Execution ---
    # Proceed only if there are new tasks to execute.
    if tasks_to_run:
        # Log the number of new API calls that will be made.
        logger.info(f"Executing {len(tasks_to_run)} new forecast API calls for arm '{arm_name}'...")
        # Open the checkpoint file in append mode to add new results.
        with open(checkpoint_path, 'a') as f:
            # Use tqdm_asyncio.as_completed to process tasks as they finish and show a progress bar.
            for result_coro in tqdm_asyncio.as_completed(tasks_to_run, total=len(tasks_to_run), desc=f"Forecasting ({arm_name} arm)"):
                # Await the result of the next completed coroutine.
                result = await result_coro
                # Append the new result to our in-memory list.
                completed_results.append(result)
                # Write the JSON result as a new line and flush so it is not held in the write buffer.
                f.write(json.dumps(result) + '\n')
                f.flush()
    else:
        # If all tasks were already completed, log this and do nothing.
        logger.info(f"All forecasting tasks for arm '{arm_name}' were already complete.")

    # Return the complete list of results, including those loaded from the checkpoint and newly fetched ones.
    return completed_results

# ------------------------------------------------------------------------------
# Task 11, Step 3 Helper: Response persistence and initial quality control
# ------------------------------------------------------------------------------
def _parse_and_persist_forecasts(
    raw_results: List[Dict[str, Any]],
    id_column_name: str
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Parses raw forecast results, validates them, and creates a tidy DataFrame.

    This generic function is designed to process the raw output from the API
    execution stage for either the 'persona' or 'baseline' arm. It performs:
    1.  Robust JSON parsing of the raw LLM response.
    2.  Strict schema validation (must contain 16 forecast items).
    3.  Data type conversion and validation for the forecast 'value'.

    Args:
        raw_results: A list of raw result dictionaries from the API calls.
        id_column_name: The name for the identifier column in the output
                        DataFrame ('persona_id' or 'baseline_id').

    Returns:
        A tuple containing:
        - A pandas DataFrame of the clean, parsed forecasts.
        - A dictionary reporting parsing statistics (successes and failures).
    """
    # Initialize a list to store successfully parsed forecast records.
    parsed_records: List[Dict[str, Any]] = []
    # Initialize a counter for failed or unparsable API calls.
    failure_count = 0

    # Log the start of the parsing process.
    logger.info(f"Parsing {len(raw_results)} raw results for arm '{id_column_name.split('_')[0]}'.")

    # Iterate through each raw result dictionary from the API execution stage.
    for result in raw_results:
        # Check the status of the API call. If it failed, increment the counter and skip.
        if result.get("status") == "FAILURE":
            # Increment the failure count.
            failure_count += 1
            # Continue to the next result.
            continue

        try:
            # Extract the metadata and the raw JSON response string.
            metadata = result["metadata"]
            raw_response_str = result["raw_response"]

            # Ensure the raw response is not null or empty before parsing.
            if not raw_response_str:
                raise ValueError("Raw response content is empty.")

            # Parse the JSON string into a Python dictionary.
            response_data = json.loads(raw_response_str)
            # Extract the list of forecasts from the parsed data.
            forecast_list = response_data["forecasts"]

            # Validate that the response contains exactly 16 forecast items as per the prompt schema.
            if len(forecast_list) != 16:
                raise ValueError(f"Expected 16 forecasts, but found {len(forecast_list)}")

            # Build the records for this response separately so that one malformed item
            # discards the whole forecast set instead of leaving a partial set behind.
            response_records: List[Dict[str, Any]] = []
            for item in forecast_list:
                response_records.append({
                    id_column_name: metadata["id"],
                    "round": metadata["round"],
                    "variable": item["variable"].upper(), # Standardize to uppercase
                    "horizon": item["horizon"],
                    "value": float(item["value"]), # Convert value to float, will raise error if not possible
                })
            # All 16 items are valid: commit the records for DataFrame creation.
            parsed_records.extend(response_records)
        # Catch any exceptions during parsing or validation.
        except (json.JSONDecodeError, KeyError, ValueError, TypeError, AttributeError) as e:
            # Log a warning with specific details about the failure.
            metadata = result.get("metadata", {})
            logger.warning(
                f"Failed to parse response for ID {metadata.get('id')} "
                f"in round {metadata.get('round')}: {e}"
            )
            # Increment the failure count.
            failure_count += 1

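    # Create the final pandas DataFrame from the list of successfully parsed records.
    # Explicit columns keep the schema stable even when no record was parsed.
    forecasts_df = pd.DataFrame(
        parsed_records,
        columns=[id_column_name, "round", "variable", "horizon", "value"],
    )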

    # Compile statistics for the final report.
    parsing_stats = {
        "total_api_calls": len(raw_results),
        "successful_parses": len(raw_results) - failure_count,
        "failed_parses": failure_count,
    }

    # Return the clean DataFrame and the parsing statistics.
    return forecasts_df, parsing_stats

# ------------------------------------------------------------------------------
# Orchestrator for Task 11
# ------------------------------------------------------------------------------
def generate_persona_forecasts(
    prompt_generator: Generator[Dict[str, Any], None, None],
    config: Dict[str, Any],
    checkpoint_path: str
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrates the generation and parsing of forecasts for the persona arm.

    This function manages the end-to-end workflow for the persona arm:
    1.  It calls a robust, asynchronous helper (`_run_forecast_generation`) to
        execute all required API calls, handling concurrency, rate limiting,
        and resumability.
    2.  It then passes the raw results to a generic parsing and validation
        function (`_parse_and_persist_forecasts`) to produce a clean,
        analysis-ready DataFrame.

    Args:
        prompt_generator: The generator yielding all prompt objects for all arms.
                          It is fully consumed here, so do not reuse the same
                          generator object for another arm.
        config: The study configuration dictionary.
        checkpoint_path: Path to the JSONL file for saving/resuming raw results
                         for the persona arm.

    Returns:
        A tuple containing:
        - persona_forecasts_df (pd.DataFrame): The final DataFrame of persona forecasts.
        - report (Dict[str, Any]): A dictionary summarizing the process.
    """
    # Log the start of the persona arm forecast generation process.
    logger.info("Starting Task 11: Generate forecasts for the persona arm.")

    # --- Step 1: Asynchronous API Call Execution ---
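    # Run the main asynchronous generation function for the 'persona' arm.
    # This function handles all the complexity of concurrency, retries, and checkpointing.
    # Note: asyncio.run() raises RuntimeError when an event loop is already running
    # (as in a Jupyter kernel); there, await `_run_forecast_generation(...)` from the cell instead.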
    raw_results = asyncio.run(_run_forecast_generation(
        prompt_generator=prompt_generator,
        arm_name='persona',
        config=config,
        checkpoint_path=checkpoint_path
    ))

    # --- Step 2: Parse, Validate, and Persist Results ---
    # Call the generic parsing helper to process the raw results into a clean DataFrame.
    # Specify 'persona_id' as the name for the identifier column.
    persona_forecasts_df, parsing_stats = _parse_and_persist_forecasts(
        raw_results=raw_results,
        id_column_name='persona_id'
    )

    # --- Report Generation ---
    # Compile the final report with statistics from both stages.
    report = {
        "arm": "persona",
        "total_api_calls_processed": parsing_stats["total_api_calls"],
        "successful_forecast_sets": parsing_stats["successful_parses"],
        "failed_or_unparsable_sets": parsing_stats["failed_parses"],
        "total_forecast_points_generated": len(persona_forecasts_df),
    }

    # Log the final summary of the process.
    logger.info(f"Persona arm forecast generation complete. "
                f"Successfully parsed {report['successful_forecast_sets']} forecast sets, "
                f"yielding {report['total_forecast_points_generated']} data points.")

    # Return the final DataFrame and the summary report.
    return persona_forecasts_df, report

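The concurrency pattern used by `_fetch_forecast` (a semaphore that caps in-flight calls, plus a bounded retry loop with a fixed backoff) can be tried without network access. The following is a minimal sketch under stated assumptions: `FakeApi`, `TransientError`, `fetch_with_retry`, and `run_all` are hypothetical names invented for illustration, and the fake call merely sleeps and fails once for odd task ids.

```python
import asyncio
from typing import Any, Dict, List


class TransientError(Exception):
    """Hypothetical stand-in for a retryable API error."""


class FakeApi:
    """Hypothetical stand-in for a remote API: odd task ids fail on their first call."""

    def __init__(self) -> None:
        self.calls: Dict[int, int] = {}
        self.in_flight = 0
        self.peak_in_flight = 0

    async def call(self, task_id: int) -> str:
        self.in_flight += 1
        self.peak_in_flight = max(self.peak_in_flight, self.in_flight)
        try:
            await asyncio.sleep(0.01)  # Simulate network latency.
            self.calls[task_id] = self.calls.get(task_id, 0) + 1
            if task_id % 2 == 1 and self.calls[task_id] == 1:
                raise TransientError(f"simulated failure for task {task_id}")
            return f"forecast-{task_id}"
        finally:
            self.in_flight -= 1


async def fetch_with_retry(
    api: FakeApi,
    task_id: int,
    semaphore: asyncio.Semaphore,
    max_retries: int = 2,
    backoff_sec: float = 0.01,
) -> Dict[str, Any]:
    # The semaphore slot is held for all attempts of this task, including backoff.
    async with semaphore:
        for attempt in range(max_retries + 1):
            try:
                content = await api.call(task_id)
                return {"id": task_id, "status": "SUCCESS", "raw_response": content}
            except TransientError:
                if attempt < max_retries:
                    await asyncio.sleep(backoff_sec)
    return {"id": task_id, "status": "FAILURE", "raw_response": None}


async def run_all(api: FakeApi, n_tasks: int, concurrency: int) -> List[Dict[str, Any]]:
    semaphore = asyncio.Semaphore(concurrency)
    coros = (fetch_with_retry(api, i, semaphore) for i in range(n_tasks))
    return await asyncio.gather(*coros)


api = FakeApi()
results = asyncio.run(run_all(api, n_tasks=10, concurrency=3))
print(sum(r["status"] == "SUCCESS" for r in results), api.peak_in_flight)
```

Holding the semaphore slot during the backoff sleep keeps the code simple but lowers throughput while a task waits; releasing the slot between attempts is an alternative design with the opposite trade-off.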

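The resumability logic in `_run_forecast_generation` rests on an append-only JSONL checkpoint keyed by `(id, round)`. Below is a small, self-contained sketch of that idea, assuming a temporary file and dummy records; `load_completed` and `run_pending` are hypothetical helpers, and the "API call" is replaced by writing a placeholder response.

```python
import json
import os
import tempfile
from typing import Any, Dict, List, Set, Tuple

TaskKey = Tuple[str, str]  # (id as string, survey round)


def load_completed(path: str) -> Tuple[List[Dict[str, Any]], Set[TaskKey]]:
    """Reads a JSONL checkpoint and returns the stored records and their task keys."""
    records: List[Dict[str, Any]] = []
    keys: Set[TaskKey] = set()
    try:
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                rec = json.loads(line)
                records.append(rec)
                keys.add((str(rec["metadata"]["id"]), rec["metadata"]["round"]))
    except FileNotFoundError:
        pass  # No checkpoint yet: start from scratch.
    return records, keys


def run_pending(path: str, tasks: List[Dict[str, Any]]) -> int:
    """Processes only the tasks missing from the checkpoint; returns how many ran."""
    _, done = load_completed(path)
    pending = [t for t in tasks if (str(t["id"]), t["round"]) not in done]
    with open(path, "a", encoding="utf-8") as f:
        for task in pending:
            # Placeholder for the real API call.
            record = {"metadata": task, "status": "SUCCESS", "raw_response": "{}"}
            f.write(json.dumps(record) + "\n")
            f.flush()  # Push each record out of the write buffer as soon as it exists.
    return len(pending)


tasks = [{"id": i, "round": "2020Q1"} for i in range(5)]
with tempfile.TemporaryDirectory() as tmp:
    ckpt = os.path.join(tmp, "checkpoint.jsonl")
    ran_first = run_pending(ckpt, tasks[:3])   # Simulates an interrupted run.
    ran_second = run_pending(ckpt, tasks)      # Resumes: only the missing tasks run.
    ran_third = run_pending(ckpt, tasks)       # Nothing left to do.
    stored, _ = load_completed(ckpt)

print(ran_first, ran_second, ran_third, len(stored))  # 3 2 0 5
```

Casting the id to `str` when building the key mirrors the pipeline, where persona ids are strings and baseline ids are integers.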
In [None]:
# Task 12 — Generate forecasts (no-persona baseline arm)

# ==============================================================================
# Task 12: Generate forecasts (no-persona baseline arm)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 12, Step 3 Helper: Validate baseline variation
# ------------------------------------------------------------------------------
def _validate_baseline_variation(
    baseline_df: pd.DataFrame,
    zero_std_threshold: float = 1e-9
) -> Dict[str, Any]:
    """
    Validates the stochastic variation in the baseline forecasts.

    This function checks if the `temperature=1.0` setting produced diverse
    outputs across the 100 baseline runs for each forecast group. It calculates
    the standard deviation for each group and reports the proportion of groups
    that exhibit near-zero variation.

    Args:
        baseline_df: The clean DataFrame of baseline forecasts.
        zero_std_threshold: The numerical tolerance for defining "zero"
                            standard deviation.

    Returns:
        A dictionary containing the validation statistics and an overall assessment.
    """
    # Log the start of the validation check.
    logger.info("Validating stochastic variation in baseline forecasts...")

    # Return an empty report if the input DataFrame is empty.
    if baseline_df.empty:
        logger.warning("Baseline DataFrame is empty. Skipping variation check.")
        return {"status": "SKIPPED_NO_DATA"}

    # Group the forecasts by round, variable, and horizon to analyze variation across runs.
    # The `std()` aggregation computes the standard deviation of the 'value' for each group.
    variation_stats = baseline_df.groupby(["round", "variable", "horizon"])["value"].std()

    # Count the total number of distinct forecast groups.
    total_groups = len(variation_stats)

    # If there are no groups, there's nothing to validate.
    if total_groups == 0:
        return {"status": "SKIPPED_NO_GROUPS"}

    # Count the number of groups where the standard deviation is below the threshold (effectively zero).
    # Groups with a single observation have an undefined (NaN) standard deviation and are not counted here.
    zero_variation_groups = int((variation_stats < zero_std_threshold).sum())

    # Calculate the ratio of groups that showed no meaningful variation (a plain Python float).
    zero_variation_ratio = zero_variation_groups / total_groups

    # Determine the overall status of the check based on a predefined tolerance (e.g., 10%).
    # A high ratio might indicate a problem with the LLM's stochastic sampling.
    status = "SUCCESS" if zero_variation_ratio <= 0.10 else "WARNING"

    # Log a warning if the ratio of zero-variation groups is high.
    if status == "WARNING":
        logger.warning(
            f"High ratio of zero-variation groups found: {zero_variation_ratio:.2%}. "
            "This may indicate an issue with model stochasticity."
        )
    else:
        logger.info(f"Stochastic variation check passed. Zero-variation ratio: {zero_variation_ratio:.2%}.")

    # Return a structured dictionary with the detailed validation results.
    return {
        "status": status,
        "total_groups_analyzed": total_groups,
        "zero_variation_groups": int(zero_variation_groups),
        "zero_variation_ratio": zero_variation_ratio,
    }


# ------------------------------------------------------------------------------
# Task 12, Orchestrator Function
# ------------------------------------------------------------------------------
def generate_baseline_forecasts(
    prompt_generator: Generator[Dict[str, Any], None, None],
    config: Dict[str, Any],
    checkpoint_path: str
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrates the generation and validation of forecasts for the baseline arm.

    This function manages the end-to-end workflow for the control group:
    1.  It calls the generic, robust asynchronous helper (`_run_forecast_generation`)
        to execute all API calls for the 'baseline' arm.
    2.  It passes the raw results to the generic parsing function to produce a
        clean, analysis-ready DataFrame.
    3.  It performs a baseline-specific validation to ensure that the model's
        stochasticity produced sufficient variation across the runs.

    Args:
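        prompt_generator: The generator yielding all prompt objects for all arms.
                          It is fully consumed here, so do not reuse a generator
                          object that was already passed to the persona arm.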
        config: The study configuration dictionary.
        checkpoint_path: Path to the JSONL file for saving/resuming raw results
                         for the baseline arm.

    Returns:
        A tuple containing:
        - baseline_forecasts_df (pd.DataFrame): The final DataFrame of baseline forecasts.
        - report (Dict[str, Any]): A dictionary summarizing the process.
    """
    # Log the start of the baseline arm forecast generation process.
    logger.info("Starting Task 12: Generate forecasts for the baseline arm.")

    # --- Step 1: Asynchronous API Call Execution ---
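    # Run the main asynchronous generation function, specifying the 'baseline' arm.
    # This function handles concurrency, retries, and checkpointing.
    # Note: asyncio.run() raises RuntimeError when an event loop is already running
    # (as in a Jupyter kernel); there, await `_run_forecast_generation(...)` from the cell instead.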
    raw_results = asyncio.run(_run_forecast_generation(
        prompt_generator=prompt_generator,
        arm_name='baseline',
        config=config,
        checkpoint_path=checkpoint_path
    ))

    # --- Step 2: Parse, Validate, and Persist Results ---
    # Call the generic parsing helper to process the raw results.
    # Specify 'baseline_id' as the name for the identifier column.
    baseline_forecasts_df, parsing_stats = _parse_and_persist_forecasts(
        raw_results=raw_results,
        id_column_name='baseline_id'
    )

    # --- Step 3: Validate Stochastic Variation ---
    # Perform the baseline-specific check for forecast diversity.
    variation_report = _validate_baseline_variation(baseline_forecasts_df)

    # --- Report Generation ---
    # Compile the final, comprehensive report for the baseline arm.
    report = {
        "arm": "baseline",
        "total_api_calls_processed": parsing_stats["total_api_calls"],
        "successful_forecast_sets": parsing_stats["successful_parses"],
        "failed_or_unparsable_sets": parsing_stats["failed_parses"],
        "total_forecast_points_generated": len(baseline_forecasts_df),
        "variation_validation_report": variation_report,
    }

    # Log the final summary of the process.
    logger.info(f"Baseline arm forecast generation complete. "
                f"Successfully parsed {report['successful_forecast_sets']} forecast sets. "
                f"Variation check status: {variation_report.get('status', 'N/A')}.")

    # Return the final DataFrame and the summary report.
    return baseline_forecasts_df, report

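The variation check in `_validate_baseline_variation` boils down to computing a sample standard deviation per `(round, variable, horizon)` group and measuring the share of groups that are effectively constant. The sketch below reproduces that arithmetic with the standard library only, on made-up numbers. `zero_variation_ratio` is a hypothetical helper, the horizon label `"CY"` is illustrative, and unlike the pandas version it leaves single-observation groups out of the denominator.

```python
import statistics
from collections import defaultdict
from typing import Dict, List, Tuple

GroupKey = Tuple[str, str, str]            # (round, variable, horizon)
Row = Tuple[str, str, str, float]          # (round, variable, horizon, value)


def zero_variation_ratio(rows: List[Row], threshold: float = 1e-9) -> float:
    """Share of groups whose sample standard deviation is below `threshold`."""
    groups: Dict[GroupKey, List[float]] = defaultdict(list)
    for rnd, variable, horizon, value in rows:
        groups[(rnd, variable, horizon)].append(value)
    # A sample standard deviation needs at least two observations.
    stds = [statistics.stdev(values) for values in groups.values() if len(values) >= 2]
    if not stds:
        raise ValueError("No group has enough observations to compute a standard deviation.")
    return sum(s < threshold for s in stds) / len(stds)


rows: List[Row] = [
    # This group varies across runs.
    ("2020Q1", "HICP", "CY", 1.2),
    ("2020Q1", "HICP", "CY", 1.4),
    ("2020Q1", "HICP", "CY", 1.3),
    # This group returns the same value on every run.
    ("2020Q1", "UNR", "CY", 7.5),
    ("2020Q1", "UNR", "CY", 7.5),
    ("2020Q1", "UNR", "CY", 7.5),
]

ratio = zero_variation_ratio(rows)
status = "SUCCESS" if ratio <= 0.10 else "WARNING"
print(ratio, status)  # 0.5 WARNING
```

With one of two groups constant, the ratio is 0.5, which exceeds the 10% tolerance used above and would therefore be reported as a warning.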

In [None]:
# Task 13 — Parse and QC all LLM outputs

# ==============================================================================
# Task 13: Parse and QC all LLM outputs
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 13, Step 1 Helper: Consolidate persona and baseline forecasts
# ------------------------------------------------------------------------------
def _consolidate_forecasts(
    persona_forecasts_df: pd.DataFrame,
    baseline_forecasts_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Consolidates persona and baseline forecast DataFrames into a single,
    unified DataFrame with a common schema.

    Args:
        persona_forecasts_df: DataFrame of forecasts from the persona arm.
        baseline_forecasts_df: DataFrame of forecasts from the baseline arm.

    Returns:
        A single DataFrame containing all forecasts from both arms.
    """
    # Log the start of the consolidation process.
    logger.info("Consolidating persona and baseline forecast DataFrames...")

    # --- Input Validation ---
    # Ensure input DataFrames are not empty.
    if persona_forecasts_df.empty or baseline_forecasts_df.empty:
        raise ValueError("Input forecast DataFrames cannot be empty for consolidation.")

    # --- Schema Unification ---
    # Create a copy of the persona DataFrame to avoid modifying the original.
    df_persona = persona_forecasts_df.copy()
    # Add a 'source_type' column to identify the experimental arm.
    df_persona['source_type'] = 'persona'
    # Rename the 'persona_id' column to the generic 'source_id'.
    df_persona.rename(columns={'persona_id': 'source_id'}, inplace=True)

    # Create a copy of the baseline DataFrame.
    df_baseline = baseline_forecasts_df.copy()
    # Add the 'source_type' column.
    df_baseline['source_type'] = 'baseline'
    # Rename the 'baseline_id' column to the generic 'source_id'.
    df_baseline.rename(columns={'baseline_id': 'source_id'}, inplace=True)

    # --- Concatenation ---
    # Concatenate the two prepared DataFrames vertically.
    unified_df = pd.concat([df_persona, df_baseline], ignore_index=True)

    # Define and enforce a canonical column order for consistency.
    final_columns = [
        'source_type', 'source_id', 'round', 'variable', 'horizon', 'value'
    ]
    # Reorder the columns.
    unified_df = unified_df[final_columns]

    # Log the result of the consolidation.
    logger.info(f"Successfully consolidated {len(unified_df)} total forecast points.")

    # Return the unified DataFrame.
    return unified_df

# ------------------------------------------------------------------------------
# Task 13, Step 2 Helper: Enforce value constraints and units
# ------------------------------------------------------------------------------
def _apply_final_qc_and_flagging(
    unified_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Applies final validation checks, flags outliers, and generates a QC report.

    Args:
        unified_df: The consolidated DataFrame of all forecasts.

    Returns:
        A tuple containing:
        - The final, quality-controlled DataFrame.
        - A dictionary summarizing the QC checks.
    """
    # Log the start of the final QC process.
    logger.info("Applying final quality control checks and flagging outliers...")
    # Create a copy to work on.
    df = unified_df.copy()

    # --- Final Numeric Validation ---
    # Check for any null values in the 'value' column, which indicate parsing failures.
    initial_rows = len(df)
    null_values_mask = df['value'].isna()
    num_nulls = null_values_mask.sum()

    # Define the final QC pass status. A row passes if its 'value' is not null.
    df['qc_pass'] = ~null_values_mask

    # --- Sanity Bounds Check (Flagging Outliers) ---
    # Define the plausible ranges for each macroeconomic variable.
    sanity_bounds = {
        'HICP': (-5.0, 25.0),
        'HICPX': (-5.0, 25.0),
        'RGDP': (-15.0, 15.0),
        'UNR': (0.0, 30.0),
    }

    # Initialize the 'is_outlier' column to False.
    df['is_outlier'] = False

    # Iterate through each variable to apply its specific bounds.
    for var, (min_val, max_val) in sanity_bounds.items():
        # Create a mask for rows corresponding to the current variable.
        var_mask = df['variable'] == var
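        # Create a mask for non-null values that are outside the defined bounds.
        # `between` returns False for NaN, so nulls are excluded explicitly;
        # otherwise they would be counted as outliers in the QC report.
        outlier_mask = df['value'].notna() & ~df['value'].between(min_val, max_val)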
        # Apply the outlier flag only to the relevant rows.
        df.loc[var_mask & outlier_mask, 'is_outlier'] = True

    # --- Report Generation ---
    # Compile a summary report of the QC process.
    qc_report = {
        'total_rows_processed': initial_rows,
        'rows_passing_qc': int(df['qc_pass'].sum()),
        'rows_failing_qc (dropped)': int(num_nulls),
        'outliers_flagged': {
            var: int(df.loc[df['variable'] == var, 'is_outlier'].sum())
            for var in sanity_bounds.keys()
        }
    }

    # --- Final Filtering ---
    # Drop any rows that failed the basic QC check (i.e., had a null value).
    final_df = df[df['qc_pass']].copy()

    # Log a summary of the QC results.
    logger.info(f"QC complete. Passed: {qc_report['rows_passing_qc']}, Failed/Dropped: {qc_report['rows_failing_qc (dropped)']}.")

    # Return the clean DataFrame and the QC report.
    return final_df, qc_report

# ------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# ------------------------------------------------------------------------------
def parse_and_qc_all_forecasts(
    persona_forecasts_df: pd.DataFrame,
    baseline_forecasts_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Consolidates and performs final quality control on all generated forecasts.

    This function serves as the final gateway before the analysis phase. It
    takes the separate forecast DataFrames from the persona and baseline arms,
    and performs the following steps:
    1.  Consolidates them into a single, unified DataFrame with a common schema.
    2.  Applies the final quality control checks: records with a null `value`
        are marked as failed, and values outside plausible macroeconomic
        ranges are flagged as outliers.
    3.  Drops any fundamentally invalid records (e.g., with null values).
    4.  Returns the final, clean DataFrame ready for analysis, along with a
        comprehensive QC report.

    Args:
        persona_forecasts_df: The DataFrame of forecasts from the persona arm.
        baseline_forecasts_df: The DataFrame of forecasts from the baseline arm.

    Returns:
        A tuple containing:
        - unified_forecasts_df (pd.DataFrame): The final, clean, consolidated DataFrame.
        - report (Dict[str, Any]): A dictionary summarizing the consolidation and QC process.
    """
    # Log the start of the task.
    logger.info("Starting Task 13: Parse and QC all LLM outputs.")

    # --- Step 1: Consolidate Forecasts ---
    # Call the helper to merge the two DataFrames into one.
    unified_df = _consolidate_forecasts(persona_forecasts_df, baseline_forecasts_df)

    # --- Step 2 & 3: Apply Final QC, Flagging, and Filtering ---
    # Call the helper to perform the final validation and outlier flagging.
    final_df, qc_report = _apply_final_qc_and_flagging(unified_df)

    # --- Final Report ---
    # The main report is the qc_report generated by the helper.
    logger.info("Consolidation and QC of all forecasts is complete.")

    # Return the final clean DataFrame and the summary report.
    return final_df, qc_report
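
The sanity-bounds flagging in Task 13 can be tried out on a toy frame. The sketch below is illustrative only: the rows are made-up values rather than study data, and the column names simply mirror the unified schema. One detail worth seeing in isolation is that `Series.between` evaluates to `False` for `NaN`, so a plain negation would mark missing values as out of range; the sketch requires a non-null value before flagging.

```python
import numpy as np
import pandas as pd

# Toy data (hypothetical values, not taken from the study).
toy = pd.DataFrame({
    "variable": ["HICP", "HICP", "UNR", "RGDP"],
    "value": [2.1, 40.0, np.nan, -3.0],
})

# Plausible ranges per variable, mirroring the bounds used in Task 13.
bounds = {"HICP": (-5.0, 25.0), "RGDP": (-15.0, 15.0), "UNR": (0.0, 30.0)}

toy["is_outlier"] = False
for var, (lo, hi) in bounds.items():
    in_var = toy["variable"] == var
    # between() is False for NaN, so require a non-null value before flagging.
    out_of_range = toy["value"].notna() & ~toy["value"].between(lo, hi)
    toy.loc[in_var & out_of_range, "is_outlier"] = True

print(toy)
```

Only the second row (HICP at 40.0) ends up flagged; the missing UNR value is left for the null check to handle.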


In [None]:
# Task 14 — Compute AI panel medians (cross-sectional aggregation)

# ==============================================================================
# Task 14: Compute AI panel medians (cross-sectional aggregation)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 14, Step 1 & 2 Helper: Compute medians for a specific arm
# ------------------------------------------------------------------------------
def _compute_medians_for_arm(
    forecasts_df: pd.DataFrame,
    arm_name: str
) -> pd.DataFrame:
    """
    Computes the cross-sectional median forecast for a specific experimental arm.

    This function filters the unified forecast DataFrame for a given arm
    ('persona' or 'baseline'), then groups by each forecast combination
    (round, variable, horizon) and computes the median. It also counts the
    number of individual forecasts contributing to each median for quality control.

    Args:
        forecasts_df: The unified DataFrame containing all individual forecasts.
        arm_name: The name of the arm to process ('persona' or 'baseline').

    Returns:
        A DataFrame containing the median forecasts and sample sizes for the arm,
        with ['round', 'variable', 'horizon'] as ordinary key columns.
    """
    # Log the start of the aggregation for the specified arm.
    logger.info(f"Computing panel medians for '{arm_name}' arm...")

    # --- Input Validation ---
    # Ensure the arm_name is valid.
    if arm_name not in ['persona', 'baseline']:
        raise ValueError("arm_name must be either 'persona' or 'baseline'.")

    # --- Filtering and Aggregation ---
    # Filter the DataFrame to include only records from the specified arm.
    arm_df = forecasts_df[forecasts_df['source_type'] == arm_name]

    # Define the columns to group by for aggregation.
    grouping_cols = ['round', 'variable', 'horizon']

    # Perform the groupby and aggregation in one step for efficiency.
    # The .agg() method allows computing multiple statistics simultaneously.
    medians_df = arm_df.groupby(grouping_cols).agg(
        # Compute the median of the 'value' column for each group.
        # Equation: median(ŷ_i) for i in group
        median_forecast=('value', 'median'),
        # Count the number of forecasts in each group.
        sample_size=('value', 'size')
    ).reset_index()  # Move the group keys from the index back into columns.

    # Rename the 'median_forecast' column to be specific to the arm.
    medians_df.rename(columns={'median_forecast': f'ai_median_{arm_name}'}, inplace=True)

    # Log a summary of the aggregation.
    logger.info(f"Computed {len(medians_df)} median forecasts for '{arm_name}' arm.")

    # Return the resulting DataFrame.
    return medians_df

# ------------------------------------------------------------------------------
# Task 14, Orchestrator Function
# ------------------------------------------------------------------------------
def compute_ai_panel_medians(
    unified_forecasts_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrates the computation of AI panel medians for both experimental arms.

    This function takes the unified DataFrame of all individual forecasts and
    calculates the consensus forecast (median) for each experimental arm
    (persona and baseline) for every forecast instance (round, variable, horizon).

    Args:
        unified_forecasts_df: The clean, consolidated DataFrame of all forecasts
                              from Task 13.

    Returns:
        A tuple containing:
        - ai_panel_medians_df (pd.DataFrame): A DataFrame with median forecasts
          for both arms, ready for comparison.
        - report (Dict[str, Any]): A dictionary summarizing the aggregation process.
    """
    # Log the start of the task.
    logger.info("Starting Task 14: Compute AI panel medians.")

    # --- Input Validation ---
    # Ensure the input DataFrame has the required columns.
    required_cols = {'source_type', 'round', 'variable', 'horizon', 'value'}
    if not required_cols.issubset(unified_forecasts_df.columns):
        raise ValueError(f"Input DataFrame is missing required columns. Expected: {required_cols}")

    # --- Step 1: Compute Persona Medians ---
    # Call the reusable helper to compute medians for the 'persona' arm.
    persona_medians_df = _compute_medians_for_arm(unified_forecasts_df, 'persona')

    # --- Step 2: Compute Baseline Medians ---
    # Call the reusable helper again to compute medians for the 'baseline' arm.
    baseline_medians_df = _compute_medians_for_arm(unified_forecasts_df, 'baseline')

    # --- Step 3: Combine Median DataFrames ---
    # Log the merging step.
    logger.info("Merging persona and baseline median forecasts...")
    # Perform an outer merge on the grouping keys. This is a robust way to combine
    # the two datasets and will reveal any inconsistencies in coverage (i.e., if a
    # group exists in one arm but not the other).
    ai_panel_medians_df = pd.merge(
        persona_medians_df,
        baseline_medians_df,
        on=['round', 'variable', 'horizon'],
        how='outer',
        suffixes=('_persona', '_baseline'),
        # 'validate' ensures the merge keys are unique, acting as an integrity check.
        validate='one_to_one'
    )

    # --- Report Generation ---
    # Check for any nulls created by the outer merge, which would indicate a problem.
    merge_issues = ai_panel_medians_df.isnull().sum().sum()
    if merge_issues > 0:
        logger.warning(f"Found {merge_issues} null values after merging. "
                       "This may indicate inconsistent coverage between arms.")

    # Compile the final report.
    report = {
        "total_median_forecasts": len(ai_panel_medians_df),
        "persona_arm_stats": {
            "min_sample_size": int(ai_panel_medians_df['sample_size_persona'].min()),
            "max_sample_size": int(ai_panel_medians_df['sample_size_persona'].max()),
            "mean_sample_size": ai_panel_medians_df['sample_size_persona'].mean(),
        },
        "baseline_arm_stats": {
            "min_sample_size": int(ai_panel_medians_df['sample_size_baseline'].min()),
            "max_sample_size": int(ai_panel_medians_df['sample_size_baseline'].max()),
            "mean_sample_size": ai_panel_medians_df['sample_size_baseline'].mean(),
        },
        "merge_consistency_check": "SUCCESS" if merge_issues == 0 else "WARNING",
    }

    # Log the completion of the task.
    logger.info("Successfully computed and merged AI panel medians.")

    # Return the final DataFrame and the summary report.
    return ai_panel_medians_df, report
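
As a rough illustration of the aggregation and merge in Task 14, the sketch below runs the same pattern on a toy frame. The data and the helper name `arm_medians` are hypothetical. It shows two things: the `suffixes` argument is what produces the `sample_size_persona` and `sample_size_baseline` columns, and an outer merge leaves `NaN` where one arm has no forecasts for a group.

```python
import pandas as pd

# Toy forecasts (hypothetical values). The baseline arm has no 2013Q2 entries.
toy = pd.DataFrame({
    "source_type": ["persona"] * 3 + ["baseline"] * 2 + ["persona"] * 2,
    "round": ["2013Q1"] * 5 + ["2013Q2"] * 2,
    "variable": ["HICP"] * 7,
    "horizon": ["CY"] * 7,
    "value": [1.8, 2.0, 2.4, 1.9, 2.1, 1.5, 1.7],
})
keys = ["round", "variable", "horizon"]


def arm_medians(df: pd.DataFrame, arm: str) -> pd.DataFrame:
    """Median and group size per (round, variable, horizon) for one arm."""
    out = (
        df[df["source_type"] == arm]
        .groupby(keys)
        .agg(median_forecast=("value", "median"), sample_size=("value", "size"))
        .reset_index()
    )
    return out.rename(columns={"median_forecast": f"ai_median_{arm}"})


merged = pd.merge(
    arm_medians(toy, "persona"),
    arm_medians(toy, "baseline"),
    on=keys,
    how="outer",
    suffixes=("_persona", "_baseline"),
    validate="one_to_one",
)
print(merged)
```

The 2013Q2 row carries a persona median but `NaN` in both baseline columns, which is the kind of coverage gap the null check after the merge is meant to surface.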


In [None]:
# Task 15 — Align AI medians to human medians and realized outcomes

# ==============================================================================
# Task 15: Align AI medians to human medians and realized outcomes
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 15, Step 1 Helper: Map horizons to target years
# ------------------------------------------------------------------------------
def _add_target_year(
    forecasts_df: pd.DataFrame,
    contextual_data_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Computes and adds a canonical 'target_year' column to a forecast DataFrame.

    This function translates the forecast horizon (e.g., 'CY', 'CY+1', 'LT')
    into a specific calendar year, which is the essential key for joining
    forecasts with realized outcomes.

    Args:
        forecasts_df: A DataFrame containing forecasts with 'round' and 'horizon' columns.
        contextual_data_df: The DataFrame with contextual data, needed for the
                            'LT' (long-term) horizon mapping.

    Returns:
        The input DataFrame with a new 'target_year' column added.
    """
    # Log the start of the target year computation.
    logger.info("Computing 'target_year' from 'round' and 'horizon'...")
    # Create a copy to avoid modifying the original DataFrame.
    df = forecasts_df.copy()

    # --- Input Validation ---
    # Ensure required columns are present.
    if not {'round', 'horizon'}.issubset(df.columns):
        raise ValueError("Input DataFrame must contain 'round' and 'horizon' columns.")
    if not {'survey_round', 'lt_year'}.issubset(contextual_data_df.columns):
        raise ValueError("Contextual DataFrame must contain 'survey_round' and 'lt_year' columns.")

    # --- Vectorized Computation for Standard Horizons ---
    # Extract the base year (as an integer) from the 'round' string (e.g., '2013Q1' -> 2013).
    df['base_year'] = df['round'].str[:4].astype(int)
    # Define the integer offset for each standard horizon.
    horizon_offset_map = {'CY': 0, 'CY+1': 1, 'CY+2': 2}
    # Apply the offset to the base year to get the target year. 'LT' will result in NaN for now.
    df['target_year'] = df['base_year'] + df['horizon'].map(horizon_offset_map)

    # --- Mapping for 'LT' Horizon ---
    # Build a Series that maps each survey round to its long-term target year.
    lt_year_map = contextual_data_df.set_index('survey_round')['lt_year']
    # Create a boolean mask to identify all rows with the 'LT' horizon.
    lt_mask = df['horizon'] == 'LT'
    # Use the map to fill in the 'target_year' for only the 'LT' rows.
    df.loc[lt_mask, 'target_year'] = df.loc[lt_mask, 'round'].map(lt_year_map)

    # --- Finalization ---
    # Drop the temporary helper column.
    df = df.drop(columns=['base_year'])
    # Convert the final 'target_year' to a nullable integer type for safety.
    df['target_year'] = df['target_year'].astype('Int64')

    # Validate that no nulls remain in the 'target_year' column.
    if df['target_year'].isnull().any():
        raise ValueError("Failed to compute 'target_year' for all rows. Null values remain.")

    # Log completion.
    logger.info("'target_year' column successfully added.")
    # Return the modified DataFrame.
    return df

# ------------------------------------------------------------------------------
# Task 15, Orchestrator Function
# ------------------------------------------------------------------------------
def align_forecasts_for_scoring(
    ai_panel_medians_df: pd.DataFrame,
    human_benchmark_df: pd.DataFrame,
    realized_outcomes_df: pd.DataFrame,
    contextual_data_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Creates the master analysis DataFrame by aligning AI forecasts, human
    benchmarks, and realized outcomes.

    This function executes a critical data preparation pipeline:
    1.  Computes the `target_year` for all AI forecasts.
    2.  Joins the AI forecasts with the human expert panel medians.
    3.  Filters out forecasts for which no human benchmark is available (e.g., HICPX
        before 2016Q4).
    4.  Joins the aligned forecasts with the ground-truth realized outcomes.
    5.  Applies business logic to filter out-of-sample (OOS) forecasts that
        are not yet scoreable.

    Args:
        ai_panel_medians_df: DataFrame of AI median forecasts from Task 14.
        human_benchmark_df: The cleansed DataFrame of human median forecasts.
        realized_outcomes_df: The cleansed DataFrame of realized macro data.
        contextual_data_df: The cleansed DataFrame of contextual data.
        config: The study configuration dictionary.

    Returns:
        A tuple containing:
        - aligned_df (pd.DataFrame): The final, fully aligned analysis DataFrame.
        - report (Dict[str, Any]): A dictionary summarizing the alignment process.
    """
    # Log the start of the task.
    logger.info("Starting Task 15: Aligning all data sources for scoring.")

    # --- Step 1: Map Horizons to Target Years ---
    # Add the 'target_year' column to the AI medians DataFrame.
    df_with_target_year = _add_target_year(ai_panel_medians_df, contextual_data_df)

    # --- Step 2: Join to Human Benchmarks and Filter ---
    # Log the join operation.
    logger.info("Joining AI medians with human benchmarks...")
    # Perform a left merge to bring in the human median for each forecast.
    aligned_df = pd.merge(
        df_with_target_year,
        human_benchmark_df.rename(columns={'value': 'human_median'}),
        on=['round', 'variable', 'horizon', 'target_year'],
        how='left',
        validate='one_to_one'
    )

    # Filter out rows where no human benchmark exists. This correctly handles
    # the HICPX availability rule.
    rows_before_filter = len(aligned_df)
    aligned_df.dropna(subset=['human_median'], inplace=True)
    rows_after_filter = len(aligned_df)
    logger.info(f"Dropped {rows_before_filter - rows_after_filter} rows with no human benchmark.")

    # --- Step 3: Join to Realized Outcomes and Apply OOS Filter ---
    # Log the join operation.
    logger.info("Joining forecasts with realized outcomes...")
    # Perform a left merge to bring in the ground-truth realized value for each forecast.
    aligned_df = pd.merge(
        aligned_df,
        realized_outcomes_df.rename(columns={'reference_year': 'target_year', 'value': 'realized_value'}),
        on=['target_year', 'variable'],
        how='left',
        validate='many_to_one' # Multiple forecasts can map to the same realized outcome.
    )

    # Create the 'period' column to distinguish in-sample from out-of-sample rounds.
    oos_rounds = set(config['phase_2_parameters']['oos_and_availability']['oos_rounds'])
    aligned_df['period'] = aligned_df['round'].apply(
        lambda r: 'out-of-sample' if r in oos_rounds else 'in-sample'
    )

    # Apply the OOS horizon filter: only keep specified horizons for OOS rounds.
    oos_horizons = set(config['phase_2_parameters']['oos_and_availability']['oos_scored_horizons'])
    # Create a mask to identify rows that are OOS and have a disallowed horizon.
    oos_filter_mask = (
        (aligned_df['period'] == 'out-of-sample') &
        (~aligned_df['horizon'].isin(oos_horizons))
    )
    rows_before_oos_filter = len(aligned_df)
    # Apply the filter by keeping all rows that are NOT in the filter mask.
    aligned_df = aligned_df[~oos_filter_mask].copy()
    rows_after_oos_filter = len(aligned_df)
    logger.info(f"Dropped {rows_before_oos_filter - rows_after_oos_filter} OOS rows with unscorable horizons.")

    # --- Report Generation ---
    # Compile the final report.
    report = {
        "initial_ai_medians": len(df_with_target_year),
        "rows_after_human_join": rows_after_filter,
        "rows_after_realized_join": len(aligned_df),
        # Cast numpy integers to plain ints so the report serializes cleanly.
        "in_sample_rows": int((aligned_df['period'] == 'in-sample').sum()),
        "out_of_sample_rows": int((aligned_df['period'] == 'out-of-sample').sum()),
        "scoreable_rows": int(aligned_df['realized_value'].notna().sum()),
    }

    # Log the completion of the task.
    logger.info("Alignment complete. Master analysis DataFrame is ready.")

    # Return the final aligned DataFrame and the summary report.
    return aligned_df, report
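
The horizon-to-year mapping from Task 15 is easier to follow on a small example. The sketch below is a simplified stand-in for `_add_target_year`, not a replacement for it; the rounds and especially the `lt_year` values are hypothetical placeholders rather than the survey's actual long-term target years.

```python
import pandas as pd

forecasts = pd.DataFrame({
    "round": ["2013Q1", "2013Q1", "2013Q1", "2014Q3"],
    "horizon": ["CY", "CY+1", "LT", "CY+2"],
})
# Hypothetical long-term target years per survey round.
context = pd.DataFrame({
    "survey_round": ["2013Q1", "2014Q3"],
    "lt_year": [2017, 2019],
})

# Standard horizons: survey year plus a fixed offset ('LT' maps to NaN here).
base_year = forecasts["round"].str[:4].astype(int)
offset = forecasts["horizon"].map({"CY": 0, "CY+1": 1, "CY+2": 2})
target = base_year + offset

# 'LT' rows: look the target year up by survey round instead.
lt_map = context.set_index("survey_round")["lt_year"]
is_lt = forecasts["horizon"] == "LT"
target.loc[is_lt] = forecasts.loc[is_lt, "round"].map(lt_map)

# Nullable integer dtype keeps the column integer-typed even if gaps remain.
forecasts["target_year"] = target.astype("Int64")
print(forecasts)
```

Because the intermediate column is float while `LT` rows are still missing, the final cast to `Int64` is what restores integer years suitable for use as a join key.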


In [None]:
# Task 16 — Compute AI persona dispersion (within-round disagreement)

# ==============================================================================
# Task 16: Compute AI persona dispersion (within-round disagreement)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 16, Orchestrator Function
# ------------------------------------------------------------------------------
def compute_ai_persona_dispersion(
    unified_forecasts_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Computes and summarizes the dispersion of AI persona forecasts.

    This function quantifies the level of disagreement within the AI panel.
    It follows a three-step process:
    1.  Filters for individual forecasts generated by the 'persona' arm.
    2.  For each forecast instance (round, variable, horizon), it calculates
        two measures of dispersion across all personas: the Interquartile Range (IQR)
        and the population Standard Deviation (SD).
    3.  It then summarizes these per-round dispersion metrics by computing their
        median across all rounds for each (variable, horizon) pair.

    This produces a summary table that directly corresponds to the AI panel
    portion of Table 3 in the source paper.

    Args:
        unified_forecasts_df: The clean, consolidated DataFrame of all forecasts
                              from Task 13.

    Returns:
        A tuple containing:
        - ai_dispersion_summary_df (pd.DataFrame): A summary DataFrame with the
          median IQR and SD for each (variable, horizon) pair.
        - report (Dict[str, Any]): A dictionary summarizing the process.
    """
    # Log the start of the task.
    logger.info("Starting Task 16: Compute AI persona forecast dispersion.")

    # --- Input Validation ---
    # Ensure the input DataFrame has the required columns.
    required_cols = {'source_type', 'round', 'variable', 'horizon', 'value'}
    if not required_cols.issubset(unified_forecasts_df.columns):
        raise ValueError(f"Input DataFrame is missing required columns. Expected: {required_cols}")

    # --- Step 1: Persona Forecast Extraction ---
    # Filter the unified DataFrame to select only the forecasts from the 'persona' arm.
    persona_forecasts = unified_forecasts_df[unified_forecasts_df['source_type'] == 'persona'].copy()

    # Check if there is any data to process.
    if persona_forecasts.empty:
        logger.warning("No 'persona' forecasts found. Cannot compute dispersion.")
        # Return an empty DataFrame and a corresponding report.
        return pd.DataFrame(), {"status": "SKIPPED_NO_DATA"}

    # --- Step 2: IQR and Standard Deviation Computation (per round) ---
    # Define custom aggregation functions for IQR and population standard deviation.
    # Using numpy ensures precise control over the statistical calculations.
    def iqr_func(x: pd.Series) -> float:
        # Equation: IQR = q_75(x) - q_25(x)
        # Calculate the 75th and 25th percentiles.
        # `method='linear'` matches the default interpolation of pandas' `quantile`
        # (the `method` keyword requires NumPy >= 1.22).
        return np.percentile(x, 75, method='linear') - np.percentile(x, 25, method='linear')

    def pop_std_func(x: pd.Series) -> float:
        # Equation: SD = sqrt( (1/K) * Σ(y_i - ȳ)^2 )
        # Calculate the population standard deviation.
        # `np.std` uses ddof=0 by default, which is the correct population formula.
        return np.std(x)

    # Group by each forecast instance and apply the aggregation functions.
    logger.info("Calculating per-round dispersion metrics (IQR and SD)...")
    per_round_dispersion = persona_forecasts.groupby(['round', 'variable', 'horizon']).agg(
        # Apply the custom IQR function to the 'value' column of each group.
        iqr=('value', iqr_func),
        # Apply the custom population standard deviation function.
        std_dev=('value', pop_std_func),
        # Also count the number of personas in each group for quality control.
        sample_size=('value', 'size')
    ).reset_index()

    # --- Step 3: Summarize Across Rounds (Median) ---
    # Group the per-round statistics by variable and horizon.
    logger.info("Summarizing dispersion metrics across all rounds by taking the median...")
    ai_dispersion_summary_df = per_round_dispersion.groupby(['variable', 'horizon']).agg(
        # Compute the median of the IQR values across all rounds for each group.
        # Equation: median_r { IQR_rvh }
        median_iqr_ai=('iqr', 'median'),
        # Compute the median of the standard deviation values.
        # Equation: median_r { SD_rvh }
        median_sd_ai=('std_dev', 'median')
    ).reset_index()

    # --- Final Formatting ---
    # Round the final values to match the precision in the paper's Table 3.
    ai_dispersion_summary_df = ai_dispersion_summary_df.round(3)

    # --- Report Generation ---
    # Compile a summary report of the process.
    report = {
        "total_persona_forecasts": len(persona_forecasts),
        "num_groups_analyzed": len(per_round_dispersion),
        "mean_personas_per_group": per_round_dispersion['sample_size'].mean(),
        "final_summary_rows": len(ai_dispersion_summary_df),
    }

    # Log the completion of the task.
    logger.info("Successfully computed AI persona dispersion summary.")

    # Return the final summary DataFrame and the report.
    return ai_dispersion_summary_df, report
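
To make the two dispersion measures in Task 16 concrete, here is a small worked example on four made-up forecast values. It is only a sketch: it relies on NumPy's default linear interpolation for percentiles, and it contrasts the population standard deviation (`ddof=0`) with the sample version that pandas' `Series.std` computes by default (`ddof=1`).

```python
import numpy as np
import pandas as pd

values = pd.Series([1.0, 2.0, 3.0, 4.0])  # hypothetical forecasts for one group

# IQR = q_75 - q_25, using NumPy's default linear interpolation.
q25, q75 = np.percentile(values.to_numpy(), [25, 75])
iqr = q75 - q25

# Population SD divides by K (ddof=0), which is np.std's default.
pop_sd = np.std(values.to_numpy())

# pandas' Series.std divides by K - 1 (ddof=1) unless told otherwise.
sample_sd = values.std()

print(f"IQR={iqr:.3f}, population SD={pop_sd:.3f}, sample SD={sample_sd:.3f}")
```

For these values the quartiles are 1.75 and 3.25, so the IQR is 1.5; the population SD is sqrt(1.25) and the sample SD is slightly larger at sqrt(5/3). The gap between the two shrinks as the panel size grows.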


In [None]:
# Task 17 — Compute human respondent dispersion (within-round disagreement)

# ==============================================================================
# Task 17: Compute human respondent dispersion (within-round disagreement)
# ==============================================================================

# ------------------------------------------------------------------------------
# Prerequisite Helpers (module-level versions of the Task 16 aggregators,
# kept equivalent for methodological consistency)
# ------------------------------------------------------------------------------
def _iqr_func(x: Union[pd.Series, np.ndarray]) -> float:
    """
    Calculates the Interquartile Range (IQR) of a numerical series.

    This function computes the difference between the 75th and 25th percentiles
    of the input data. It is designed to be a robust aggregation function for use
    in pandas `groupby().agg()` operations.

    **Equation from LaTeX Context:**
    This function implements the standard definition of IQR:
    IQR = q_75(x) - q_25(x)

    Args:
        x: A pandas Series or numpy array of numerical data for which to
           calculate the IQR.

    Returns:
        The calculated Interquartile Range as a float. Returns np.nan if the
        input series is empty or contains only NaNs.

    Raises:
        TypeError: If the input data contains non-numeric types that cannot be
                   processed by numpy.percentile.
    """
    # --- Input Validation ---
    # Coerce to a Series so numpy arrays are handled too, and drop NaNs once.
    values = pd.Series(x).dropna()
    # If nothing remains, the IQR is undefined.
    if values.empty:
        return np.nan

    # --- Percentile Calculation ---
    # Calculate the 75th percentile (third quartile) of the data.
    # `method='linear'` is specified for linear interpolation between data points,
    # ensuring consistency with standard statistical software like pandas.
    q75 = np.percentile(values, 75, method='linear')

    # Calculate the 25th percentile (first quartile) of the data.
    q25 = np.percentile(values, 25, method='linear')

    # --- IQR Computation ---
    # Compute the final IQR by subtracting the 25th percentile from the 75th.
    iqr = q75 - q25

    # Return the result as a float.
    return float(iqr)


def _pop_std_func(x: Union[pd.Series, np.ndarray]) -> float:
    """
    Calculates the Population Standard Deviation of a numerical series.

    This function computes the standard deviation for an entire population,
    using N in the denominator. This is distinct from the sample standard
    deviation, which uses N-1. This choice is made to ensure direct,
    methodologically consistent comparison between the AI panel (a full
    population of personas) and the human panel (treated as a population
    for comparison purposes).

    **Equation from LaTeX Context:**
    This function implements the population standard deviation:
    SD = sqrt( (1/N) * Σ(y_i - ȳ)^2 )

    Args:
        x: A pandas Series or numpy array of numerical data.

    Returns:
        The calculated population standard deviation as a float. Returns np.nan
        if the input is empty or contains only NaNs.

    Raises:
        TypeError: If the input data contains non-numeric types that cannot be
                   processed by numpy.std.
    """
    # --- Input Validation ---
    # Coerce to a Series so numpy arrays are handled too, and drop NaNs once.
    values = pd.Series(x).dropna()
    # The standard deviation of an empty set is undefined.
    if values.empty:
        return np.nan

    # --- Population Standard Deviation Computation ---
    # Calculate the population standard deviation using numpy's `std` function.
    # By default, `np.std` sets the delta degrees of freedom (ddof) to 0,
    # which correctly calculates the population standard deviation by dividing by N.
    std_dev = np.std(values.to_numpy())

    # Return the result as a float.
    return float(std_dev)

# ------------------------------------------------------------------------------
# Task 17, Orchestrator Function
# ------------------------------------------------------------------------------
def compute_human_dispersion_and_compare(
    human_micro_df: pd.DataFrame,
    ai_dispersion_summary_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Computes human panel dispersion and creates a comparison table against AI dispersion.

    This function calculates the same dispersion metrics (IQR and population SD)
    as in Task 16, but for the human expert panel using the micro-data. It then
    merges the summarized results with the AI dispersion summary to produce a
    final comparison table that replicates the structure of Table 3 in the paper.

    Args:
        human_micro_df: The cleansed DataFrame of individual human forecasts.
        ai_dispersion_summary_df: The summary DataFrame of AI persona dispersion
                                  from Task 16.

    Returns:
        A tuple containing:
        - dispersion_comparison_df (pd.DataFrame): The final comparison table.
        - report (Dict[str, Any]): A dictionary summarizing the process.
    """
    # Log the start of the task.
    logger.info("Starting Task 17: Compute human respondent forecast dispersion.")

    # --- Input Validation ---
    # Ensure input DataFrames are not empty and have required columns.
    required_micro_cols = {'round', 'variable', 'horizon', 'forecaster_id', 'value'}
    if not required_micro_cols.issubset(human_micro_df.columns):
        raise ValueError(f"Human micro DataFrame is missing required columns. Expected: {required_micro_cols}")
    if ai_dispersion_summary_df.empty:
        raise ValueError("AI dispersion summary DataFrame cannot be empty.")

    # --- Step 1: Human Micro-Data Extraction and Validation ---
    # Create a clean copy, dropping any rows with null forecast values.
    clean_micro_df = human_micro_df.dropna(subset=['value']).copy()

    # Validate that there are no duplicate forecasts from the same forecaster for the same item.
    key_cols = ['round', 'variable', 'horizon', 'forecaster_id']
    if clean_micro_df.duplicated(subset=key_cols).any():
        raise ValueError("Found duplicate entries in human micro data.")

    # Check if there is any data to process.
    if clean_micro_df.empty:
        logger.warning("No valid human micro-data found. Cannot compute dispersion.")
        return pd.DataFrame(), {"status": "SKIPPED_NO_DATA"}

    # --- Step 2: Human Panel IQR and SD Computation (per round) ---
    # Group by each forecast instance and apply the identical aggregation functions from Task 16.
    logger.info("Calculating per-round dispersion metrics (IQR and SD) for human panel...")
    per_round_human_dispersion = clean_micro_df.groupby(['round', 'variable', 'horizon']).agg(
        # Apply the custom IQR function.
        iqr=('value', _iqr_func),
        # Apply the custom population standard deviation function for consistency.
        std_dev=('value', _pop_std_func),
        # Count the number of human respondents in each group.
        sample_size=('value', 'size')
    ).reset_index()

    # --- Step 3: Human Dispersion Summarization ---
    # Group the per-round statistics by variable and horizon and compute the median.
    logger.info("Summarizing human dispersion metrics across all rounds by taking the median...")
    human_dispersion_summary_df = per_round_human_dispersion.groupby(['variable', 'horizon']).agg(
        # Compute the median of the IQR values across all rounds.
        median_iqr_human=('iqr', 'median'),
        # Compute the median of the standard deviation values.
        median_sd_human=('std_dev', 'median')
    ).reset_index()

    # --- Final Step: Create Comparison Table ---
    # Merge the human summary with the AI summary to create the final table.
    logger.info("Merging AI and human dispersion summaries into a final comparison table.")
    dispersion_comparison_df = pd.merge(
        ai_dispersion_summary_df,
        human_dispersion_summary_df,
        on=['variable', 'horizon'],
        how='left' # Left join to preserve all AI results, even if human data is missing.
    )

    # --- Final Formatting and Report Generation ---
    # Round the final values to match the precision in the paper's Table 3.
    dispersion_comparison_df = dispersion_comparison_df.round(3)

    # Compile a summary report.
    report = {
        "total_human_forecasts": len(clean_micro_df),
        "num_human_groups_analyzed": len(per_round_human_dispersion),
        "mean_respondents_per_group": per_round_human_dispersion['sample_size'].mean(),
        "final_comparison_rows": len(dispersion_comparison_df),
    }

    # Log the completion of the task.
    logger.info("Successfully computed human dispersion and created comparison table.")

    # Return the final comparison DataFrame and the report.
    return dispersion_comparison_df, report
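
# ------------------------------------------------------------------------------
# Task 17, Illustrative Sketch (not part of the original pipeline)
# ------------------------------------------------------------------------------
# A minimal sketch of the two dispersion metrics used above, on made-up data.
# `_sketch_iqr` and `_sketch_pop_std` are hypothetical stand-ins for the Task 16
# helpers `_iqr_func` and `_pop_std_func`, whose definitions are not shown in
# this section. The sketch assumes linear-interpolation quantiles and ddof=0;
# the round and variable labels are placeholders.
import pandas as pd


def _sketch_iqr(values: pd.Series) -> float:
    """Interquartile range: 75th percentile minus 25th percentile."""
    return float(values.quantile(0.75) - values.quantile(0.25))


def _sketch_pop_std(values: pd.Series) -> float:
    """Population standard deviation (divides by n, i.e. ddof=0)."""
    return float(values.std(ddof=0))


def _sketch_panel_dispersion() -> pd.DataFrame:
    """Computes per-round IQR and population SD for a toy five-forecaster panel."""
    toy = pd.DataFrame({
        'round': ['2024Q1'] * 5,
        'variable': ['HICP'] * 5,
        'horizon': [1] * 5,
        'value': [1.0, 2.0, 3.0, 4.0, 5.0],
    })
    # Same groupby/agg shape as the orchestrator, applied to one toy group.
    return toy.groupby(['round', 'variable', 'horizon']).agg(
        iqr=('value', _sketch_iqr),
        std_dev=('value', _sketch_pop_std),
        sample_size=('value', 'size'),
    ).reset_index()

# Usage: _sketch_panel_dispersion() yields one row with iqr = 4.0 - 2.0 = 2.0
# and std_dev = sqrt(2), since the population variance of 1..5 is 2.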


In [None]:
# Task 18 — Compute absolute errors (AI and Human vs. realized)

# ==============================================================================
# Task 18: Compute absolute errors (AI and Human vs. realized)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 18, Orchestrator Function
# ------------------------------------------------------------------------------
def compute_absolute_errors(
    aligned_forecasts_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Computes the absolute forecast errors for both AI and human panels.

    This function takes the fully aligned DataFrame of forecasts and realized
    outcomes and calculates the absolute error for each forecast instance. It
    filters the data to include only "scoreable" forecasts (i.e., those for
    which a realized outcome is available) and computes the error for both the
    AI panel median and the human panel median.

    Args:
        aligned_forecasts_df: The master analysis DataFrame from Task 15,
                              containing AI medians, human medians, and
                              realized outcomes.

    Returns:
        A tuple containing:
        - scored_forecasts_df (pd.DataFrame): A DataFrame containing only the
          scoreable forecasts with the new absolute error columns.
        - report (Dict[str, Any]): A dictionary summarizing the process.
    """
    # Log the start of the task.
    logger.info("Starting Task 18: Compute absolute forecast errors.")

    # --- Input Validation ---
    # Define the set of columns required for this operation.
    required_cols = {
        'ai_median_persona', 'human_median', 'realized_value'
    }
    # Check if all required columns are present in the input DataFrame.
    if not required_cols.issubset(aligned_forecasts_df.columns):
        # Raise a ValueError if any required columns are missing.
        raise ValueError(f"Input DataFrame is missing required columns. Expected: {required_cols}")

    # --- Filter to Scoreable Forecasts ---
    # A forecast is "scoreable" only if a corresponding realized_value exists.
    # Create a boolean mask to identify rows where 'realized_value' is not null.
    scoreable_mask = aligned_forecasts_df['realized_value'].notna()
    # Apply the mask to create a new DataFrame containing only the scoreable rows.
    scored_df = aligned_forecasts_df[scoreable_mask].copy()

    # Check if there is any data left to score.
    if scored_df.empty:
        # Log a warning if no scoreable rows were found.
        logger.warning("No scoreable forecasts found (no realized outcomes available). Cannot compute errors.")
        # Return an empty DataFrame and a corresponding report.
        return pd.DataFrame(), {"status": "SKIPPED_NO_SCOREABLE_DATA"}

    # --- Step 1: Compute Absolute Error for AI Persona Median ---
    # This is a vectorized operation for high performance.
    # Equation: e_rvh^AI = |ŷ_rvh^AI - y_rvh|
    scored_df['abs_error_ai'] = (scored_df['ai_median_persona'] - scored_df['realized_value']).abs()

    # --- Step 2: Compute Absolute Error for Human Median ---
    # The same operation is applied to the human median forecasts.
    # Equation: e_rvh^H = |ŷ_rvh^SPF - y_rvh|
    scored_df['abs_error_human'] = (scored_df['human_median'] - scored_df['realized_value']).abs()

    # --- Step 3: Final Validation ---
    # Verify that the new error columns do not contain any null values.
    # A null here means a scoreable row is missing its AI or human median forecast.
    if scored_df[['abs_error_ai', 'abs_error_human']].isnull().any().any():
        # Raise an error so that incomplete rows are not silently skipped by later means.
        raise RuntimeError(
            "Null values found in computed error columns: at least one scoreable row "
            "is missing 'ai_median_persona' or 'human_median'."
        )

    # Log the successful computation of errors.
    logger.info(f"Successfully computed absolute errors for {len(scored_df)} scoreable forecasts.")

    # --- Report Generation ---
    # Compile a summary report of the process.
    report = {
        "total_aligned_forecasts": len(aligned_forecasts_df),
        "scoreable_forecasts": len(scored_df),
        "unscoreable_forecasts (no realized value)": len(aligned_forecasts_df) - len(scored_df),
        "mean_abs_error_ai": scored_df['abs_error_ai'].mean(),
        "mean_abs_error_human": scored_df['abs_error_human'].mean(),
    }

    # Log the completion of the task.
    logger.info("Absolute error computation complete.")

    # Return the final DataFrame with error columns and the summary report.
    return scored_df, report
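
# ------------------------------------------------------------------------------
# Task 18, Illustrative Sketch (not part of the original pipeline)
# ------------------------------------------------------------------------------
# A minimal sketch of the scoreable filter and the two absolute-error formulas
# above, on made-up numbers. Only the column names come from this section; the
# values and the function name are hypothetical.
import numpy as np
import pandas as pd


def _sketch_absolute_errors() -> pd.DataFrame:
    """Drops rows without a realized value, then computes |forecast - realized|."""
    toy = pd.DataFrame({
        'ai_median_persona': [2.0, 1.5, 3.0],
        'human_median': [2.5, 1.0, 2.8],
        'realized_value': [2.25, 1.5, np.nan],  # last row is not yet scoreable
    })
    scored = toy[toy['realized_value'].notna()].copy()
    scored['abs_error_ai'] = (scored['ai_median_persona'] - scored['realized_value']).abs()
    scored['abs_error_human'] = (scored['human_median'] - scored['realized_value']).abs()
    return scored

# Usage: _sketch_absolute_errors() keeps two rows; the AI errors are 0.25 and 0.0,
# the human errors are 0.25 and 0.5.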


In [None]:
# Task 19 — Compute MAE (by variable and horizon)

# ==============================================================================
# Task 19: Compute MAE (by variable and horizon)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 19, Orchestrator Function
# ------------------------------------------------------------------------------
def compute_mae_results(
    scored_forecasts_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Computes Mean Absolute Error (MAE) for AI and human panels.

    This function calculates the primary forecast accuracy metric, MAE, for both
    the AI and human panels. It partitions the data by period (in-sample vs.
    out-of-sample), variable, and forecast horizon, then computes the mean of
    the absolute errors for each group. The final output is a tidy DataFrame
    that corresponds to the data presented in Table 4 of the source paper.

    Args:
        scored_forecasts_df: The DataFrame from Task 18, containing scoreable
                             forecasts and their absolute errors.

    Returns:
        A tuple containing:
        - mae_results_df (pd.DataFrame): A tidy DataFrame with the MAE results
          for each forecast group.
        - report (Dict[str, Any]): A dictionary summarizing the process.
    """
    # Log the start of the MAE computation task.
    logger.info("Starting Task 19: Compute Mean Absolute Error (MAE) results.")

    # --- Input Validation ---
    # Define the set of columns required for this operation.
    required_cols = {'period', 'variable', 'horizon', 'abs_error_ai', 'abs_error_human'}
    # Check if all required columns are present in the input DataFrame.
    if not required_cols.issubset(scored_forecasts_df.columns):
        # Raise a ValueError if any required columns are missing.
        raise ValueError(f"Input DataFrame is missing required columns. Expected: {required_cols}")

    # Check if there is any data to process.
    if scored_forecasts_df.empty:
        # Log a warning if the input DataFrame is empty.
        logger.warning("Scored forecasts DataFrame is empty. Cannot compute MAE.")
        # Return an empty DataFrame and a corresponding report.
        return pd.DataFrame(), {"status": "SKIPPED_NO_DATA"}

    # --- Step 1 & 2: Grouping and MAE Computation ---
    # Define the columns to group by for the analysis.
    grouping_cols = ['period', 'variable', 'horizon']

    # Log the start of the aggregation process.
    logger.info(f"Grouping by {grouping_cols} and computing MAE...")

    # Perform the groupby and aggregation in a single, efficient operation.
    mae_results_df = scored_forecasts_df.groupby(grouping_cols).agg(
        # Compute the MAE for the AI panel by taking the mean of its absolute errors.
        # Equation: MAE_vh^AI = (1/n_vh) * Σ e_rvh^AI
        mae_ai=('abs_error_ai', 'mean'),

        # Compute the MAE for the human panel similarly.
        # Equation: MAE_vh^H = (1/n_vh) * Σ e_rvh^H
        mae_human=('abs_error_human', 'mean'),

        # Count the number of observations (rounds) in each group for reference.
        n_vh=('abs_error_ai', 'size')
    ).reset_index() # Convert the grouped output back into a DataFrame.

    # --- Step 3: Add Comparison Flags and Format ---
    # Determine the better-performing panel for each group based on the lower MAE.
    # This flag will be used for formatting (e.g., bolding) in the final table.
    # All comparisons use the unrounded MAEs so that AI wins, human wins, and
    # ties are mutually exclusive and sum to the number of groups.
    mae_results_df['ai_is_better'] = mae_results_df['mae_ai'] < mae_results_df['mae_human']
    human_wins = int((mae_results_df['mae_human'] < mae_results_df['mae_ai']).sum())
    ties = int((mae_results_df['mae_human'] == mae_results_df['mae_ai']).sum())

    # Round the MAE values to two decimal places to match the paper's presentation.
    mae_results_df['mae_ai'] = mae_results_df['mae_ai'].round(2)
    mae_results_df['mae_human'] = mae_results_df['mae_human'].round(2)

    # --- Report Generation ---
    # Compile a summary report of the MAE computation (plain ints for easy serialization).
    report = {
        "total_groups_analyzed": len(mae_results_df),
        "in_sample_groups": int((mae_results_df['period'] == 'in-sample').sum()),
        "out_of_sample_groups": int((mae_results_df['period'] == 'out-of-sample').sum()),
        "ai_wins": int(mae_results_df['ai_is_better'].sum()),
        "human_wins": human_wins,
        "ties": ties,
    }

    # Log the completion of the task.
    logger.info("Successfully computed MAE results.")

    # Return the final tidy DataFrame of MAE results and the summary report.
    return mae_results_df, report
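
# ------------------------------------------------------------------------------
# Task 19, Illustrative Sketch (not part of the original pipeline)
# ------------------------------------------------------------------------------
# A minimal sketch of the grouped MAE computation above, on made-up errors. It
# also shows why the order of comparison and rounding matters: two MAEs that
# differ before rounding can look identical at two decimals. The values, labels,
# and the `tie_after_rounding` column are hypothetical.
import pandas as pd


def _sketch_mae_comparison() -> pd.DataFrame:
    """Computes toy MAEs and compares the panels before and after rounding."""
    toy = pd.DataFrame({
        'period': ['in-sample'] * 4,
        'variable': ['HICP'] * 4,
        'horizon': [1] * 4,
        'abs_error_ai': [0.30, 0.30, 0.30, 0.30],
        'abs_error_human': [0.30, 0.30, 0.30, 0.31],
    })
    mae = toy.groupby(['period', 'variable', 'horizon']).agg(
        mae_ai=('abs_error_ai', 'mean'),
        mae_human=('abs_error_human', 'mean'),
        n_vh=('abs_error_ai', 'size'),
    ).reset_index()
    # Comparison on unrounded values: roughly 0.3000 versus 0.3025.
    mae['ai_is_better'] = mae['mae_ai'] < mae['mae_human']
    # Comparison after rounding to two decimals: both become 0.30.
    mae['tie_after_rounding'] = mae['mae_ai'].round(2) == mae['mae_human'].round(2)
    return mae

# Usage: in the single toy group, `ai_is_better` and `tie_after_rounding` are both
# True, so win and tie counts must be taken from the same (unrounded) values.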


In [None]:
# Task 20 — Construct win-share statistics

# ==============================================================================
# Task 20: Construct win-share statistics
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 20, Step 1 Helper: Define win indicator per match
# ------------------------------------------------------------------------------
def _add_win_indicators(
    scored_forecasts_df: pd.DataFrame,
    tolerance: float = 1e-9
) -> pd.DataFrame:
    """
    Adds win, loss, and tie indicators to the scored forecasts DataFrame.

    This function compares the absolute errors of the AI and human panels for
    each forecast match and adds boolean columns to indicate the outcome.
    It uses a numerical tolerance for equality checks to robustly handle
    floating-point precision.

    Args:
        scored_forecasts_df: The DataFrame containing absolute errors for both panels.
        tolerance: The tolerance for considering two errors as a tie.

    Returns:
        The input DataFrame with three new boolean columns: 'ai_wins',
        'human_wins', and 'is_tie'.
    """
    # Create a copy to avoid modifying the original DataFrame.
    df = scored_forecasts_df.copy()

    # --- Input Validation ---
    # Ensure the required error columns are present.
    if not {'abs_error_ai', 'abs_error_human'}.issubset(df.columns):
        raise ValueError("Input DataFrame must contain 'abs_error_ai' and 'abs_error_human' columns.")

    # --- Win/Loss/Tie Calculation ---
    # Determine if the AI's error is strictly less than the human's error.
    # Equation: win_rvh = 1{e_rvh^AI < e_rvh^H}
    df['ai_wins'] = df['abs_error_ai'] < df['abs_error_human']

    # Determine if the human's error is strictly less than the AI's error.
    df['human_wins'] = df['abs_error_human'] < df['abs_error_ai']

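    # Determine if the errors are a tie, using a numerical tolerance for robust comparison.
    # This is more robust than a direct `==` comparison with floating-point numbers.
    # rtol=0.0 makes `tolerance` a purely absolute threshold; np.isclose would
    # otherwise add its default relative tolerance (1e-5) on top of it.
    df['is_tie'] = np.isclose(df['abs_error_ai'], df['abs_error_human'], rtol=0.0, atol=tolerance)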

    # --- Integrity Check ---
    # The three outcomes (AI win, human win, tie) must be mutually exclusive and exhaustive.
    # We adjust the win/loss flags to ensure strict inequality, respecting the tie definition.
    df.loc[df['is_tie'], ['ai_wins', 'human_wins']] = False

    # Final validation: for each row, exactly one of the three flags must be True.
    outcome_sum = df[['ai_wins', 'human_wins', 'is_tie']].sum(axis=1)
    if not (outcome_sum == 1).all():
        raise RuntimeError("Win/loss/tie indicators are not mutually exclusive and exhaustive.")

    # Return the DataFrame with the new indicator columns.
    return df

# ------------------------------------------------------------------------------
# Task 20, Orchestrator Function
# ------------------------------------------------------------------------------
def construct_win_share_statistics(
    scored_forecasts_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Constructs win-share statistics by aggregating individual match outcomes.

    This function first determines the winner for each forecast match (AI win,
    human win, or tie) and then aggregates these outcomes to compute the
    overall win-share for the AI panel in each forecast category (period,
    variable, horizon). The win-share is calculated excluding ties, as per
    the paper's methodology.

    Args:
        scored_forecasts_df: The DataFrame from Task 18, containing scoreable
                             forecasts and their absolute errors.

    Returns:
        A tuple containing:
        - win_share_df (pd.DataFrame): A tidy DataFrame with the win-share
          statistics for each forecast group, ready for hypothesis testing.
        - report (Dict[str, Any]): A dictionary summarizing the process.
    """
    # Log the start of the task.
    logger.info("Starting Task 20: Construct win-share statistics.")

    # --- Input Validation ---
    # Check if the input DataFrame is empty.
    if scored_forecasts_df.empty:
        logger.warning("Scored forecasts DataFrame is empty. Cannot construct win-share stats.")
        return pd.DataFrame(), {"status": "SKIPPED_NO_DATA"}

    # --- Step 1: Define Win Indicator Per Match ---
    # Call the helper to add the 'ai_wins', 'human_wins', and 'is_tie' columns.
    df_with_indicators = _add_win_indicators(scored_forecasts_df)

    # --- Step 2: Aggregate Wins and Compute Win-Share ---
    # Define the columns to group by for the analysis.
    grouping_cols = ['period', 'variable', 'horizon']

    # Log the aggregation step.
    logger.info(f"Grouping by {grouping_cols} and aggregating win/loss/tie counts...")

    # Perform the groupby and aggregation.
    win_share_df = df_with_indicators.groupby(grouping_cols).agg(
        # Count the number of AI wins in each group by summing the boolean column.
        W_vh=('ai_wins', 'sum'),
        # Count the number of human wins.
        human_wins=('human_wins', 'sum'),
        # Count the number of ties.
        ties=('is_tie', 'sum'),
        # Get the total number of matches in the group, including ties.
        # Counting rows of 'ai_wins' avoids requiring a 'round' column in the input.
        total_matches=('ai_wins', 'size')
    ).reset_index()

    # Calculate the denominator for the win-share calculation (total non-tie matches).
    # Equation: n_vh = (number of AI wins) + (number of human wins)
    win_share_df['n_vh'] = win_share_df['W_vh'] + win_share_df['human_wins']

    # Calculate the AI win-share.
    # Equation: w_vh = W_vh / n_vh
    # Use a where clause to handle the edge case of division by zero.
    win_share_df['win_share'] = (
        win_share_df['W_vh'] / win_share_df['n_vh']
    ).where(win_share_df['n_vh'] > 0, np.nan)

    # --- Report Generation ---
    # Compile a summary report.
    total_ties = int(win_share_df['ties'].sum())
    total_matches = int(win_share_df['total_matches'].sum())
    report = {
        "total_matches_analyzed": total_matches,
        "total_ties": total_ties,
        "tie_ratio": total_ties / total_matches if total_matches > 0 else 0,
        "total_groups": len(win_share_df),
    }

    # Log the completion of the task.
    logger.info(f"Successfully constructed win-share statistics. Found {total_ties} ties ({report['tie_ratio']:.2%}).")

    # Return the final DataFrame of win-share statistics and the summary report.
    return win_share_df, report
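
# ------------------------------------------------------------------------------
# Task 20, Illustrative Sketch (not part of the original pipeline)
# ------------------------------------------------------------------------------
# A minimal sketch of the win/tie logic and the win-share formula
# w_vh = W_vh / n_vh on made-up errors. It assumes a purely absolute tie
# tolerance (rtol=0.0); the numbers and the function name are hypothetical.
import numpy as np


def _sketch_win_share(tolerance: float = 1e-9) -> dict:
    """Classifies four toy matches and computes the AI win-share excluding ties."""
    # 0.1 + 0.2 is 0.30000000000000004 in floating point, not exactly 0.3.
    ai = np.array([0.1 + 0.2, 0.10, 0.40, 0.05])
    human = np.array([0.3, 0.20, 0.25, 0.15])

    is_tie = np.isclose(ai, human, rtol=0.0, atol=tolerance)
    ai_wins = (ai < human) & ~is_tie
    human_wins = (human < ai) & ~is_tie

    w_vh = int(ai_wins.sum())
    n_vh = int(ai_wins.sum() + human_wins.sum())  # ties are excluded
    return {
        'W_vh': w_vh,
        'n_vh': n_vh,
        'ties': int(is_tie.sum()),
        'win_share': w_vh / n_vh if n_vh > 0 else float('nan'),
        # A strict comparison alone would count the first match as a human win.
        'naive_human_wins': int((human < ai).sum()),
    }

# Usage: _sketch_win_share() reports 2 AI wins out of 3 non-tie matches (one tie),
# whereas the strict comparison alone would report 2 human wins.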


In [None]:
# Task 21 — In-sample Monte Carlo hypothesis tests

# ==============================================================================
# Task 21: In-sample Monte Carlo hypothesis tests
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 21, Step 1 & 2 Helper: Run a single Monte Carlo test
# ------------------------------------------------------------------------------
def _run_single_mc_test(
    observed_wins: int,
    num_trials: int,
    num_simulations: int
) -> Dict[str, float]:
    """
    Runs one Monte Carlo test (N simulated draws) to calculate p-values for a win-share result.

    This function generates a null distribution based on the binomial distribution
    (B(n, 0.5)) and then calculates the one-tailed and two-tailed empirical
    p-values by comparing the observed number of wins to this simulated distribution.

    Args:
        observed_wins: The actual number of wins observed (W_vh).
        num_trials: The total number of non-tie matches (n_vh).
        num_simulations: The number of Monte Carlo simulations to run (N).

    Returns:
        A dictionary containing the one-tailed and two-tailed p-values.
    """
    # --- Input Validation ---
    # Ensure the inputs are valid for a binomial simulation.
    if not (isinstance(observed_wins, (int, np.integer)) and isinstance(num_trials, (int, np.integer))):
        raise TypeError("observed_wins and num_trials must be integers.")
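    if observed_wins > num_trials or observed_wins < 0:
        raise ValueError("observed_wins must be between 0 and num_trials.")
    # With no non-tie matches the test is undefined; return NaN, matching the
    # behavior of the exact binomial test used for the out-of-sample period.
    if num_trials == 0:
        return {"p_one_tailed": np.nan, "p_two_tailed": np.nan}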

    # --- Step 1: Generate Null Distribution ---
    # Generate the null distribution by drawing samples from a binomial distribution.
    # H0: W_vh ~ Binom(n_vh, 0.5)
    # This simulates the number of wins we would expect to see by chance if both
    # panels had an equal probability of winning each match.
    null_distribution = np.random.binomial(n=num_trials, p=0.5, size=num_simulations)

    # --- Step 2: Compute P-Values ---
    # Calculate the one-tailed p-value.
    # This is the proportion of simulated outcomes that are as extreme or more
    # extreme (>=) than the observed number of wins.
    # Equation: p_vh^(1) = (1/N) * Σ 1{W_j^* >= W_vh}
    p_one_tailed = np.mean(null_distribution >= observed_wins)

    # Calculate the two-tailed p-value.
    # This requires calculating the probability in both tails.
    # Equation: p_vh^(2) = 2 * min( P(W* >= W_vh), P(W* <= W_vh) )
    p_lower_tail = np.mean(null_distribution <= observed_wins)
    # The two-tailed p-value is twice the smaller of the two tail probabilities.
    # Both tails include the observed value, so 2*min can exceed 1; cap it at 1.0.
    p_two_tailed = min(2 * min(p_one_tailed, p_lower_tail), 1.0)

    # Return the computed p-values.
    return {
        "p_one_tailed": p_one_tailed,
        "p_two_tailed": p_two_tailed,
    }
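
# ------------------------------------------------------------------------------
# Task 21, Illustrative Sketch (not part of the original pipeline)
# ------------------------------------------------------------------------------
# A minimal sketch comparing the simulated one-tailed p-value with the exact
# binomial tail it approximates. It uses a local `np.random.default_rng`
# generator instead of the global seed; that is an assumption of this sketch,
# not the pipeline's choice. The inputs (15 wins in 20 matches) are made up.
import math

import numpy as np


def _sketch_mc_vs_exact(
    observed_wins: int = 15,
    num_trials: int = 20,
    num_simulations: int = 200_000,
    seed: int = 0,
) -> dict:
    """Returns the simulated and exact values of P(W >= observed_wins) under H0."""
    rng = np.random.default_rng(seed)
    # Draw the null distribution W* ~ Binom(n, 0.5).
    null_draws = rng.binomial(n=num_trials, p=0.5, size=num_simulations)
    p_mc = float(np.mean(null_draws >= observed_wins))
    # Exact upper tail: sum of C(n, k) for k >= observed_wins, divided by 2^n.
    p_exact = sum(
        math.comb(num_trials, k) for k in range(observed_wins, num_trials + 1)
    ) / 2 ** num_trials
    return {'p_mc': p_mc, 'p_exact': p_exact}

# Usage: for 15 wins in 20 matches the exact tail is 21700 / 2**20 (about 0.0207);
# the simulated value should agree to within Monte Carlo error.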

# ------------------------------------------------------------------------------
# Task 21, Step 3 Helper: Classify significance
# ------------------------------------------------------------------------------
def _classify_significance(p_value: float) -> str:
    """
    Applies significance star notation based on a p-value.

    Args:
        p_value: The one-tailed p-value to classify.

    Returns:
        A string with the corresponding significance stars ('***', '**', '*', or '').
    """
    # Check for NaN or invalid p-values.
    if pd.isna(p_value):
        return ""
    # Apply the classification rules from the paper.
    if p_value <= 0.01:
        return '***'
    elif p_value <= 0.05: # Note: paper says p < 0.05, but standard is <=
        return '**'
    elif p_value <= 0.10:
        return '*'
    else:
        return ''

# ------------------------------------------------------------------------------
# Task 21, Orchestrator Function
# ------------------------------------------------------------------------------
def run_in_sample_mc_tests(
    win_share_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Performs Monte Carlo hypothesis tests for in-sample win-share statistics.

    This function isolates the in-sample results, then for each forecast category,
    it runs a Monte Carlo simulation to generate a null distribution and calculate
    empirical p-values. It then adds these p-values and corresponding significance
    stars back to the main win-share DataFrame.

    Args:
        win_share_df: The DataFrame of win-share statistics from Task 20.
        config: The study configuration dictionary.

    Returns:
        A tuple containing:
        - The updated win_share_df with new columns for p-values and significance.
        - A dictionary summarizing the testing process.
    """
    # Log the start of the task.
    logger.info("Starting Task 21: In-sample Monte Carlo hypothesis tests.")

    # --- Configuration and Initialization ---
    # Extract simulation parameters from the config.
    mc_config = config['phase_2_parameters']['scoring_and_inference']
    num_simulations = mc_config['monte_carlo_simulations']
    seed = mc_config['monte_carlo_seed']

    # Set the random seed for reproducibility of the entire simulation process.
    np.random.seed(seed)

    # Filter the DataFrame to include only the in-sample results.
    in_sample_df = win_share_df[win_share_df['period'] == 'in-sample'].copy()

    # Check if there is any data to process.
    if in_sample_df.empty:
        logger.warning("No in-sample data found. Skipping Monte Carlo tests.")
        return win_share_df, {"status": "SKIPPED_NO_IN_SAMPLE_DATA"}

    # --- Run Simulations for Each Group ---
    # Initialize a list to store the results of each test.
    results = []
    # Log the start of the simulations.
    logger.info(f"Running {num_simulations} simulations for {len(in_sample_df)} in-sample groups...")

    # Iterate through each row (each forecast group) in the in-sample data.
    for _, row in in_sample_df.iterrows():
        # Run the Monte Carlo test for the current group.
        p_values = _run_single_mc_test(
            observed_wins=int(row['W_vh']),
            num_trials=int(row['n_vh']),
            num_simulations=num_simulations
        )
        # Store the results along with the identifying keys.
        results.append({
            'period': row['period'],
            'variable': row['variable'],
            'horizon': row['horizon'],
            **p_values
        })

    # Convert the list of results into a DataFrame.
    p_values_df = pd.DataFrame(results)

    # --- Step 3: Classify Significance and Merge Results ---
    # Apply the significance classification to the one-tailed p-values.
    p_values_df['significance'] = p_values_df['p_one_tailed'].apply(_classify_significance)

    # Merge the p-values and significance stars back into the original win_share_df.
    # A left merge ensures that all original rows are kept.
    updated_win_share_df = pd.merge(
        win_share_df,
        p_values_df,
        on=['period', 'variable', 'horizon'],
        how='left'
    )

    # --- Report Generation ---
    # Compile a summary report.
    report = {
        "groups_tested": len(p_values_df),
        "num_simulations_per_group": num_simulations,
        "random_seed_used": seed,
    }

    # Log the completion of the task.
    logger.info("In-sample Monte Carlo hypothesis tests complete.")

    # Return the updated DataFrame and the summary report.
    return updated_win_share_df, report


In [None]:
# Task 22 — Out-of-sample exact Binomial hypothesis tests

# ==============================================================================
# Task 22: Out-of-sample exact Binomial hypothesis tests
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 22, Step 2 Helper: Run a single exact Binomial test
# ------------------------------------------------------------------------------
def _run_single_exact_binomial_test(
    observed_wins: int,
    num_trials: int
) -> Dict[str, float]:
    """
    Runs a single exact binomial test to calculate p-values for a win-share result.

    This function is used for small sample sizes where the binomial distribution
    can be calculated directly, avoiding the need for simulation.

    Args:
        observed_wins: The actual number of wins observed (W_vh).
        num_trials: The total number of non-tie matches (n_vh).

    Returns:
        A dictionary containing the one-tailed and two-tailed p-values.
    """
    # --- Input Validation ---
    # If there are no trials, the test is undefined.
    if num_trials == 0:
        return {"p_one_tailed": np.nan, "p_two_tailed": np.nan}
    if not (isinstance(observed_wins, (int, np.integer)) and isinstance(num_trials, (int, np.integer))):
        raise TypeError("observed_wins and num_trials must be integers.")
    if observed_wins > num_trials or observed_wins < 0:
        raise ValueError("observed_wins must be between 0 and num_trials.")

    # --- P-Value Computation ---
    # Define the null hypothesis: W ~ Binom(n, p=0.5).
    # Calculate the one-tailed p-value: P(W >= observed_wins).
    # This is computed using the survival function (sf), which is 1 - CDF(k-1).
    # Equation: p_vh^(1) = Pr{W >= W_vh}
    p_one_tailed = binom.sf(k=observed_wins - 1, n=num_trials, p=0.5)

    # Calculate the two-tailed p-value.
    # Equation: p_vh^(2) = 2 * min( Pr{W >= W_vh}, Pr{W <= W_vh} )
    # The lower-tail probability, P(W <= observed_wins), is the CDF.
    p_lower_tail = binom.cdf(k=observed_wins, n=num_trials, p=0.5)
    # The two-tailed p-value is twice the smaller of the two tail probabilities.
    p_two_tailed = 2 * min(p_one_tailed, p_lower_tail)
    # For discrete distributions, it's possible for 2*min > 1, so we cap it at 1.0.
    p_two_tailed = min(p_two_tailed, 1.0)

    # Return the computed p-values.
    return {
        "p_one_tailed": p_one_tailed,
        "p_two_tailed": p_two_tailed,
    }
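
# ------------------------------------------------------------------------------
# Task 22, Illustrative Sketch (not part of the original pipeline)
# ------------------------------------------------------------------------------
# A worked example of the tail arithmetic above using only the standard library,
# to show when the cap at 1.0 applies. `_sketch_exact_tails` is a hypothetical
# helper and does not replace the scipy-based function; the inputs are made up.
import math


def _sketch_exact_tails(observed_wins: int, num_trials: int) -> dict:
    """Computes both tails of Binom(n, 0.5) and the capped two-tailed p-value."""
    total = 2 ** num_trials
    upper = sum(
        math.comb(num_trials, k) for k in range(observed_wins, num_trials + 1)
    ) / total
    lower = sum(math.comb(num_trials, k) for k in range(0, observed_wins + 1)) / total
    uncapped = 2 * min(upper, lower)
    return {
        'p_one_tailed': upper,
        'p_lower_tail': lower,
        'two_tailed_uncapped': uncapped,
        'p_two_tailed': min(uncapped, 1.0),
    }

# Usage: with 7 wins in 8 matches, P(W >= 7) = (8 + 1) / 256 = 0.03515625 and the
# two-tailed value is 0.0703125. With 4 wins in 8 matches both tails equal
# 163 / 256, so 2 * min(...) = 1.2734375 and the cap returns 1.0.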

# ------------------------------------------------------------------------------
# Task 22, Orchestrator Function
# ------------------------------------------------------------------------------
def run_out_of_sample_exact_tests(
    win_share_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Performs exact binomial hypothesis tests for out-of-sample win-share statistics.

    This function isolates the out-of-sample (OOS) results, which have small
    sample sizes, and applies the statistically appropriate exact binomial test
    to calculate p-values. It then merges these results back into the main
    win-share DataFrame.

    Args:
        win_share_df: The DataFrame of win-share statistics, potentially already
                      containing p-values for the in-sample period.

    Returns:
        A tuple containing:
        - The updated win_share_df with OOS p-values and significance populated.
        - A dictionary summarizing the testing process.
    """
    # Log the start of the task.
    logger.info("Starting Task 22: Out-of-sample exact binomial hypothesis tests.")

    # --- Step 1: Isolate Out-of-Sample Data ---
    # Filter the DataFrame to include only the out-of-sample results.
    oos_df = win_share_df[win_share_df['period'] == 'out-of-sample'].copy()

    # Check if there is any data to process.
    if oos_df.empty:
        logger.warning("No out-of-sample data found. Skipping exact binomial tests.")
        return win_share_df, {"status": "SKIPPED_NO_OOS_DATA"}

    # --- Step 2: Run Exact Tests for Each Group ---
    # Initialize a list to store the results of each test.
    results = []
    # Log the start of the tests.
    logger.info(f"Running exact binomial tests for {len(oos_df)} out-of-sample groups...")

    # Iterate through each row (each forecast group) in the OOS data.
    for _, row in oos_df.iterrows():
        # Run the exact binomial test for the current group.
        p_values = _run_single_exact_binomial_test(
            observed_wins=int(row['W_vh']),
            num_trials=int(row['n_vh'])
        )
        # Store the results along with the identifying keys.
        results.append({
            'period': row['period'],
            'variable': row['variable'],
            'horizon': row['horizon'],
            **p_values
        })

    # Convert the list of results into a DataFrame.
    p_values_df = pd.DataFrame(results)

    # --- Step 3: Classify Significance and Merge Results ---
    # Apply the significance classification to the one-tailed p-values.
    p_values_df['significance'] = p_values_df['p_one_tailed'].apply(_classify_significance)

    # Merge the OOS p-values back into the main win_share_df.
    # We use a left merge and then combine the columns to avoid overwriting
    # the in-sample results that may already be present.
    updated_win_share_df = pd.merge(
        win_share_df,
        p_values_df,
        on=['period', 'variable', 'horizon'],
        how='left',
        suffixes=('', '_oos') # Add suffix to new columns to avoid name clashes
    )

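    # Coalesce the results from the in-sample (MC) and out-of-sample (exact) tests
    # into the final p-value and significance columns.
    for col in ['p_one_tailed', 'p_two_tailed', 'significance']:
        oos_col = f'{col}_oos'
        # The suffixed column only exists if win_share_df already had `col`
        # (i.e., the in-sample tests ran first). Otherwise the merge has already
        # written the OOS values directly into `col` and nothing needs coalescing.
        if oos_col in updated_win_share_df.columns:
            # Use the value from the OOS test if the original is null, otherwise keep original.
            updated_win_share_df[col] = updated_win_share_df[col].fillna(updated_win_share_df[oos_col])
            # Drop the temporary OOS column.
            updated_win_share_df = updated_win_share_df.drop(columns=[oos_col])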

    # --- Report Generation ---
    # Compile a summary report.
    report = {
        "groups_tested": len(p_values_df),
        "test_type": "Exact Binomial Test",
    }

    # Log the completion of the task.
    logger.info("Out-of-sample exact binomial hypothesis tests complete.")

    # Return the fully updated DataFrame and the summary report.
    return updated_win_share_df, report
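
For reference, the exact test applied to each out-of-sample group can be sketched with the standard library alone. This is a minimal illustration, not the repository's `_run_single_exact_binomial_test` (whose body is not shown in this section). The function name `exact_binomial_p_values` is hypothetical, the null win probability of 0.5 is an assumption, and the two-sided p-value uses the common convention of summing all outcomes that are no more likely than the observed one.

```python
from math import comb
from typing import Dict


def exact_binomial_p_values(wins: int, trials: int, p0: float = 0.5) -> Dict[str, float]:
    """Exact binomial p-values for `wins` successes in `trials` draws.

    Hypothetical helper for illustration; p0 = 0.5 is an assumed null.
    """
    if trials < 0 or not 0 <= wins <= trials:
        raise ValueError("Require 0 <= wins <= trials.")

    # Probability mass of every possible win count under H0.
    pmf = [comb(trials, k) * p0**k * (1 - p0) ** (trials - k) for k in range(trials + 1)]

    # One-tailed: probability of at least as many wins as observed.
    p_one = sum(pmf[wins:])

    # Two-tailed: total mass of outcomes no more likely than the observed one.
    # The small relative tolerance guards against floating-point ties.
    observed = pmf[wins]
    p_two = sum(p for p in pmf if p <= observed * (1 + 1e-7))

    return {"p_one_tailed": min(1.0, p_one), "p_two_tailed": min(1.0, p_two)}
```

With 8 wins in 10 trials, the one-tailed value is 56/1024 and the two-tailed value is 112/1024, because the null distribution is symmetric at `p0 = 0.5`.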


In [None]:
# Task 23 — Create orchestrator for end-to-end pipeline execution

# ==============================================================================
# Task 23: Create orchestrator for end-to-end pipeline execution
# ==============================================================================

def run_full_replication_pipeline(
    data_paths: Dict[str, str],
    config: Dict[str, Any],
    output_dir: str,
    total_persona_rows: int,
    run_kappa_validation: bool = True
) -> Dict[str, Any]:
    """
    Orchestrates the full end-to-end replication of the research pipeline.

    This master function executes the entire sequence of 22 tasks described in
    the research paper, from initial raw data validation to final statistical
    hypothesis testing. It is designed for robustness, auditability, and
    reproducibility.

    **Pipeline Workflow:**
    1.  **Setup**: Creates a structured directory for all outputs and configures
        file-based logging for a persistent audit trail.
    2.  **Data Loading**: Loads all raw analytical datasets into memory.
    3.  **Phase I (Validation & Cleansing)**: Executes Tasks 1-4 to validate
        and cleanse all raw input data, creating analysis-ready files.
    4.  **Phase II (Persona Filtering)**: Executes Tasks 5-9, a sequential
        filtering pipeline to derive the final set of 2,368 personas. This
        includes an optional but critical kappa validation step.
    5.  **Phase III (Forecast Generation)**: Executes Tasks 10-12 to assemble
        all prompts and run the high-volume, asynchronous forecast generation
        for both the persona and baseline arms.
    6.  **Phase IV (Analysis & Scoring)**: Executes Tasks 13-22, a linear
        sequence of data analysis steps including aggregation, error calculation,
        and hypothesis testing.
    7.  **Artifact Persistence**: Saves all key final result tables (e.g., MAE,
        win-share) and a comprehensive JSON report of the entire run.

    Args:
        data_paths: A dictionary mapping logical data names (e.g., 'persona_hub',
                    'contextual_data') to their raw file paths.
        config: The main study configuration dictionary containing all parameters.
        output_dir: The root directory where all outputs (processed data,
                    results, checkpoints, logs) will be saved.
        total_persona_rows: The total number of rows in the raw persona hub file,
                            required for streaming validation.
        run_kappa_validation: A flag to enable/disable the optional (but
                              recommended) kappa validation step (Task 9).

    Returns:
        A dictionary containing all generated artifacts (key DataFrames) and
        a nested dictionary of all task-specific reports.

    Raises:
        ValueError: If initial data validation fails.
        Exception: If any subsequent pipeline step fails, the exception is logged
                   and re-raised to halt execution.
    """
    # --- Step 1: Environment Setup ---
    # Create the main output directory Path object.
    root_path = Path(output_dir)
    # Define paths for all subdirectories.
    processed_path = root_path / "processed"
    results_path = root_path / "results"
    checkpoints_path = root_path / "checkpoints"
    # Create all directories, including parents, if they don't already exist.
    for p in [processed_path, results_path, checkpoints_path]:
        p.mkdir(parents=True, exist_ok=True)

    # Configure logging to write to a file for a persistent audit trail.
    # This provides a complete record of the pipeline's execution.
    log_file_path = root_path / "pipeline_run.log"
    # Create a file handler.
    file_handler = logging.FileHandler(log_file_path, mode='w') # Overwrite log on each run
    # Set a clear format for the log messages.
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(message)s'))
    # Add the handler to the root logger.
    logging.getLogger().addHandler(file_handler)

    # Initialize a dictionary to store all intermediate and final data artifacts.
    artifacts: Dict[str, Any] = {}
    # Initialize a dictionary to store the summary report from each task.
    all_reports: Dict[str, Any] = {}
    # Log the start of the pipeline execution.
    logger.info(f"Pipeline started. All artifacts will be saved in '{root_path}'.")

    try:
        # --- Step 2: Data Loading ---
        # Log the data loading phase.
        logger.info("Loading raw analytical data files into memory...")
        # Load the contextual data CSV into a pandas DataFrame.
        artifacts['raw_contextual_df'] = pd.read_csv(data_paths['contextual_data'])
        # Load the human benchmark data.
        artifacts['raw_human_benchmark_df'] = pd.read_csv(data_paths['human_benchmark'])
        # Load the human micro-data.
        artifacts['raw_human_micro_df'] = pd.read_csv(data_paths['human_micro'])
        # Load the realized outcomes data.
        artifacts['raw_realized_outcomes_df'] = pd.read_csv(data_paths['realized_outcomes'])
        # Conditionally load human annotations if kappa validation is enabled.
        if run_kappa_validation:
            artifacts['raw_human_annotations_df'] = pd.read_csv(data_paths['human_annotations'])

        # --- Phase I: Validation and Cleansing (Tasks 1-4) ---
        # Log the beginning of Phase I.
        logger.info("--- Running Phase I: Data Validation and Cleansing ---")
        # Execute Task 1: Validate the large, raw persona hub file via streaming.
        all_reports['task_01_validation'] = validate_persona_hub_df(data_paths['persona_hub'], config, total_persona_rows)

        # Execute Task 2: Validate all analytical inputs that were loaded into memory.
        all_reports['task_02_validation'] = validate_analytical_inputs(
            artifacts['raw_contextual_df'], artifacts['raw_human_benchmark_df'],
            artifacts['raw_human_micro_df'], artifacts['raw_realized_outcomes_df'], config
        )

        # Define the path for the cleansed persona data.
        cleansed_persona_path = processed_path / "persona_hub_clean.parquet"
        # Execute Task 3: Cleanse the raw persona hub file and write to the new path.
        all_reports['task_03_cleansing'] = cleanse_persona_hub_df(data_paths['persona_hub'], str(cleansed_persona_path), config)
        # Store the path to the cleansed file for the next step.
        artifacts['cleansed_persona_path'] = str(cleansed_persona_path)

        # Execute Task 4: Cleanse all analytical DataFrames in memory.
        cleansed_dfs = cleanse_analytical_inputs(
            artifacts['raw_contextual_df'], artifacts['raw_human_benchmark_df'],
            artifacts['raw_human_micro_df'], artifacts['raw_realized_outcomes_df'], config
        )
        # Unpack and store the cleansed DataFrames in the artifacts dictionary.
        artifacts['clean_contextual_df'], artifacts['clean_human_benchmark_df'], artifacts['clean_human_micro_df'], artifacts['clean_realized_outcomes_df'] = cleansed_dfs

        # --- Phase II: Persona Filtering (Tasks 5-9) ---
        # Log the beginning of Phase II.
        logger.info("--- Running Phase II: Persona Filtering Pipeline ---")
        # Define the path for the output of the first filtering step.
        persona_step1_path = processed_path / "persona_step1.parquet"
        # Execute Task 5: Apply the keyword and domain filter.
        all_reports['task_05_filter1'] = apply_keyword_domain_filter(artifacts['cleansed_persona_path'], str(persona_step1_path), config)

        # Load the result of the first filter into memory (it's now small enough).
        artifacts['persona_step1_df'] = pd.read_parquet(persona_step1_path)
        # Execute Task 6: Apply the NER filter to the in-memory DataFrame.
        artifacts['persona_step2_df'], all_reports['task_06_filter2'] = apply_ner_person_filter(artifacts['persona_step1_df'], config)
        # Execute Task 7: Apply embedding-based deduplication.
        artifacts['persona_step3_df'], all_reports['task_07_filter3'] = apply_embedding_deduplication(artifacts['persona_step2_df'], config)

        # Define the path for the LLM judge checkpoint file.
        judge_checkpoint_path = checkpoints_path / "judge_results.jsonl"
        # Execute Task 8: Run the asynchronous LLM-as-judge filter.
        artifacts['persona_final_df'], all_reports['task_08_filter4'] = asyncio.run(apply_llm_judge_filter(artifacts['persona_step3_df'], config, str(judge_checkpoint_path)))

        # Conditionally execute Task 9: Validate the reliability of the LLM judge.
        if run_kappa_validation:
            # Load the full set of judgments from the checkpoint file.
            all_judgments_df = pd.read_json(judge_checkpoint_path, lines=True)
            # Run the kappa validation.
            kappa_report_df, summary_t9 = validate_llm_judge_reliability(artifacts['persona_step3_df'], all_judgments_df, artifacts['raw_human_annotations_df'], config)
            # Store the results.
            all_reports['task_09_kappa'] = summary_t9
            artifacts['kappa_report_df'] = kappa_report_df

        # --- Phase III: Forecast Generation (Tasks 10-12) ---
        # Log the beginning of Phase III.
        logger.info("--- Running Phase III: Forecast Generation ---")
        # Execute Task 10: Assemble all prompts for both arms. Convert generator to list to allow multiple passes.
        all_prompts = list(assemble_forecasting_prompts(artifacts['persona_final_df'], artifacts['clean_contextual_df'], config))

        # Define the checkpoint path for the persona arm forecasts.
        persona_checkpoint_path = checkpoints_path / "persona_forecasts.jsonl"
        # Execute Task 11: Generate forecasts for the persona arm.
        artifacts['persona_forecasts_df'], all_reports['task_11_gen_persona'] = generate_persona_forecasts((p for p in all_prompts), config, str(persona_checkpoint_path))

        # Define the checkpoint path for the baseline arm forecasts.
        baseline_checkpoint_path = checkpoints_path / "baseline_forecasts.jsonl"
        # Execute Task 12: Generate forecasts for the baseline arm.
        artifacts['baseline_forecasts_df'], all_reports['task_12_gen_baseline'] = asyncio.run(generate_baseline_forecasts((p for p in all_prompts), config, str(baseline_checkpoint_path)))

        # --- Phase IV: Analysis and Scoring (Tasks 13-22) ---
        # Log the beginning of Phase IV.
        logger.info("--- Running Phase IV: Analysis and Scoring ---")
        # Execute Task 13: Consolidate and QC all generated forecasts.
        artifacts['unified_forecasts_df'], all_reports['task_13_qc'] = parse_and_qc_all_forecasts(artifacts['persona_forecasts_df'], artifacts['baseline_forecasts_df'])
        # Execute Task 14: Compute the AI panel medians for both arms.
        artifacts['ai_panel_medians_df'], all_reports['task_14_aggregation'] = compute_ai_panel_medians(artifacts['unified_forecasts_df'])
        # Execute Task 15: Create the master analysis DataFrame by aligning all data sources.
        artifacts['aligned_df'], all_reports['task_15_alignment'] = align_forecasts_for_scoring(artifacts['ai_panel_medians_df'],
                                                                                                artifacts['clean_human_benchmark_df'],
                                                                                                artifacts['clean_realized_outcomes_df'],
                                                                                                artifacts['clean_contextual_df'], config)

        # Execute Tasks 16 & 17: Compute and compare forecast dispersion.
        ai_dispersion_df, report_t16 = compute_ai_persona_dispersion(artifacts['unified_forecasts_df'])
        artifacts['dispersion_comparison_df'], report_t17 = compute_human_dispersion_and_compare(artifacts['clean_human_micro_df'], ai_dispersion_df)
        all_reports['task_16_17_dispersion'] = {'ai': report_t16, 'human': report_t17}

        # Execute Task 18: Compute absolute errors for all scoreable forecasts.
        artifacts['scored_forecasts_df'], all_reports['task_18_errors'] = compute_absolute_errors(artifacts['aligned_df'])
        # Execute Task 19: Compute Mean Absolute Error (MAE) results.
        artifacts['mae_results_df'], all_reports['task_19_mae'] = compute_mae_results(artifacts['scored_forecasts_df'])

        # Execute Task 20: Construct win-share statistics.
        win_share_df, report_t20 = construct_win_share_statistics(artifacts['scored_forecasts_df'])
        all_reports['task_20_win_share'] = report_t20

        # Execute Tasks 21 & 22: Run hypothesis tests for in-sample and out-of-sample periods.
        win_share_with_insample_tests, report_t21 = run_in_sample_mc_tests(win_share_df, config)
        artifacts['final_win_share_df'], report_t22 = run_out_of_sample_exact_tests(win_share_with_insample_tests)
        all_reports['task_21_22_hyp_tests'] = {'insample': report_t21, 'oos': report_t22}

        # Log the successful completion of all tasks.
        logger.info("--- All pipeline tasks completed successfully. ---")

    # Catch any exception from any task in the pipeline.
    except Exception as e:
        # Log the critical error, including the stack trace.
        logger.critical(f"Pipeline execution failed with a critical error: {e}", exc_info=True)
        # Add failure information to the report.
        all_reports['pipeline_status'] = 'FAILURE'
        all_reports['failure_reason'] = str(e)
        # Save the partial report for debugging purposes.
        report_path = results_path / "FAILED_pipeline_report.json"
        # Use a robust JSON dump that can handle non-serializable types like Path objects.
        with open(report_path, 'w') as f:
            json.dump(all_reports, f, indent=2, default=str)
        # Re-raise the exception to halt execution and signal failure.
        raise

    # --- Final Artifact Persistence ---
    # Log the final step of saving all result tables.
    logger.info("--- Saving final result artifacts to CSV ---")
    # Save the dispersion comparison table.
    artifacts['dispersion_comparison_df'].to_csv(results_path / "dispersion_comparison.csv")
    # Save the MAE results table.
    artifacts['mae_results_df'].to_csv(results_path / "mae_results.csv")
    # Save the final win-share results table with hypothesis tests.
    artifacts['final_win_share_df'].to_csv(results_path / "win_share_results.csv", index=False)
    # Conditionally save the kappa validation report.
    if 'kappa_report_df' in artifacts:
        artifacts['kappa_report_df'].to_csv(results_path / "kappa_validation_report.csv", index=False)

    # --- Final Reporting ---
    # Set the final status to SUCCESS.
    all_reports['pipeline_status'] = 'SUCCESS'
    # Define the path for the final, comprehensive JSON report.
    report_path = results_path / "full_pipeline_report.json"
    # Write the report to a file.
    with open(report_path, 'w') as f:
        json.dump(all_reports, f, indent=2, default=str)

    # Log the location of the final report.
    logger.info(f"Full pipeline report saved to '{report_path}'.")

    # Add the final collection of reports to the artifacts dictionary to be returned.
    artifacts['all_reports'] = all_reports

    # Return the dictionary containing all key in-memory artifacts and the final report.
    return artifacts
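
One caveat with the setup step above: the `FileHandler` is attached to the root logger and never removed, so calling the pipeline twice in the same session (common in a notebook) would write each message to the log more than once. One possible way to scope the handler is a small context manager. This is a sketch under assumptions, not part of the pipeline: the name `scoped_file_logging` is hypothetical, and the format string is copied from the orchestrator.

```python
import logging
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def scoped_file_logging(log_path: Path) -> Iterator[logging.FileHandler]:
    """Attach a FileHandler to the root logger for the duration of a block."""
    # mode="w" overwrites the log on each run, as the orchestrator does.
    handler = logging.FileHandler(log_path, mode="w")
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(levelname)s - %(module)s - %(message)s")
    )
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    try:
        yield handler
    finally:
        # Detach and close so a later run does not inherit this handler.
        root_logger.removeHandler(handler)
        handler.close()
```

Wrapping the body of the orchestrator in `with scoped_file_logging(log_file_path):` would keep the audit trail while leaving the root logger unchanged after the run, whether the run succeeds or raises.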


In [None]:
# Task 24 — Ablation: paired t-test (persona vs. no-persona)

# ==============================================================================
# Task 24: Ablation: paired t-test (persona vs. no-persona)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 24, Step 1 Helper: Construct paired differences
# ------------------------------------------------------------------------------
def _prepare_paired_ablation_data(
    aligned_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Prepares the data for a paired t-test by calculating error differences.

    This function filters the aligned data to the rows where forecasts from
    both the persona and no-persona arms are available, along with a realized
    outcome. It then calculates the absolute error for each arm and the paired
    difference between these errors.

    Args:
        aligned_df: The master analysis DataFrame from Task 15.

    Returns:
        A DataFrame containing the paired absolute errors and their differences,
        ready for statistical testing.
    """
    # Log the start of the data preparation.
    logger.info("Preparing paired data for ablation t-test...")

    # --- Input Validation ---
    # Define the columns required for this paired comparison.
    required_cols = {
        'ai_median_persona', 'ai_median_no_persona', 'realized_value'
    }
    # Check if all required columns are present, and name the missing ones if not.
    missing_cols = required_cols - set(aligned_df.columns)
    if missing_cols:
        raise ValueError(f"Input DataFrame is missing required columns for paired test: {sorted(missing_cols)}")

    # --- Filter for Complete Paired Cases ---
    # A case is complete if we have a forecast from both arms and a realized value.
    # Drop all rows where any of these three key values are missing.
    paired_data = aligned_df.dropna(subset=list(required_cols)).copy()

    # Check if any data remains after filtering.
    if paired_data.empty:
        logger.warning("No complete paired cases found for ablation test. Cannot proceed.")
        return pd.DataFrame()

    # --- Compute Absolute Errors for Each Arm ---
    # Equation: e_j^(AI,P) = |ŷ_j^(AI,P) - y_j|
    paired_data['abs_error_persona'] = (
        paired_data['ai_median_persona'] - paired_data['realized_value']
    ).abs()

    # Equation: e_j^(AI,NP) = |ŷ_j^(AI,NP) - y_j|
    paired_data['abs_error_no_persona'] = (
        paired_data['ai_median_no_persona'] - paired_data['realized_value']
    ).abs()

    # --- Compute Paired Difference ---
    # The difference is defined as the error of the persona arm minus the error of the no-persona arm.
    # A negative value means the persona arm had a lower error (was better).
    # Equation: d_j = e_j^(AI,P) - e_j^(AI,NP)
    paired_data['error_difference'] = (
        paired_data['abs_error_persona'] - paired_data['abs_error_no_persona']
    )

    # Log the number of pairs prepared for the test.
    logger.info(f"Prepared {len(paired_data)} paired observations for the t-test.")

    # Return the prepared DataFrame.
    return paired_data

# ------------------------------------------------------------------------------
# Task 24, Orchestrator Function
# ------------------------------------------------------------------------------
def run_ablation_paired_ttest(
    aligned_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Performs a paired t-test to compare the forecast accuracy of the persona
    and no-persona arms.

    This function executes the first statistical test of the ablation study. It
    compares the mean absolute error of the persona-prompted forecasts against
    the no-persona baseline forecasts to determine if there is a statistically
    significant difference in accuracy.

    Args:
        aligned_df: The master analysis DataFrame containing forecasts from both
                    arms and the realized outcomes.

    Returns:
        A tuple containing:
        - A single-row DataFrame summarizing the t-test results.
        - A dictionary report of the process.
    """
    # Log the start of the task.
    logger.info("Starting Task 24: Ablation paired t-test.")

    # --- Step 1: Construct Paired Differences ---
    # Call the helper to prepare the paired dataset.
    paired_data = _prepare_paired_ablation_data(aligned_df)

    # If no paired data could be created, exit gracefully.
    if paired_data.empty:
        return pd.DataFrame(), {"status": "SKIPPED_NO_PAIRED_DATA"}

    # --- Step 2: Compute Paired T-Test Statistics ---
    # Log the execution of the statistical test.
    logger.info("Performing paired t-test on error differences...")

    # Use scipy's ttest_rel for a paired t-test. This is equivalent to a
    # one-sample t-test on the `error_difference` column.
    # H0: The true mean difference between the paired samples is zero.
    t_statistic, p_value = ttest_rel(
        a=paired_data['abs_error_persona'],
        b=paired_data['abs_error_no_persona']
    )

    # --- Step 3: Report and Interpret Results ---
    # Calculate descriptive statistics for the report.
    mean_diff = paired_data['error_difference'].mean()
    std_diff = paired_data['error_difference'].std()
    n_pairs = len(paired_data)
    degrees_freedom = n_pairs - 1

    # Compare the results to the values reported in the paper for validation.
    expected_t = -1.02
    expected_p = 0.31
    t_match = np.isclose(t_statistic, expected_t, atol=0.01)
    p_match = np.isclose(p_value, expected_p, atol=0.01)
    replication_status = "SUCCESS" if t_match and p_match else "FAILURE"

    # Create a structured dictionary of the final results.
    # The interpretation follows the test outcome instead of being hard-coded.
    is_significant = p_value < 0.05
    results = {
        "test_name": "Paired t-test (Persona vs. No-Persona)",
        "n_pairs": n_pairs,
        "mean_difference": mean_diff,
        "std_dev_of_difference": std_diff,
        "t_statistic": t_statistic,
        "degrees_of_freedom": degrees_freedom,
        "p_value_two_sided": p_value,
        "conclusion": "Reject H0" if is_significant else "Fail to reject H0",
        "interpretation": (
            "Statistically significant difference in mean absolute error."
            if is_significant
            else "No statistically significant difference in mean absolute error."
        ),
        "replication_status": replication_status
    }

    # Convert the results dictionary to a single-row DataFrame for consistent output format.
    results_df = pd.DataFrame([results])

    # Log the conclusion.
    logger.info(f"Paired t-test complete. p-value={p_value:.4f}. Conclusion: {results['interpretation']}")
    if replication_status == "FAILURE":
        logger.warning("T-test results do not match the values reported in the paper.")

    # Return the results DataFrame and a simple report.
    return results_df, {"status": "SUCCESS", "pairs_analyzed": n_pairs}
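
The quantities behind this test are the paired differences d_j = e_j^(AI,P) - e_j^(AI,NP) and the statistic t = mean(d) / (sd(d) / sqrt(n)) with n - 1 degrees of freedom. The sketch below recomputes them with the standard library as a cross-check on the arithmetic. It is illustrative only: `paired_t_statistic` is a hypothetical name, and the p-value is left out because the t-distribution CDF is not in the standard library (the pipeline obtains it from `ttest_rel`).

```python
from math import sqrt
from statistics import mean, stdev
from typing import Sequence, Tuple


def paired_t_statistic(
    persona: Sequence[float],
    no_persona: Sequence[float],
    realized: Sequence[float],
) -> Tuple[float, int]:
    """Return (t, degrees_of_freedom) for d_j = |P_j - y_j| - |NP_j - y_j|."""
    if not (len(persona) == len(no_persona) == len(realized)):
        raise ValueError("All three sequences must have the same length.")
    if len(persona) < 2:
        raise ValueError("At least two pairs are needed to estimate a variance.")

    # Paired difference of absolute errors; negative means the persona arm was closer.
    diffs = [abs(p - y) - abs(q - y) for p, q, y in zip(persona, no_persona, realized)]

    sd = stdev(diffs)  # sample standard deviation (n - 1 denominator)
    if sd == 0:
        raise ValueError("All differences are identical; t is undefined.")

    n = len(diffs)
    return mean(diffs) / (sd / sqrt(n)), n - 1
```

For example, persona forecasts `[2.0, 1.5, 3.0]`, no-persona forecasts `[2.5, 1.0, 3.5]`, and realized values `[2.2, 1.4, 2.8]` give differences of about -0.1, -0.3, and -0.5, so the mean is -0.3, the standard deviation is 0.2, and t is about -2.598 on 2 degrees of freedom.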


In [None]:
# Task 25 — Ablation: Kolmogorov–Smirnov test (distributional equivalence)

# ==============================================================================
# Task 25: Ablation: Kolmogorov–Smirnov test (distributional equivalence)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 25, Orchestrator Function
# ------------------------------------------------------------------------------
def run_ablation_ks_test(
    aligned_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Performs a two-sample Kolmogorov-Smirnov (KS) test to compare the error
    distributions of the persona and no-persona arms.

    This function executes the second statistical test of the ablation study.
    While the t-test compares the means, the KS test compares the entire shape
    of the error distributions. A non-significant result means the test found no
    detectable difference between the two distributions; it is not, by itself,
    proof that they are identical.

    Args:
        aligned_df: The master analysis DataFrame containing forecasts from both
                    arms and the realized outcomes.

    Returns:
        A tuple containing:
        - A single-row DataFrame summarizing the KS-test results.
        - A dictionary report of the process.
    """
    # Log the start of the task.
    logger.info("Starting Task 25: Ablation Kolmogorov-Smirnov (KS) test.")

    # --- Step 1: Construct Error Distribution Samples ---
    # Use the same helper from the t-test task to get the paired error data.
    # This ensures we are testing on the exact same set of matches.
    paired_data = _prepare_paired_ablation_data(aligned_df)

    # If no paired data could be created, exit gracefully.
    if paired_data.empty:
        # Log a warning and return empty results.
        logger.warning("No complete paired cases found for ablation KS test. Cannot proceed.")
        return pd.DataFrame(), {"status": "SKIPPED_NO_PAIRED_DATA"}

    # Extract the two error distributions as numpy arrays.
    error_dist_persona = paired_data['abs_error_persona'].values
    error_dist_no_persona = paired_data['abs_error_no_persona'].values

    # --- Input Validation ---
    # ks_2samp assumes two independent samples and does not require equal sizes.
    # These errors are paired (same forecast cases), so the p-value should be read
    # as approximate. Both arrays come from the same rows, so equal length is
    # expected; the check below is only a sanity guard.
    if len(error_dist_persona) != len(error_dist_no_persona):
        raise ValueError("Error distribution samples must have the same size for this test.")

    # --- Step 2: Compute Two-Sample KS Test ---
    # Log the execution of the statistical test.
    logger.info("Performing two-sample KS test on error distributions...")

    # Use scipy's ks_2samp function to perform the test.
    # H0: The two samples are drawn from the same distribution.
    # The function returns the D statistic and the two-sided p-value.
    # Equation: D = sup_x |F_P(x) - F_NP(x)|
    d_statistic, p_value = ks_2samp(
        data1=error_dist_persona,
        data2=error_dist_no_persona
    )

    # --- Step 3: Report and Interpret Results ---
    # Get the sample size for the report.
    n_samples = len(error_dist_persona)

    # Compare the results to the values reported in the paper for validation.
    expected_d = 0.05
    expected_p = 0.28
    d_match = np.isclose(d_statistic, expected_d, atol=0.01)
    p_match = np.isclose(p_value, expected_p, atol=0.01)
    replication_status = "SUCCESS" if d_match and p_match else "FAILURE"

    # Create a structured dictionary of the final results.
    # The interpretation follows the test outcome instead of being hard-coded.
    is_significant = p_value < 0.05
    results = {
        "test_name": "Two-Sample Kolmogorov-Smirnov Test (Persona vs. No-Persona)",
        "n_samples_per_dist": n_samples,
        "d_statistic": d_statistic,
        "p_value": p_value,
        "conclusion": "Reject H0" if is_significant else "Fail to reject H0",
        "interpretation": (
            "The error distributions differ significantly."
            if is_significant
            else "No statistically significant difference between the error distributions."
        ),
        "replication_status": replication_status
    }

    # Convert the results dictionary to a single-row DataFrame.
    results_df = pd.DataFrame([results])

    # Log the conclusion.
    logger.info(f"KS test complete. p-value={p_value:.4f}. Conclusion: {results['interpretation']}")
    if replication_status == "FAILURE":
        logger.warning("KS-test results do not match the values reported in the paper.")

    # Return the results DataFrame and a simple report.
    return results_df, {"status": "SUCCESS", "samples_analyzed": n_samples}
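
The statistic D = sup_x |F_P(x) - F_NP(x)| is the largest vertical gap between the two empirical CDFs, and that gap is always reached at one of the observed values. A minimal standard-library sketch of the computation follows. The name `ks_two_sample_d` is hypothetical, and only D is computed; the p-value in the pipeline comes from `ks_2samp`.

```python
from bisect import bisect_right
from typing import Sequence


def ks_two_sample_d(sample_a: Sequence[float], sample_b: Sequence[float]) -> float:
    """Largest absolute gap between the empirical CDFs of two samples."""
    a, b = sorted(sample_a), sorted(sample_b)
    if not a or not b:
        raise ValueError("Both samples must be non-empty.")

    d = 0.0
    # Empirical CDFs are step functions, so checking every observed value is enough.
    for x in a + b:
        f_a = bisect_right(a, x) / len(a)  # share of sample_a that is <= x
        f_b = bisect_right(b, x) / len(b)  # share of sample_b that is <= x
        d = max(d, abs(f_a - f_b))
    return d
```

Identical samples give D = 0, fully separated samples give D = 1, and `[0.1, 0.2, 0.3, 0.4]` against `[0.3, 0.4, 0.5, 0.6]` gives D = 0.5.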


In [None]:
# Top-Level Orchestrator

# ==============================================================================
# Top-Level Orchestrator for the Entire Study
# ==============================================================================

def run_synthetic_economist_study(
    data_paths: Dict[str, str],
    config: Dict[str, Any],
    output_dir: str,
    total_persona_rows: int,
    run_kappa_validation: bool = True
) -> Dict[str, Any]:
    """
    Executes the complete research study, including the main replication pipeline
    and the final ablation study statistical tests.

    This top-level function serves as the single entry point for the entire
    project. It runs the main pipeline (Tasks 1-22, orchestrated by Task 23)
    and then passes that pipeline's outputs to the final ablation tests
    (Tasks 24-25).

    **Workflow:**
    1.  Calls `run_full_replication_pipeline` to execute the main data
        processing, forecast generation, and analysis pipeline.
    2.  Extracts the key `aligned_df` artifact from the pipeline's output.
    3.  Calls `run_ablation_paired_ttest` on the `aligned_df` to compare the
        mean absolute errors of the two experimental arms.
    4.  Calls `run_ablation_ks_test` on the `aligned_df` to compare the
        error distributions of the two arms.
    5.  Collects and returns all results in a single, comprehensive dictionary.

    Args:
        data_paths: A dictionary mapping logical data names to their raw file paths.
        config: The main study configuration dictionary.
        output_dir: The root directory where all outputs will be saved.
        total_persona_rows: The total number of rows in the raw persona hub file.
        run_kappa_validation: A flag to enable/disable the kappa validation step.

    Returns:
        A dictionary containing all major artifacts and reports from the entire study.
    """
    # Log the start of the top-level orchestration.
    logger.info(">>> Starting the top-level orchestrator for the Synthetic Economist study. <<<")

    # Initialize a dictionary to hold the final, comprehensive results.
    final_results: Dict[str, Any] = {}

    try:
        # --- Step 1: Run the main replication pipeline (Tasks 1-22) ---
        # This function is the master orchestrator for the main body of the paper.
        # It handles all data processing, filtering, forecast generation, and initial analysis.
        # It returns a dictionary of all key artifacts produced during its run.
        replication_artifacts = run_full_replication_pipeline(
            data_paths=data_paths,
            config=config,
            output_dir=output_dir,
            total_persona_rows=total_persona_rows,
            run_kappa_validation=run_kappa_validation
        )
        # Store the entire collection of artifacts from the main pipeline.
        final_results['replication_pipeline_artifacts'] = replication_artifacts

        # --- Step 2: Unpack results and prepare for ablation tests ---
        # Log the transition to the ablation study phase.
        logger.info("--- Main replication pipeline complete. Proceeding to final ablation tests. ---")

        # Extract the critical `aligned_df` artifact, which is required for both ablation tests.
        # This DataFrame contains the aligned forecasts from both arms and the realized outcomes.
        aligned_df = replication_artifacts.get('aligned_df')

        # --- Input Validation for Ablation Tests ---
        # Perform a rigorous check to ensure the required DataFrame exists and is valid.
        if aligned_df is None or not isinstance(aligned_df, pd.DataFrame) or aligned_df.empty:
            # If the required data is missing, the ablation tests cannot run. Raise a critical error.
            error_msg = "The 'aligned_df' artifact was not found or is empty. Cannot run ablation tests."
            logger.critical(error_msg)
            raise RuntimeError(error_msg)

        # --- Step 3: Run the paired t-test (Task 24) ---
        # Execute the paired t-test to compare the mean absolute errors.
        ttest_df, ttest_report = run_ablation_paired_ttest(aligned_df=aligned_df)
        # Store the results of the t-test.
        final_results['ablation_ttest_report'] = ttest_df
        # Save the t-test report to a file for easy access.
        ttest_df.to_csv(Path(output_dir) / "results" / "ablation_ttest_report.csv", index=False)

        # --- Step 4: Run the Kolmogorov-Smirnov test (Task 25) ---
        # Execute the KS test to compare the error distributions.
        ks_test_df, ks_test_report = run_ablation_ks_test(aligned_df=aligned_df)
        # Store the results of the KS test.
        final_results['ablation_ks_test_report'] = ks_test_df
        # Save the KS test report to a file.
        ks_test_df.to_csv(Path(output_dir) / "results" / "ablation_ks_test_report.csv", index=False)

        # Log the successful completion of the entire study.
        logger.info(">>> Top-level orchestrator completed all tasks successfully. <<<")

    # Catch any exception from any part of the entire process.
    except Exception as e:
        # Log the critical failure.
        logger.critical(f"The top-level orchestrator failed with a critical error: {e}", exc_info=True)
        # Add failure information to the final results dictionary.
        final_results['pipeline_status'] = 'FAILURE'
        final_results['failure_reason'] = str(e)
        # Re-raise the exception to halt execution and signal failure to the caller.
        raise

    # If successful, set the final status.
    final_results['pipeline_status'] = 'SUCCESS'

    # Return the final, comprehensive dictionary of all results.
    return final_results
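
The `data_paths` argument must supply every key that `run_full_replication_pipeline` looks up: `persona_hub`, `contextual_data`, `human_benchmark`, `human_micro`, `realized_outcomes`, and, when `run_kappa_validation` is true, `human_annotations`. A missing key would otherwise surface as a `KeyError` partway through loading. The pre-flight check below is a sketch only; `missing_data_path_keys` is a hypothetical helper that the pipeline does not define.

```python
from typing import Dict, List

# Keys read by run_full_replication_pipeline, per the lookups in its body.
REQUIRED_KEYS = (
    "persona_hub",
    "contextual_data",
    "human_benchmark",
    "human_micro",
    "realized_outcomes",
)
KAPPA_KEY = "human_annotations"  # only read when kappa validation is enabled


def missing_data_path_keys(
    data_paths: Dict[str, str], run_kappa_validation: bool = True
) -> List[str]:
    """Return the required keys that are absent from `data_paths`."""
    required = list(REQUIRED_KEYS)
    if run_kappa_validation:
        required.append(KAPPA_KEY)
    return [key for key in required if key not in data_paths]
```

Calling this before `run_synthetic_economist_study` and raising if the returned list is non-empty reports every missing input at once, before any expensive step starts.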
