# **`README.md`**

# Measuring Corruption from Text Data: Automated Quantification of Institutional Quality

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2512.09652-b31b1b.svg)](https://arxiv.org/abs/2512.09652)
[![Journal](https://img.shields.io/badge/Journal-Political%20Economy%20(econ.GN)-003366)](https://arxiv.org/abs/2512.09652)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/measuring_corruption_from_text_data)
[![Discipline](https://img.shields.io/badge/Discipline-Political%20Economy%20%7C%20NLP-00529B)](https://github.com/chirindaopensource/measuring_corruption_from_text_data)
[![Data Sources](https://img.shields.io/badge/Data-CGU%20Audit%20Reports-lightgrey)](https://www.gov.br/cgu/pt-br)
[![Data Sources](https://img.shields.io/badge/Data-IBGE%20(Municipal%20Covariates)-lightgrey)](https://www.ibge.gov.br/)
[![Data Sources](https://img.shields.io/badge/Data-Ferraz%20%26%20Finan%20(2011)-lightgrey)](https://www.aeaweb.org/articles?id=10.1257/aer.101.4.1274)
[![Data Sources](https://img.shields.io/badge/Data-Timmons%20%26%20Garfias%20(2015)-lightgrey)](https://www.sciencedirect.com/science/article/abs/pii/S030438781400138X)
[![Core Method](https://img.shields.io/badge/Method-Dictionary--Based%20Classification-orange)](https://github.com/chirindaopensource/measuring_corruption_from_text_data)
[![Analysis](https://img.shields.io/badge/Analysis-Principal%20Component%20Analysis%20(PCA)-red)](https://github.com/chirindaopensource/measuring_corruption_from_text_data)
[![Validation](https://img.shields.io/badge/Validation-Econometric%20Fixed%20Effects-green)](https://github.com/chirindaopensource/measuring_corruption_from_text_data)
[![Robustness](https://img.shields.io/badge/Robustness-Supervised%20Learning%20(LR%2FNB)-yellow)](https://github.com/chirindaopensource/measuring_corruption_from_text_data)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![Scikit-Learn](https://img.shields.io/badge/scikit--learn-%23F7931E.svg?style=flat&logo=scikit-learn&logoColor=white)](https://scikit-learn.org/)
[![NLTK](https://img.shields.io/badge/NLTK-%23339933.svg?style=flat&logo=python&logoColor=white)](https://www.nltk.org/)
[![Statsmodels](https://img.shields.io/badge/statsmodels-blue?logo=python&logoColor=white)](https://www.statsmodels.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)

**Repository:** `https://github.com/chirindaopensource/measuring_corruption_from_text_data`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Measuring Corruption from Text Data"** by:

*   **Arieda Muço** (Central European University)

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from the heuristic extraction of irregularities from unstructured audit reports and dictionary-based classification to dimensionality reduction via PCA, rigorous econometric validation against human experts, and supervised learning robustness checks.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `execute_full_research_pipeline`](#key-callable-execute_full_research_pipeline)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the analytical framework presented in Muço (2025). The core of this repository is the Jupyter notebook `measuring_corruption_from_text_data_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings. The pipeline is designed as a generalizable toolkit for quantifying institutional quality from unstructured administrative text, with a specific focus on the Brazilian municipal audit program run by the CGU.

The paper addresses the fundamental challenge of measuring corruption—a hidden phenomenon—by leveraging the "administrative exhaust" of government audits. This codebase operationalizes the paper's framework, allowing users to:
-   Rigorously validate and manage the entire experimental configuration via a single `config.yaml` file.
-   Extract granular irregularity segments from heterogeneous PDF-derived text using regex-based heuristics.
-   Classify irregularities as "severe" or "non-severe" using a domain-specific Portuguese dictionary.
-   Construct a latent **Corruption Index** via Principal Component Analysis (PCA) on text-derived features.
-   Validate the automated measure against hand-coded datasets (Ferraz & Finan, Timmons & Garfias) using fixed-effects regression models.
-   Verify robustness using supervised machine learning classifiers (Logistic Regression, Naive Bayes) and Leave-One-Out (LOO) sensitivity analysis.

## Theoretical Background

The implemented methods combine techniques from Natural Language Processing (NLP), Unsupervised Learning, and Econometrics.

**1. Text-as-Data Extraction & Classification:**
The pipeline treats audit reports as data. It handles structural shifts in reporting (introduction of summaries in later lotteries) to isolate "irregularity segments."
-   **Dictionary Method:** A deterministic rule classifies an irregularity $I_{ij}$ as severe if it contains specific n-grams (e.g., "empresa fantasma", "fraud") from a curated lexicon $\mathcal{L}$:
    $$ Severe(I_{ij}) = \mathbb{1}\{\exists \ell \in \mathcal{L} : Match(\ell, \phi(I_{ij}))\} $$
    where $\phi(\cdot)$ represents the text normalization pipeline (stemming, stopword removal).
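
As a rough illustration of the indicator rule above, the sketch below uses a simplified stand-in for $\phi(\cdot)$ (lowercasing, accent stripping, punctuation removal; no stemming or stopword removal) and whole-token substring matching. The function names `normalize` and `is_severe` and the two-entry lexicon are assumptions for illustration, not the notebook's actual API.

```python
import re
import unicodedata


def normalize(text: str) -> str:
    """Simplified stand-in for phi: lowercase, strip accents, drop punctuation."""
    decomposed = unicodedata.normalize("NFD", text.lower())
    no_accents = "".join(ch for ch in decomposed if unicodedata.category(ch) != "Mn")
    return re.sub(r"[^a-z0-9]+", " ", no_accents).strip()


def is_severe(irregularity: str, lexicon: set[str]) -> bool:
    """Return True if any normalized lexicon n-gram occurs as whole tokens."""
    padded = f" {normalize(irregularity)} "
    return any(f" {normalize(term)} " in padded for term in lexicon)


# Illustrative two-entry lexicon (the real lexicon is larger).
lexicon = {"empresa fantasma", "conluio"}

severe_flag = is_severe("Contratação de EMPRESA FANTASMA para a obra.", lexicon)
benign_flag = is_severe("Atraso na prestação de contas.", lexicon)
```

Padding both sides with spaces keeps a lexicon entry from matching inside a longer word, which is one simple way to realize $Match(\ell, \phi(I_{ij}))$ on token boundaries.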

**2. Dimensionality Reduction (PCA):**
To synthesize a single measure from correlated text features (image counts, page counts, severe irregularity counts), PCA is applied to the standardized feature matrix $Z$. The first principal component $v_1$ serves as the index:
    $$ \text{Corruption Index}_i = Z_i^\top v_1 $$
This component captures ~80% of the common variation, representing the latent "severity" dimension.
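
The projection $Z_i^\top v_1$ can be sketched with plain NumPy. The data below are synthetic (three toy features driven by one latent factor), and the variable names are assumptions; the notebook's own feature set and PCA routine may differ.

```python
import numpy as np

rng = np.random.default_rng(0)
latent = rng.normal(size=200)
# Three correlated toy features: a shared latent factor plus independent noise.
X = np.column_stack([latent + 0.5 * rng.normal(size=200) for _ in range(3)])

# Standardize each column to mean 0 and unit variance (population std).
Z = (X - X.mean(axis=0)) / X.std(axis=0, ddof=0)

# Eigendecomposition of the correlation matrix; eigh returns ascending eigenvalues.
corr = (Z.T @ Z) / Z.shape[0]
eigenvalues, eigenvectors = np.linalg.eigh(corr)
v1 = eigenvectors[:, -1]
if v1.sum() < 0:  # fix the arbitrary sign so higher feature values raise the index
    v1 = -v1

corruption_index = Z @ v1
explained_share = eigenvalues[-1] / eigenvalues.sum()
```

Because eigenvectors are only defined up to sign, the explicit sign flip matters in practice: without it, the index could come out inverted between runs or libraries.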

**3. Econometric Validation:**
The automated index is validated by regressing human-coded corruption counts ($HC_i$) on the index, controlling for state fixed effects ($\tau_t$) to account for auditor team heterogeneity:
    $$ HC_i = \alpha + \beta \text{Corruption Index}_i + \tau_t + \varepsilon_i $$
Strong predictive power ($R^2 > 0.70$) in high-agreement samples confirms criterion validity.
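
A minimal sketch of the fixed-effects specification on simulated data follows. It uses plain NumPy least squares with one dummy per state, which is numerically equivalent to OLS with state fixed effects; the notebook imports `statsmodels.formula.api`, so its estimator and robust standard errors are not reproduced here. All numbers are synthetic.

```python
import numpy as np

rng = np.random.default_rng(1)
n, n_states = 300, 5
state = rng.integers(0, n_states, size=n)
index = rng.normal(size=n)
state_effect = rng.normal(size=n_states)

# Simulated human-coded counts with a true slope of 1.5 on the index.
hc = 2.0 + 1.5 * index + state_effect[state] + 0.1 * rng.normal(size=n)

# Design matrix: the index plus one dummy per state (the dummies absorb alpha).
dummies = (state[:, None] == np.arange(n_states)).astype(float)
design = np.column_stack([index, dummies])

coef, *_ = np.linalg.lstsq(design, hc, rcond=None)
beta_hat = coef[0]
residuals = hc - design @ coef
r_squared = 1.0 - residuals.var() / hc.var()
```

With a full set of state dummies there is no separate intercept column; including both would make the design matrix rank deficient.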

## Features

The provided Jupyter notebook (`measuring_corruption_from_text_data_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Task Architecture:** The pipeline is decomposed into 19 distinct, modular tasks, each with its own orchestrator function.
-   **Configuration-Driven Design:** All study parameters (lottery cutoffs, regex markers, PCA thresholds) are managed in an external `config.yaml` file.
-   **Rigorous Data Validation:** A multi-stage validation process checks schema integrity, key uniqueness, and logical consistency of the corpus and validation data.
-   **Advanced NLP Pipeline:** Implements NFD normalization, accent stripping, and Porter stemming tailored for Portuguese administrative text.
-   **Robustness Verification:** Includes automated Leave-One-Out (LOO) analysis and a parallel Supervised Learning pipeline to cross-validate the dictionary-based measure.
-   **Reproducible Artifacts:** Generates structured dictionaries and serializable outputs for every intermediate result, ensuring full auditability.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Validation & Cleansing (Tasks 1-6):** Ingests raw corpus and validation data, normalizes identifiers (7-digit IBGE codes), cleanses text encoding, and validates numeric metadata.
2.  **Extraction & NLP (Tasks 7-9):** Applies heuristic parsing to extract irregularity segments, normalizes text and lexicon into a shared matching space, and classifies irregularities as severe/non-severe.
3.  **Index Construction (Task 10):** Builds the feature matrix, standardizes data, computes PCA, and generates the Corruption Index.
4.  **Econometric Validation (Tasks 11-14):** Merges the index with external datasets, constructs agreement samples, and estimates validation regressions (Tables 1, 2, and 3).
5.  **Robustness Checks (Tasks 16-19):** Performs LOO sensitivity analysis and executes a full supervised learning pipeline (training classifiers, rebuilding the index with ML predictions) to confirm result stability.

## Core Components (Notebook Structure)

The `measuring_corruption_from_text_data_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the 19 major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `execute_full_research_pipeline`

The project is designed around a single, top-level user-facing interface function:

-   **`execute_full_research_pipeline`:** This master orchestrator function runs the entire automated research pipeline end to end. A single call reproduces the full computational portion of the project and manages data flow between the main analysis, LOO robustness, and supervised learning robustness modules.

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `scikit-learn`, `statsmodels`, `nltk`, `pyyaml`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/measuring_corruption_from_text_data.git
    cd measuring_corruption_from_text_data
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy scikit-learn statsmodels nltk pyyaml
    ```

4.  **Download NLTK Data:**
    The pipeline attempts to download the required NLTK data (the `stopwords` corpus) automatically, but you can also pre-install it:
    ```python
    import nltk
    nltk.download('stopwords')
    ```

## Input Data Structure

The pipeline requires two primary DataFrames:
1.  **`df_raw_corpus`**: The corpus of audit reports with columns: `report_id`, `municipality_id`, `report_full_text`, `report_summary_text`, `lottery`, `year`, `image_count`, `page_count`, etc.
2.  **`df_validation_raw`**: External validation data with columns: `municipality_id`, `ff_corruption_count`, `gt_corruption_count`, `cgu_severe_count`, and municipal covariates (`literacy_rate`, `gdp_per_capita`, etc.).

## Usage

The `measuring_corruption_from_text_data_draft.ipynb` notebook provides a complete, step-by-step guide. The primary workflow is to execute the final cell of the notebook, which demonstrates how to use the top-level `execute_full_research_pipeline` orchestrator:

```python
# Final cell of the notebook

# This block serves as the main entry point for the entire project.
if __name__ == '__main__':
    # 1. Load the master configuration from the YAML file.
    import yaml
    with open('config.yaml', 'r') as f:
        study_config = yaml.safe_load(f)
    
    # 2. Load raw datasets (Example using synthetic generator provided in the notebook)
    # In production, load from CSV/Parquet: pd.read_csv(...)
    df_raw_corpus = ...
    df_validation_raw = ...
    
    # 3. Define Lexicon
    raw_lexicon_list = ["empresa fantasma", "fraud", "conluio", ...]

    # 4. Execute the entire replication study.
    results = execute_full_research_pipeline(
        df_raw_corpus=df_raw_corpus,
        df_validation_raw=df_validation_raw,
        language="Portuguese",
        raw_lexicon_list=raw_lexicon_list,
        study_configuration=study_config
    )
    
    # 5. Access results
    print(f"Main Pipeline R2 (Strict Agreement): {results['main_pipeline']['table1_results']['Strict Agreement']['R2']}")
```

## Output Structure

The pipeline returns a master dictionary containing all analytical artifacts:
-   **`main_pipeline`**: Contains `df_corpus_with_index` (the final index), `table1_results` (validation regressions), `table2_results` (CGU validation), `table3_results` (correlates), and `pca_artifacts`.
-   **`loo_analysis`**: Contains `detailed_results` (per-iteration stats) and `summary` (min/max ranges for $\beta$ and $R^2$).
-   **`supervised_robustness`**: Contains `classification_reports`, `ml_pca_index`, and `comparison_stats` (correlation between dictionary and ML indices).

## Project Structure

```
measuring_corruption_from_text_data/
│
├── measuring_corruption_from_text_data_draft.ipynb  # Main implementation notebook
├── config.yaml                                      # Master configuration file
├── requirements.txt                                 # Python package dependencies
│
├── LICENSE                                          # MIT Project License File
└── README.md                                        # This file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can modify study parameters such as:
-   **Parsing Logic:** `lottery_cutoff`, `regex_start_marker`.
-   **NLP Settings:** `stemmer_algorithm`, `dictionary_ngram_range`.
-   **PCA Settings:** `pca_input_features`, `eigenvalue_threshold`.
-   **Econometrics:** `robust_se_type`, `validation_fixed_effects`.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **LLM Integration:** Replacing the dictionary classifier with Large Language Models (e.g., BERT, GPT) to test if contextual embeddings improve severity classification.
-   **Temporal Analysis:** Extending the model to analyze trends in corruption severity over time.
-   **Cross-National Application:** Adapting the dictionary and extraction logic for audit reports from other jurisdictions (e.g., Mexico, Puerto Rico).

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{muco2025measuring,
  title={Measuring Corruption from Text Data},
  author={Muço, Arieda},
  journal={arXiv preprint arXiv:2512.09652},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). Automated Quantification of Institutional Quality: An Open Source Implementation.
GitHub repository: https://github.com/chirindaopensource/measuring_corruption_from_text_data
```

## Acknowledgments

-   Credit to **Arieda Muço** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, Scikit-Learn, and Statsmodels**.

---

*This README was generated based on the structure and content of the `measuring_corruption_from_text_data_draft.ipynb` notebook and follows best practices for research software documentation.*


# Paper

Title: "*Measuring Corruption from Text Data*"

Authors: Arieda Muço

E-Journal Submission Date: 10 December 2025

Link: https://arxiv.org/abs/2512.09652

Author's GitHub Project Repository: https://github.com/ariedamuco/Audit-reports/blob/master/list-words

Reformulated Abstract:

Corruption undermines democratic governance and economic development, yet its hidden nature poses fundamental measurement challenges. Traditional approaches—perception-based indices and manual audit coding—suffer from subjectivity, limited scalability, and high costs. This paper introduces an automated corruption measure constructed from the full text of Brazilian municipal audit reports (2003–2011, N = 2,194 inspections). The methodology combines a domain-specific dictionary that classifies irregularities by severity with principal component analysis to extract a latent corruption dimension. The first principal component captures 80% of the common variation across five text-derived features. Validation against independent human coders demonstrates strong criterion validity, with the automated index explaining 71–73% of variance in hand-coded corruption counts where coder agreement is highest. The index also predicts official government severity classifications (R² ≈ 0.31) and correlates with municipal characteristics theoretically linked to corruption risk, while capturing substantial information beyond these observables (incremental R² = 0.17). Robustness checks using supervised learning classifiers (logistic regression, Naive Bayes) yield nearly identical municipal rankings (R² > 0.98), confirming that the dictionary approach recovers the same underlying construct. The method offers advantages over both manual coding—in scalability and consistency—and large language models—in transparency, cost, and long-term replicability. This validated measure expands feasible sample sizes by a factor of three to five, enabling fine-grained analyses of corruption's political and economic consequences.

# Summary

### The Identification Problem & Data Generating Process
The paper identifies a critical bottleneck in political economy: corruption is inherently hidden, and existing measures are either subjective (perception indices) or non-scalable (manual coding of audit reports).
*   **The Data Source:** The study utilizes the corpus of Brazilian municipal audit reports generated by the *Controladoria Geral da União* (CGU) between 2003 and 2011. These audits are assigned via a random lottery, providing a quasi-experimental setting.
*   **The Challenge:** The data is high-dimensional and unstructured text. The objective is to map this text to a scalar index representing "corruption severity."

### Feature Extraction (The Dictionary Approach)
Instead of relying immediately on "black box" deep learning, the author employs a transparent, rule-based NLP extraction method.
*   **Preprocessing:** Text is stemmed and tokenized.
*   **Dictionary Construction:** A domain-specific dictionary of n-grams is created to distinguish "severe" irregularities (e.g., *fraud, ghost firm, procurement simulation*) from administrative errors.
*   **Feature Vector:** For each municipality $i$, the algorithm extracts a vector of variables:
    1.  Count of severe irregularities (via dictionary).
    2.  Total count of irregularities.
    3.  Metadata proxies for audit depth: Number of pages, number of lines, and number of photographic images (evidence).

### Dimensionality Reduction (Principal Component Analysis)
The extracted features exhibit high multicollinearity (pairwise correlations range from 0.56 to 0.96). To isolate the latent variable—corruption severity—the author applies Principal Component Analysis (PCA).
*   **The First Component:** The first principal component accounts for **80% of the common variation** and is the only component with an eigenvalue greater than one (Kaiser criterion).
*   **Loadings:** All variables load positively and with similar magnitude (0.37–0.48), confirming that the index is a composite measure of severity and audit intensity, not merely a proxy for report length.
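
To make the Kaiser criterion concrete, the sketch below applies it to a toy equicorrelation matrix with five features and a common pairwise correlation of 0.75. This matrix is an illustrative assumption, not the paper's data; it is chosen only because its eigenvalues are known in closed form ($1 + 4\rho$ once and $1 - \rho$ four times).

```python
import numpy as np

rho, k = 0.75, 5
# Toy correlation matrix: ones on the diagonal, rho everywhere else.
corr = np.full((k, k), rho) + (1.0 - rho) * np.eye(k)

eigenvalues, eigenvectors = np.linalg.eigh(corr)
order = np.argsort(eigenvalues)[::-1]  # sort descending
eigenvalues, eigenvectors = eigenvalues[order], eigenvectors[:, order]

# Kaiser criterion: retain components whose eigenvalue exceeds one.
n_retained = int((eigenvalues > 1.0).sum())
share_first = eigenvalues[0] / eigenvalues.sum()
loadings_first = np.abs(eigenvectors[:, 0])
```

In this toy case a single component is retained, it carries 80% of the total variance, and every feature loads equally at $1/\sqrt{5} \approx 0.45$.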

### Econometric Validation (Criterion Validity)
The core contribution is the rigorous validation of this automated index against "ground truth" human coding.
*   **Benchmarks:** The index is regressed against hand-coded datasets from seminal papers: Ferraz and Finan (2011) and Timmons and Garfias (2015).
*   **The Model:** $HC_i = \alpha + \beta \text{Index}_i + \tau_t + \varepsilon_i$, where $HC_i$ is the human-coded count and $\tau_t$ are state fixed effects.
*   **Results:**
    *   In samples where human coders are in **strict agreement**, the automated index explains **71–73% of the variation** ($R^2 \approx 0.72$).
    *   This suggests the automated measure captures the same underlying signal as expert humans, with the remaining error likely attributable to human inconsistency in ambiguous cases.

### External and Construct Validity
The author tests the index against alternative specifications to ensure it is not measuring noise or unrelated structural factors.
*   **CGU Validation:** The index correlates with the CGU’s own later classification of "severe" irregularities ($R^2 \approx 0.31$). The lower correlation is expected, as CGU focuses on fiscal impact while the index (and academic coders) focuses on criminality/intent.
*   **Orthogonality to Covariates:** The index correlates with municipal characteristics in theoretically predicted ways (negative correlation with literacy/GDP; positive with distance to capital). Crucially, these covariates explain only **17%** of the variance in the index. This implies the text data contains substantial unique signal that cannot be predicted simply by looking at a municipality's socio-economic status.

### Robustness via Supervised Learning
To verify the dictionary approach, the author trains supervised machine learning models (Logistic Regression and Naive Bayes) using the human-coded data as labels.
*   **Feature Importance:** The supervised models identify bigrams (e.g., "procurement contract," "release resources") that largely overlap with the hand-crafted dictionary.
*   **Convergence:** A PCA index built from the supervised learning predictions yields municipal rankings nearly identical to those of the dictionary-based index ($R^2 > 0.98$).
*   **Implication:** The dictionary method is robust; it recovers the same latent dimension as supervised methods but without the dependency on expensive, potentially noisy labeled training data.
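
The supervised robustness check can be sketched with scikit-learn as follows. The eight texts and their labels are invented for illustration, and the TF-IDF settings (unigrams and bigrams) are an assumption rather than the paper's exact configuration.

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB

# Made-up, already accent-stripped irregularity texts; 1 = severe, 0 = non-severe.
texts = [
    "empresa fantasma contratada sem licitacao",
    "fraude em processo licitatorio",
    "conluio entre empresas na licitacao",
    "pagamento por servico nao executado com nota falsa",
    "atraso na prestacao de contas",
    "ausencia de placa de identificacao da obra",
    "falha formal no preenchimento de formulario",
    "conselho municipal nao se reuniu regularmente",
]
labels = [1, 1, 1, 1, 0, 0, 0, 0]

# Sparse TF-IDF features over unigrams and bigrams.
vectorizer = TfidfVectorizer(ngram_range=(1, 2))
features = vectorizer.fit_transform(texts)

models = {
    "logistic_regression": LogisticRegression(max_iter=1000),
    "naive_bayes": MultinomialNB(),
}
# In-sample predictions only; a real check would hold out data or cross-validate.
predictions = {
    name: model.fit(features, labels).predict(features)
    for name, model in models.items()
}
```

The predicted labels would then replace the dictionary-based severe counts when rebuilding the PCA index, which is the comparison the convergence result refers to.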

### Conclusion
The paper demonstrates that a transparent, dictionary-based PCA approach provides a valid, cost-effective, and scalable measure of corruption. It offers distinct advantages over Large Language Models (LLMs) regarding long-run replicability (avoiding API drift) and transparency, allowing researchers to expand corruption studies to the full universe of Brazilian municipalities.

# Import Essential Modules

In [None]:
#!/usr/bin/env python3
# ==============================================================================#
#
#  Automated Quantification of Institutional Quality
#
#  This module provides a complete, production-grade implementation of the
#  text-as-data framework presented in "Measuring Corruption from Text Data" by
#  Arieda Muço (2025). It delivers a scalable, transparent, and reproducible
#  system for quantifying corruption severity from unstructured audit reports,
#  enabling high-fidelity measurement of institutional quality at the municipal
#  level without reliance on subjective perception indices or costly manual coding.
#
#  Core Methodological Components:
#  • Heuristic extraction of irregularity segments from heterogeneous audit documents
#  • Dictionary-based classification of corruption severity using domain-specific n-grams
#  • Dimensionality reduction via Principal Component Analysis (PCA) to synthesize a latent index
#  • Rigorous econometric validation against independent human expert coding and official benchmarks
#  • Robustness verification using supervised machine learning classifiers (Logistic Regression, Naive Bayes)
#  • Leave-One-Out (LOO) sensitivity analysis to ensure index stability
#
#  Technical Implementation Features:
#  • Deterministic text normalization pipeline (NFD, stemming, stopword removal)
#  • Regex-based parsing logic handling regime shifts in document structure
#  • Sparse matrix operations for efficient TF-IDF feature engineering
#  • Fixed-effects regression modeling with heteroskedasticity-robust inference
#  • Comprehensive data quality assurance and quarantine protocols
#  • Reproducible artifact generation with full configuration and environment manifest
#
#  Paper Reference:
#  Muço, A. (2025). Measuring Corruption from Text Data.
#  arXiv preprint arXiv:2512.09652.
#  https://arxiv.org/abs/2512.09652
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================#

import sys
import json
import hashlib
import logging
import re
import unicodedata
import string
from datetime import datetime
from typing import (
    List,
    Dict,
    Any,
    Tuple,
    Optional,
    Set,
    Union,
    Callable
)
from dataclasses import dataclass, asdict

import pandas as pd
import numpy as np
import statsmodels.formula.api as smf
import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import LinearSVC
from sklearn.metrics import classification_report

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# Implementation

# Draft 1

## **Discussion of the Inputs-Processes-Outputs of Key Callables**

### **1. `validate_corpus_inputs` (Task 1 Orchestrator)**

*   **Inputs**: `df_raw_corpus` (pd.DataFrame).
*   **Processes**: Sequentially invokes `validate_corpus_schema`, `validate_corpus_uniqueness`, and `validate_lottery_summary_consistency`. It aggregates errors and warnings from these sub-steps.
*   **Outputs**: None (returns `None` on success, raises `ValueError` on failure).
*   **Transformation**: This is a validation gate; it does not transform data but ensures the input state is valid for downstream processing.
*   **Research Role**: Implements the data integrity checks required before any text processing can begin. It ensures the raw data conforms to the structure implied by the audit program description (Section 2), specifically validating the existence of critical metadata like `lottery` round and `report_id` which drive the extraction logic.
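
A validation gate of this kind might look like the sketch below. The required-column tuple is a subset of the columns listed in the README, and the helper names `collect_corpus_errors` and `validate_corpus_sketch` are hypothetical; the notebook's sub-validators perform more checks than shown.

```python
import pandas as pd

# Assumed subset of required columns, taken from the README's input description.
REQUIRED_COLUMNS = ("report_id", "municipality_id", "report_full_text", "lottery")


def collect_corpus_errors(df: pd.DataFrame) -> list[str]:
    """Gather schema and uniqueness problems without raising."""
    errors = []
    missing = [col for col in REQUIRED_COLUMNS if col not in df.columns]
    if missing:
        errors.append(f"missing columns: {missing}")
    if "report_id" in df.columns and df["report_id"].duplicated().any():
        errors.append("report_id values are not unique")
    return errors


def validate_corpus_sketch(df: pd.DataFrame) -> None:
    """Return None when the corpus passes; raise ValueError listing all problems."""
    errors = collect_corpus_errors(df)
    if errors:
        raise ValueError("; ".join(errors))
```

Collecting all errors before raising means one run reports every problem at once instead of failing on the first.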

### **2. `validate_validation_inputs` (Task 2 Orchestrator)**

*   **Inputs**: `df_validation_raw` (pd.DataFrame), `df_raw_corpus` (pd.DataFrame).
*   **Processes**: Invokes `validate_validation_schema`, `validate_join_feasibility`, and `validate_covariate_completeness`.
*   **Outputs**: None (returns `None` on success, raises `ValueError` on failure).
*   **Transformation**: Validation gate.
*   **Research Role**: Ensures the external validation datasets (Ferraz & Finan, Timmons & Garfias, CGU) and municipal covariates are present and joinable. This is a prerequisite for the econometric validation in Section 4, ensuring that Equation (1) ($HC_i = \alpha + \beta \text{Corruption Index}_i + \tau_t + \varepsilon_i$) can be estimated.
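
Join feasibility can be checked with an outer merge and pandas' `indicator` flag, as in this sketch. The two tiny frames and the `match_rate` metric are illustrative assumptions, not the notebook's actual diagnostics.

```python
import pandas as pd

corpus = pd.DataFrame({"municipality_id": ["3550308", "3304557", "2927408"]})
validation = pd.DataFrame({
    "municipality_id": ["3550308", "2927408", "5300108"],
    "ff_corruption_count": [4, 1, 2],
})

# indicator=True adds a `_merge` column: left_only, right_only, or both.
merged = corpus.merge(validation, on="municipality_id", how="outer", indicator=True)

# Share of corpus municipalities that find a validation record.
match_rate = (merged["_merge"] == "both").sum() / len(corpus)
unmatched_validation = merged.loc[
    merged["_merge"] == "right_only", "municipality_id"
].tolist()
```

Inspecting `right_only` rows before the real merge helps catch identifier formatting mismatches that would otherwise silently shrink the validation sample.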

### **3. `validate_config_inputs` (Task 3 Orchestrator)**

*   **Inputs**: `language` (str), `raw_lexicon_list` (List[str]), `study_configuration` (Dict).
*   **Processes**: Validates language settings, lexicon content, and configuration coherence (e.g., PCA parameters, lottery cutoffs).
*   **Outputs**: `run_manifest` (Dict).
*   **Transformation**: Transforms raw configuration inputs into a validated "run manifest" dictionary.
*   **Research Role**: Enforces the methodological parameters defined in the paper, such as the lottery cutoff for summary extraction (Section 2) and the Kaiser criterion for PCA (Section 3.1).
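
One plausible shape for such a run manifest is sketched below. The flat config layout, the `config_hash` field, and the function name are assumptions; only the `eigenvalue_threshold` and `lottery_cutoff` key names come from the README's Customization section, and the value used for `lottery_cutoff` is made up.

```python
import hashlib
import json


def build_run_manifest_sketch(language: str, lexicon: list[str], config: dict) -> dict:
    """Validate inputs and return a manifest with a reproducible fingerprint."""
    if not lexicon or any(not term.strip() for term in lexicon):
        raise ValueError("lexicon must be non-empty and contain no blank entries")
    threshold = config.get("eigenvalue_threshold")
    if not isinstance(threshold, (int, float)) or threshold <= 0:
        raise ValueError("eigenvalue_threshold must be a positive number")

    # Sort and deduplicate the lexicon so ordering does not change the hash.
    payload = {"language": language, "lexicon": sorted(set(lexicon)), "config": config}
    digest = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode("utf-8")
    ).hexdigest()
    return {**payload, "config_hash": digest}


manifest = build_run_manifest_sketch(
    "Portuguese",
    ["empresa fantasma", "conluio"],
    {"eigenvalue_threshold": 1.0, "lottery_cutoff": 20},  # hypothetical values
)
```

Hashing a canonical JSON serialization gives each run a stable identifier that changes whenever any parameter or lexicon entry changes.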

### **4. `cleanse_identifiers` (Task 4 Orchestrator)**

*   **Inputs**: `df_raw_corpus` (pd.DataFrame), `df_validation_raw` (pd.DataFrame).
*   **Processes**: Normalizes municipality IDs (to 7-digit strings) and state codes; quarantines rows with missing identifiers.
*   **Outputs**: `df_corpus_clean`, `df_validation_clean`, `df_corpus_quarantined`, `df_validation_quarantined` (all pd.DataFrame).
*   **Transformation**: Converts raw identifiers into canonical formats (`_canon` columns) and splits datasets based on validity.
*   **Research Role**: Prepares the join keys for merging audit data with external validation sets. Accurate identifiers are critical for the fixed effects $\tau_t$ in Equation (1), which rely on correct state codes.
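
The clean/quarantine split might be implemented along these lines. The sketch assumes identifiers arrive as strings or integers and treats anything that is not exactly seven digits after trimming as invalid; the function name and the example rows are hypothetical.

```python
import re

import pandas as pd


def cleanse_ids_sketch(df: pd.DataFrame, col: str = "municipality_id"):
    """Add a `<col>_canon` column and split rows into (clean, quarantined)."""
    out = df.copy()

    def to_canon(value):
        # Missing values stay missing; everything else becomes a trimmed string.
        return None if pd.isna(value) else str(value).strip()

    canon = out[col].map(to_canon)
    is_valid = canon.map(
        lambda s: isinstance(s, str) and re.fullmatch(r"\d{7}", s) is not None
    ).astype(bool)

    out[f"{col}_canon"] = canon
    clean = out[is_valid].reset_index(drop=True)
    quarantined = out[~is_valid].reset_index(drop=True)
    return clean, quarantined


raw = pd.DataFrame({"municipality_id": [3550308, " 3304557 ", None, "12345"]})
clean, quarantined = cleanse_ids_sketch(raw)
```

Note that a float such as `3550308.0` would be quarantined by this rule, so upstream loaders should read identifier columns as strings.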

### **5. `cleanse_text_fields` (Task 5 Orchestrator)**

*   **Inputs**: `df_raw_corpus` (pd.DataFrame).
*   **Processes**: Normalizes text encoding (UTF-8) and line endings; validates text content presence; computes quality metrics.
*   **Outputs**: `df_clean`, `df_quarantined` (pd.DataFrame), `metrics` (Dict).
*   **Transformation**: Cleans text columns (`report_full_text`, `report_summary_text`) and filters out rows with empty text.
*   **Research Role**: Ensures the unstructured text input is readable and consistent for the NLP pipeline. This step is foundational for the "Text-as-Data" approach described in Section 3.
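
A per-document cleansing step could be sketched as below. The choice of NFC for stored text and the removal of control characters are assumptions added for illustration; the source only states that encoding and line endings are normalized.

```python
import unicodedata


def cleanse_text_sketch(raw) -> str:
    """Decode to str, unify line endings, and drop stray control characters."""
    if isinstance(raw, bytes):
        # Undecodable bytes become U+FFFD instead of raising.
        raw = raw.decode("utf-8", errors="replace")
    text = unicodedata.normalize("NFC", raw)
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    # Keep newlines and tabs; remove every other control character.
    text = "".join(
        ch for ch in text if ch in "\n\t" or unicodedata.category(ch) != "Cc"
    )
    return text.strip()


cleaned = cleanse_text_sketch(b"Relat\xc3\xb3rio\r\nLinha 2\x00")
is_empty = not cleanse_text_sketch("  \r\n ")
```

Rows for which the cleaned text is empty are the ones a quarantine step would filter out.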

### **6. `cleanse_numeric_fields` (Task 6 Orchestrator)**

*   **Inputs**: `df_raw_corpus` (pd.DataFrame).
*   **Processes**: Validates and coerces numeric metadata (page count, image count, etc.); checks definition consistency; flags outliers.
*   **Outputs**: `df_clean` (pd.DataFrame), `stats` (Dict).
*   **Transformation**: Coerces numeric columns to integers and adds outlier flag columns.
*   **Research Role**: Prepares the structural features ($x_i$) used in the PCA index construction (Section 3.1). Specifically, it ensures `image_count`, `page_count`, and `report_lines_count` are valid inputs for the matrix $X$.
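
The coercion and outlier-flagging steps might look like this sketch. The upper-fence rule (third quartile plus 1.5 times the interquartile range) and the `_outlier` suffix are assumptions; the notebook's outlier definition is not specified in this section.

```python
import pandas as pd


def cleanse_counts_sketch(df: pd.DataFrame, cols=("page_count", "image_count")):
    """Coerce count columns to nullable integers and flag upper-tail outliers."""
    out = df.copy()
    for col in cols:
        values = pd.to_numeric(out[col], errors="coerce")  # bad strings become NaN
        values = values.where(values >= 0)                 # negative counts become NaN
        out[col] = values.round().astype("Int64")          # nullable integer dtype
        q1, q3 = values.quantile(0.25), values.quantile(0.75)
        upper_fence = q3 + 1.5 * (q3 - q1)
        out[f"{col}_outlier"] = values > upper_fence       # NaN compares as False
    return out


raw = pd.DataFrame({"page_count": ["10", 12, "n/a", 11, 300, -5]})
checked = cleanse_counts_sketch(raw, cols=("page_count",))
```

Flagging rather than dropping outliers keeps the decision about extreme reports visible to the later PCA step.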

### **7. `extract_irregularities` (Task 7 Orchestrator)**

*   **Inputs**: `df_raw_corpus` (pd.DataFrame), `study_configuration` (Dict).
*   **Processes**: Applies the two-regime extraction logic (Summary parsing vs. Regex/Fallback) to each report; materializes the irregularity-level dataset.
*   **Outputs**: `df_irregularities` (pd.DataFrame).
*   **Transformation**: Explodes report-level text into a granular DataFrame where each row is a single irregularity segment.
*   **Research Role**: Implements the heuristic extraction algorithm described in Appendix A1.3. It handles the structural heterogeneity of audit reports (summaries vs. full text) to isolate the units of analysis (irregularities).
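
The two-regime logic can be illustrated as follows. The cutoff value, the "Constatação" start marker, and the numbered-item pattern are all hypothetical placeholders; the real markers live in `config.yaml` (`lottery_cutoff`, `regex_start_marker`) and in Appendix A1.3 of the paper.

```python
import re

LOTTERY_CUTOFF = 20  # hypothetical cutoff
# Hypothetical marker that opens each finding in the full text.
START_MARKER = re.compile(r"^\s*constata[cç][aã]o\s*:?", re.IGNORECASE | re.MULTILINE)
# Hypothetical numbered items ("1.", "2.1", "3)") in the summary section.
SUMMARY_ITEM = re.compile(r"^\s*\d+(?:\.\d+)*[.)]?\s+", re.MULTILINE)


def extract_segments_sketch(full_text: str, summary_text, lottery: int) -> list[str]:
    """Return irregularity segments using the regime that fits the lottery round."""
    if lottery >= LOTTERY_CUTOFF and summary_text and summary_text.strip():
        # Later lotteries: split the summary on its numbered items.
        parts = SUMMARY_ITEM.split(summary_text)
    else:
        # Earlier lotteries: split the full text on markers, dropping the preamble.
        parts = START_MARKER.split(full_text)[1:]
        if not parts:
            # Fallback: treat blank-line-separated paragraphs as segments.
            parts = re.split(r"\n\s*\n", full_text)
    return [" ".join(part.split()) for part in parts if part.strip()]
```

Each returned string would become one row of the irregularity-level DataFrame, keyed by its `report_id`.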

### **8. `normalize_text_and_lexicon` (Task 8 Orchestrator)**

*   **Inputs**: `df_irregularities` (pd.DataFrame), `raw_lexicon_list` (List[str]), `study_configuration` (Dict).
*   **Processes**: Defines the normalization function $\phi(\cdot)$; normalizes the lexicon; normalizes irregularity texts.
*   **Outputs**: `df_norm` (pd.DataFrame), `norm_lexicon` (Set[str]), `norm_map` (Dict).
*   **Transformation**: Applies NLP transformations (NFD, stemming, etc.) to text and lexicon, creating a shared matching space.
*   **Research Role**: Implements the text preprocessing pipeline $\phi(s)$ described in Appendix A2. This ensures that variations in surface forms (e.g., "licitatórios" vs. "licitatório") map to the same stem ("licitatori") for accurate dictionary matching.
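
The accent-stripping part of $\phi(\cdot)$ can be illustrated with the standard library alone. The sketch below is an assumption about the order of operations and deliberately omits stemming; a Portuguese stemmer (for example NLTK's RSLP stemmer) would be applied as an additional step to reach stems such as "licitatori".

```python
import re
import unicodedata

def phi(text: str) -> str:
    """Simplified normalization: NFD accent stripping, lowercasing, punctuation removal.

    Hypothetical sketch; stemming is intentionally left out.
    """
    # NFD splits accented letters into a base letter plus combining marks (category Mn)
    decomposed = unicodedata.normalize("NFD", text)
    no_accents = "".join(ch for ch in decomposed if unicodedata.category(ch) != "Mn")
    lowered = no_accents.lower()
    # Replace anything that is not a letter, digit, or whitespace with a space
    no_punct = re.sub(r"[^a-z0-9\s]", " ", lowered)
    # Collapse repeated whitespace
    return re.sub(r"\s+", " ", no_punct).strip()

print(phi("Simulação  Licitatória!"))
print(phi("Empresa-Fantasma"))
```

Applying the same function to both the lexicon and the irregularity texts is what creates the shared matching space.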

### **9. `classify_severe_and_count` (Task 9 Orchestrator)**

*   **Inputs**: `df_irregularities` (pd.DataFrame), `normalized_lexicon` (Set[str]), `df_raw_corpus` (pd.DataFrame).
*   **Processes**: Classifies each irregularity as severe/non-severe; aggregates counts to report level; merges counts back to corpus.
*   **Outputs**: `df_classified` (pd.DataFrame), `df_corpus_counts` (pd.DataFrame).
*   **Transformation**: Adds `is_severe` column to irregularities and `severe_irregularities_count` to the corpus.
*   **Research Role**: Implements the dictionary-based classification rule: $Severe(I_{ij}) = 1 \iff \exists \ell \in \mathcal{L} : Match(\ell, \phi(I_{ij}))$. It produces the `severe_irregularities_count` feature, a key input for the PCA index (Section 3.1).
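
The classification rule and the report-level aggregation can be sketched as below. The sketch assumes that $Match$ is a plain substring test on already-normalized text and that the normalized text lives in a column called `text_norm`; both are assumptions for illustration.

```python
import pandas as pd

def is_severe(normalized_text: str, lexicon: set) -> bool:
    """Severe iff at least one lexicon entry occurs in the normalized text."""
    return any(term in normalized_text for term in lexicon)

# Toy lexicon and irregularities (already normalized); values are made up
lexicon = {"empresa fantasma", "fraud", "superfaturamento"}
df_irr = pd.DataFrame({
    "report_id": ["R1", "R1", "R2"],
    "text_norm": [
        "indicios de fraude em licitacao",
        "falta de medicamentos",
        "atraso na entrega",
    ],
})

df_irr["is_severe"] = df_irr["text_norm"].apply(lambda t: is_severe(t, lexicon))

# Aggregate to report level: number of severe irregularities per report
counts = (
    df_irr.groupby("report_id")["is_severe"]
    .sum()
    .rename("severe_irregularities_count")
)
print(counts)
```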

### **10. `build_pca_index` (Task 10 Orchestrator)**

*   **Inputs**: `df_corpus_with_counts` (pd.DataFrame), `study_configuration` (Dict).
*   **Processes**: Constructs matrix $X$; standardizes to $Z$; computes PCA; scores the index.
*   **Outputs**: `df_final` (pd.DataFrame), `artifacts` (Dict).
*   **Transformation**: Adds the `corruption_index` column to the corpus DataFrame.
*   **Research Role**: Implements the dimensionality reduction described in Section 3.1. It computes the first principal component $v_1$ of the standardized feature matrix $Z$ and projects the data: $\text{Corruption Index}_i = Z_i^\top v_1$.
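
The projection $Z_i^\top v_1$ can be sketched with NumPy. The helper name `first_pc_index`, the population standard deviation (`ddof=0`), and the sign convention are assumptions; the sketch also assumes every column of $X$ has non-zero variance.

```python
import numpy as np

def first_pc_index(X: np.ndarray):
    """Standardize X column-wise and project onto the first principal component."""
    # Z-score each feature (assumes no constant columns)
    Z = (X - X.mean(axis=0)) / X.std(axis=0, ddof=0)
    # Rows of Vt are the principal directions, ordered by singular value
    _, _, Vt = np.linalg.svd(Z, full_matrices=False)
    v1 = Vt[0]
    # The sign of a principal component is arbitrary; orient it so loadings sum to >= 0
    if v1.sum() < 0:
        v1 = -v1
    return Z @ v1, v1

# Synthetic features driven by one latent factor (made-up data)
rng = np.random.default_rng(0)
latent = rng.normal(size=50)
X = np.column_stack([latent + rng.normal(scale=0.1, size=50) for _ in range(4)])

index, v1 = first_pc_index(X)
print(index[:5], v1)
```

Because the sign of $v_1$ is not identified, fixing an orientation matters when the index is interpreted as "more corruption".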

### **11. `prepare_validation_samples` (Task 11 Orchestrator)**

*   **Inputs**: `df_corpus_with_index` (pd.DataFrame), `df_validation_raw` (pd.DataFrame).
*   **Processes**: Merges corpus and validation data; constructs agreement flags (`is_strict_agreement`, `is_near_agreement`); computes mean human-coded outcome.
*   **Outputs**: `df_analysis` (pd.DataFrame).
*   **Transformation**: Creates a merged analysis dataset with flags for validation subsamples.
*   **Research Role**: Constructs the specific samples used for validation in Table 1. It operationalizes "Strict Agreement" ($FF_i = GT_i$) and "Near Agreement" ($|FF_i - GT_i| \le 1$).
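
The two agreement flags follow directly from the definitions above. In this sketch the column names `ff_corruption_count` and `gt_corruption_count` come from the synthetic example later in this README, while `hc_mean` and the treatment of missing codings (never counted as agreement) are assumptions.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "ff_corruption_count": [2.0, 1.0, 0.0, np.nan],
    "gt_corruption_count": [2.0, 2.0, 3.0, 1.0],
})

pair = df[["ff_corruption_count", "gt_corruption_count"]]
# Assumption: a report missing either coding cannot be in an agreement sample
both_coded = pair.notna().all(axis=1)
diff = (df["ff_corruption_count"] - df["gt_corruption_count"]).abs()

df["is_strict_agreement"] = both_coded & (diff == 0)   # FF_i = GT_i
df["is_near_agreement"] = both_coded & (diff <= 1)     # |FF_i - GT_i| <= 1
# Mean human-coded outcome, left missing unless both codings are present
df["hc_mean"] = pair.mean(axis=1, skipna=False)
print(df)
```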

### **12. `run_table1_regressions` (Task 12 Orchestrator)**

*   **Inputs**: `df_analysis` (pd.DataFrame), `study_configuration` (Dict).
*   **Processes**: Estimates Equation (1) for four specifications (Strict, Near, Full FF, Full GT); validates against targets.
*   **Outputs**: `all_results` (Dict).
*   **Transformation**: Produces regression statistics ($\beta, R^2, N$) from the analysis data.
*   **Research Role**: Reproduces Table 1 ("Predicting Human Coders"). It estimates $HC_i = \alpha + \beta \text{Corruption Index}_i + \tau_t + \varepsilon_i$ to demonstrate criterion validity.
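
Equation (1) can be estimated with ordinary least squares and year dummies for $\tau_t$. The sketch below uses plain NumPy rather than whatever estimator the repository uses; the function name `ols_year_fe` and the column names in the toy data are assumptions, and no standard errors are computed.

```python
import numpy as np
import pandas as pd

def ols_year_fe(df: pd.DataFrame, y_col: str, x_col: str, year_col: str = "year") -> dict:
    """OLS of y on x with year fixed effects, returning beta, R2 and N."""
    d = df[[y_col, x_col, year_col]].dropna()
    # One dummy per year, dropping the first to avoid collinearity with the intercept
    dummies = pd.get_dummies(d[year_col], prefix="yr", drop_first=True, dtype=float)
    X = np.column_stack([np.ones(len(d)), d[x_col].to_numpy(float), dummies.to_numpy(float)])
    y = d[y_col].to_numpy(float)
    coef, *_ = np.linalg.lstsq(X, y, rcond=None)
    resid = y - X @ coef
    centered = y - y.mean()
    r2 = 1.0 - (resid @ resid) / (centered @ centered)
    return {"beta": float(coef[1]), "R2": float(r2), "N": int(len(d))}

# Toy data generated exactly from HC = 1 + 2 * index + year effect
toy = pd.DataFrame({
    "corruption_index": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
    "year": [2003, 2004, 2003, 2004, 2003, 2004],
})
toy["hc"] = 1.0 + 2.0 * toy["corruption_index"] + 0.5 * (toy["year"] == 2004)

res = ols_year_fe(toy, "hc", "corruption_index")
print(res)
```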

### **13. `run_table2_regressions` (Task 13 Orchestrator)**

*   **Inputs**: `df_analysis` (pd.DataFrame), `study_configuration` (Dict).
*   **Processes**: Prepares CGU samples (Levels, Log); estimates regressions; validates against targets.
*   **Outputs**: `results` (Dict).
*   **Transformation**: Produces regression statistics for CGU outcomes.
*   **Research Role**: Reproduces Table 2 ("Predicting CGU"). It validates the index against official severity classifications using both levels ($CGU_i$) and logs ($\log(CGU_i)$).
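
Preparing the levels and log samples might look like the sketch below. Dropping zero counts from the log sample (because $\log(0)$ is undefined) is an assumption made here for illustration; the paper's handling of zeros may differ. The column name `cgu_severe_count` follows the synthetic example later in this README, and `log_cgu` is a hypothetical name.

```python
import numpy as np
import pandas as pd

def prepare_cgu_samples(df: pd.DataFrame, col: str = "cgu_severe_count"):
    """Return (levels_sample, log_sample) for the CGU outcome (hypothetical helper)."""
    # Levels sample: every report with an observed CGU count
    levels = df[df[col].notna()].copy()
    # Log sample (assumption): keep strictly positive counts only
    logs = levels[levels[col] > 0].copy()
    logs["log_cgu"] = np.log(logs[col])
    return levels, logs

demo_cgu = pd.DataFrame({"cgu_severe_count": [4.0, 0.0, np.nan, 1.0]})
levels_df, logs_df = prepare_cgu_samples(demo_cgu)
print(len(levels_df), len(logs_df))
```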

### **14. `run_table3_regressions` (Task 14 Orchestrator)**

*   **Inputs**: `df_analysis` (pd.DataFrame), `study_configuration` (Dict).
*   **Processes**: Prepares covariates (scaling, logs); estimates bivariate and multivariate regressions.
*   **Outputs**: `results` (Dict).
*   **Transformation**: Produces regression statistics for municipal correlates.
*   **Research Role**: Reproduces Table 3 ("Correlates of Corruption"). It tests construct validity by showing the index correlates with theoretical predictors like literacy and GDP per capita: $\text{Corruption Index}_i = \alpha + \gamma X_i + u_i$.

### **15. `run_main_pipeline` (Task 15 Orchestrator)**

*   **Inputs**: Raw dataframes, language, lexicon, configuration.
*   **Processes**: Calls Tasks 1-14 sequentially (Validation -> Cleansing -> Extraction -> NLP -> PCA -> Regression).
*   **Outputs**: `PipelineArtifacts` (Dataclass).
*   **Transformation**: Orchestrates the primary research pipeline from raw data to final regression tables.
*   **Research Role**: The central engine of the replication. It produces the "Automated Corruption Index" and all primary validation results (Tables 1, 2, 3).

### **16. `run_loo_analysis` (Task 16 Orchestrator)**

*   **Inputs**: `df_analysis` (pd.DataFrame), `study_configuration` (Dict).
*   **Processes**: Iteratively drops one observation, re-estimates the validation model, and records stability metrics.
*   **Outputs**: `results` (Dict containing detailed and summary stats).
*   **Transformation**: Produces a distribution of $\beta$ and $R^2$ values.
*   **Research Role**: Implements the Leave-One-Out sensitivity analysis (Figure 2). It assesses whether the validation results are driven by outliers, ensuring robustness of the measure.
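
The leave-one-out loop can be sketched for the simplest case, a bivariate regression without fixed effects. That simplification, the function name `loo_betas`, and the toy data are assumptions; the summary keys mirror the `beta_min` / `beta_max` names used in the result-inspection example in this README.

```python
import numpy as np

def loo_betas(x, y) -> np.ndarray:
    """Re-estimate the slope of y on x once per dropped observation."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    n = len(x)
    betas = np.empty(n)
    for i in range(n):
        keep = np.arange(n) != i          # drop observation i
        slope, _intercept = np.polyfit(x[keep], y[keep], 1)
        betas[i] = slope
    return betas

x_demo = np.arange(6, dtype=float)
y_demo = 3.0 * x_demo + 1.0               # exact line, so every LOO slope is 3
betas = loo_betas(x_demo, y_demo)
summary = {"beta_min": float(betas.min()), "beta_max": float(betas.max())}
print(summary)
```

A narrow `[beta_min, beta_max]` range indicates that no single report drives the estimate.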

### **17. `run_supervised_robustness` (Task 17 Orchestrator)**

*   **Inputs**: `df_irregularities`, `df_analysis`, `df_corpus_counts`, `study_configuration`.
*   **Processes**: Calls `build_supervised_data` and `train_and_compare`.
*   **Outputs**: `SupervisedArtifacts` (Dataclass).
*   **Transformation**: Orchestrates the entire supervised robustness pipeline.
*   **Research Role**: High-level wrapper for the supervised learning analysis described in Section 5.2 and Appendix A4.

### **18. `build_supervised_data` (Task 18 Orchestrator)**

*   **Inputs**: `df_irregularities` (pd.DataFrame), `df_analysis` (pd.DataFrame), `study_configuration` (Dict).
*   **Processes**: Constructs binary labels via median split; builds document corpus; vectorizes text (TF-IDF).
*   **Outputs**: `bundle` (Dict containing $X_{train}, y_{train}$, vectorizer).
*   **Transformation**: Converts text and labels into a format suitable for machine learning.
*   **Research Role**: Prepares the data for the supervised learning robustness check (Appendix A4). It implements the label definition $y_i = \mathbb{1}\{Score_i \ge \text{Median}\}$ and the TF-IDF weighting scheme.
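
The label rule and a TF-IDF weighting can be sketched without any machine-learning library. The helper names are hypothetical, and the weighting shown is the textbook variant (raw term count times $\ln(N / df)$), which is an assumption: library vectorizers typically apply smoothing and normalization on top of this.

```python
import math
from collections import Counter

import numpy as np

def median_split_labels(scores) -> np.ndarray:
    """y_i = 1 if Score_i >= median, else 0."""
    scores = np.asarray(scores, dtype=float)
    return (scores >= np.median(scores)).astype(int)

def tfidf(docs):
    """Textbook TF-IDF: raw term count times ln(N / document frequency)."""
    tokenized = [d.split() for d in docs]
    n_docs = len(tokenized)
    # Document frequency: number of documents containing each token
    doc_freq = Counter(tok for toks in tokenized for tok in set(toks))
    idf = {tok: math.log(n_docs / c) for tok, c in doc_freq.items()}
    return [{tok: cnt * idf[tok] for tok, cnt in Counter(toks).items()} for toks in tokenized]

labels = median_split_labels([0, 1, 2, 5])
weights = tfidf(["fraude licitacao", "atraso licitacao"])
print(labels, weights)
```

A term that appears in every document ("licitacao" here) receives zero weight, which is why TF-IDF emphasizes discriminating vocabulary.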

### **19. `train_and_compare` (Task 19 Orchestrator)**

*   **Inputs**: `training_bundle` (Dict), `df_irregularities` (pd.DataFrame), `df_corpus_counts` (pd.DataFrame), `study_configuration` (Dict).
*   **Processes**: Trains classifiers (LR, NB, SVM); predicts severity for all irregularities; rebuilds PCA index; compares indices.
*   **Outputs**: `results` (Dict).
*   **Transformation**: Produces trained models, a new ML-based index, and comparison statistics.
*   **Research Role**: Executes the supervised learning robustness check. It tests whether an index built from ML-predicted severity ($CI_{ML}$) is highly correlated with the dictionary-based index ($CI_{dict}$); the reported fit ($R^2 \approx 0.98$) indicates that both indices capture the same latent construct.
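
The comparison statistics can be sketched as below. The function name `compare_indices` is hypothetical; the `correlation` and `R2` keys mirror those read in the result-inspection example in this README. For a bivariate regression with an intercept, $R^2$ equals the squared Pearson correlation, so only the correlation needs to be computed.

```python
import numpy as np

def compare_indices(ci_dict, ci_ml) -> dict:
    """Pearson correlation and bivariate R^2 between two index vectors."""
    a = np.asarray(ci_dict, dtype=float)
    b = np.asarray(ci_ml, dtype=float)
    r = float(np.corrcoef(a, b)[0, 1])
    # R^2 of a one-regressor OLS with intercept is the squared correlation
    return {"correlation": r, "R2": r ** 2}

ci_a = np.array([-1.0, 0.0, 0.5, 2.0])
same_direction = compare_indices(ci_a, 2.0 * ci_a + 1.0)
flipped_sign = compare_indices(ci_a, -ci_a)
print(same_direction, flipped_sign)
```

Because the sign of a principal component is arbitrary, two PCA indices can correlate at $-1$ while describing the same construct; $R^2$ is unaffected by such a flip.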

### **20. `execute_full_research_pipeline` (Top-Level Orchestrator)**

*   **Inputs**: Raw dataframes, language, lexicon, configuration.
*   **Processes**: Calls `run_main_pipeline`, `run_loo_analysis`, and `run_supervised_robustness`.
*   **Outputs**: `final_output` (Dict).
*   **Transformation**: Aggregates all research artifacts into a single return structure.
*   **Research Role**: The master controller for the entire project. It ensures that the main analysis, sensitivity checks, and robustness verifications are executed in the correct order and with consistent data, delivering the complete set of evidence presented in the paper.
<br><br>

### **Example Usage: End-to-End Corruption Measurement Pipeline**

This example demonstrates the execution of the `execute_full_research_pipeline` function. It covers:
1.  **Configuration Loading**: Reading the study parameters from `config.yaml`.
2.  **Data Synthesis**: Generating synthetic `df_raw_corpus` and `df_validation_raw` DataFrames that strictly adhere to the required schemas and data types.
3.  **Pipeline Execution**: Running the orchestrator and capturing the results.
4.  **Result Inspection**: Accessing the generated artifacts (indices, regression tables, robustness checks).

#### **1. Setup and Configuration Loading**

First, we import necessary libraries and load the study configuration. We assume `config.yaml` exists in the working directory.

```python
import pandas as pd
import numpy as np
import yaml
import logging

# Ensure logging is configured to see pipeline progress
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def load_study_configuration(config_path: str = "config.yaml") -> dict:
    """
    Loads the study configuration from a YAML file.
    """
    try:
        with open(config_path, 'r') as file:
            config = yaml.safe_load(file)
        logging.info(f"Configuration loaded successfully from {config_path}")
        return config
    except FileNotFoundError:
        logging.error(f"Configuration file not found at {config_path}")
        raise
    except yaml.YAMLError as e:
        logging.error(f"Error parsing YAML file: {e}")
        raise

# Load configuration
# Note: In a real scenario, ensure 'config.yaml' is present.
# For this example, we assume the dict structure matches the YAML content provided previously.
study_config = load_study_configuration()
```

#### **2. Synthetic Data Generation**

We generate synthetic data to simulate the raw inputs. This ensures the example is runnable and demonstrates the expected data structures.

**A. `df_raw_corpus` Generation**

This DataFrame represents the raw audit reports. We simulate both early lotteries (requiring regex extraction) and later lotteries (using summaries).

```python
def generate_synthetic_corpus(n_reports: int = 100) -> pd.DataFrame:
    """
    Generates a synthetic df_raw_corpus adhering to the strict schema.
    """
    np.random.seed(42)
    
    data = {
        'report_id': [f"RPT_{i:05d}" for i in range(n_reports)],
        'municipality_id': np.random.randint(1000000, 9999999, size=n_reports), # 7-digit IBGE
        'state': np.random.choice(['SP', 'MG', 'RJ', 'BA', 'RS'], size=n_reports),
        'lottery': np.random.randint(2, 35, size=n_reports),
        'year': np.random.randint(2003, 2012, size=n_reports),
        'text_extraction_mode': np.random.choice(['native_text', 'ocr'], size=n_reports),
        'page_count': np.random.randint(10, 200, size=n_reports),
        'image_count': np.random.randint(0, 50, size=n_reports),
        'image_count_definition': ["PDF XObjects"] * n_reports,
        'report_lines_count': np.random.randint(500, 5000, size=n_reports),
        'lines_count_definition': ["newline-delimited"] * n_reports,
        'pdf_source': [f"http://cgu.gov.br/reports/report_{i}.pdf" for i in range(n_reports)]
    }
    
    df = pd.DataFrame(data)
    
    # Logic for text fields based on lottery round
    # Lottery < 8: Full text extraction required
    # Lottery >= 8: Summary text preferred
    
    full_texts = []
    summary_texts = []
    has_summary = []
    
    for lottery in df['lottery']:
        # Simulate irregularity text
        # We inject known keywords from the lexicon to ensure matches
        irreg_text = "Constatação da Fiscalização: Indícios de fraude em licitação e empresa fantasma. Fato: Ocorreu desvio."
        
        if lottery < 8:
            full_texts.append(f"Header... {irreg_text} ... Footer")
            summary_texts.append(None) # No summary in early reports
            has_summary.append(False)
        else:
            full_texts.append("Full report text content...")
            # Simulate enumerated summary
            summary_texts.append("1.1 Irregularidade em licitação.\n1.2 Falta de medicamentos.")
            has_summary.append(True)
            
    df['report_full_text'] = full_texts
    df['report_summary_text'] = summary_texts
    df['has_summary_detected'] = has_summary
    
    return df

df_raw_corpus = generate_synthetic_corpus(n_reports=200)
logging.info(f"Generated df_raw_corpus with shape: {df_raw_corpus.shape}")
```

**B. `df_validation_raw` Generation**

This DataFrame contains the human-coded data and municipal covariates. We ensure overlap with the corpus `municipality_id` to allow merging.

```python
def generate_synthetic_validation(corpus_df: pd.DataFrame) -> pd.DataFrame:
    """
    Generates synthetic df_validation_raw matching corpus municipalities.
    """
    np.random.seed(42)
    
    # Get unique municipalities from corpus to ensure join matches
    unique_munis = corpus_df[['municipality_id', 'state']].drop_duplicates()
    n_munis = len(unique_munis)
    
    # Generate validation data
    data = {
        'municipality_id': unique_munis['municipality_id'].values,
        'state': unique_munis['state'].values,
        'year': np.random.choice([2003, 2004, 2005], size=n_munis), # Audit year context
        
        # Human coded counts (Poisson-like)
        'ff_corruption_count': np.random.poisson(lam=1.5, size=n_munis).astype(float),
        'gt_corruption_count': np.random.poisson(lam=2.0, size=n_munis).astype(float),
        
        # CGU counts (higher magnitude, only for some)
        'cgu_severe_count': np.random.poisson(lam=5.0, size=n_munis).astype(float),
        
        # Covariates (Reference Year 2000)
        'covariate_reference_year': [2000] * n_munis,
        'literacy_rate': np.random.uniform(0.70, 0.99, size=n_munis),
        'gdp_per_capita': np.random.lognormal(mean=8, sigma=1, size=n_munis),
        'urban_population_share': np.random.beta(a=2, b=1, size=n_munis),
        'population_total': np.random.randint(5000, 500000, size=n_munis).astype(float),
        'distance_to_capital_km': np.random.uniform(100, 3000, size=n_munis),
        'has_radio_fm': np.random.choice([0, 1], size=n_munis)
    }
    
    df = pd.DataFrame(data)
    
    # Introduce some missingness to test robustness
    mask_missing_cgu = np.random.rand(n_munis) < 0.3
    df.loc[mask_missing_cgu, 'cgu_severe_count'] = np.nan
    
    # Ensure population_log is derivable or present
    df['population_log'] = np.log(df['population_total'])
    
    return df

df_validation_raw = generate_synthetic_validation(df_raw_corpus)
logging.info(f"Generated df_validation_raw with shape: {df_validation_raw.shape}")
```

#### **3. Pipeline Execution**

We define the language and lexicon parameters, then invoke the top-level orchestrator.

```python
# Define Language
language = "Portuguese"

# Define Raw Lexicon List (Portuguese stems and n-grams, written without accents)
# This matches the structure expected by the dictionary normalization task
raw_lexicon_list = [
    "empresa fantasma",
    "empresa inexistente",
    "fraud",
    "conluio",
    "fals",
    "simulacao licitat",
    "montagem licitat",
    "superfaturamento"
]

# Execute the Pipeline
try:
    pipeline_results = execute_full_research_pipeline(
        df_raw_corpus=df_raw_corpus,
        df_validation_raw=df_validation_raw,
        language=language,
        raw_lexicon_list=raw_lexicon_list,
        study_configuration=study_config
    )
    
    print("\n" + "="*60)
    print("PIPELINE EXECUTION SUCCESSFUL")
    print("="*60)
    
except Exception as e:
    print(f"\nPipeline Execution Failed: {e}")
```

#### **4. Result Inspection**

The `pipeline_results` dictionary contains all artifacts. We can inspect specific outputs to verify the analysis.

```python
if 'pipeline_results' in locals():
    # 1. Inspect Main Pipeline Artifacts
    main_artifacts = pipeline_results['main_pipeline']
    
    print("\n--- Main Pipeline Artifacts ---")
    print(f"Cleaned Corpus Size: {len(main_artifacts['df_corpus_clean'])}")
    print(f"Extracted Irregularities: {len(main_artifacts['df_irregularities'])}")
    
    # Check PCA Index
    print("\n--- Corruption Index (Head) ---")
    print(main_artifacts['df_corpus_with_index'][['report_id', 'corruption_index']].head())
    
    # Check Table 1 Regression Results (Validation vs Human Coders)
    print("\n--- Table 1: Regression Results (Strict Agreement) ---")
    t1_res = main_artifacts['table1_results']['Strict Agreement']
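    # Format only when the value is present; applying ':.4f' to a missing value would raise
    beta, r2 = t1_res.get('beta'), t1_res.get('R2')
    print(f"Beta: {beta:.4f}" if beta is not None else "Beta: N/A")
    print(f"R2:   {r2:.4f}" if r2 is not None else "R2:   N/A")
    print(f"N:    {t1_res.get('N', 'N/A')}")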

    # 2. Inspect Leave-One-Out Robustness
    loo_results = pipeline_results['loo_analysis']
    print("\n--- LOO Robustness Summary (Strict Agreement) ---")
    if 'summary' in loo_results and 'Strict' in loo_results['summary']:
        stats = loo_results['summary']['Strict']
        print(f"Beta Range: [{stats['beta_min']:.4f}, {stats['beta_max']:.4f}]")
        print(f"R2 Range:   [{stats['r2_min']:.4f}, {stats['r2_max']:.4f}]")

    # 3. Inspect Supervised Learning Robustness
    sup_results = pipeline_results['supervised_robustness']
    print("\n--- Supervised Learning Comparison ---")
    comp = sup_results['comparison_stats']
    print(f"Correlation (Dict-PCA vs ML-PCA): {comp.get('correlation', 0):.4f}")
    print(f"R2 (Dict-PCA vs ML-PCA):          {comp.get('R2', 0):.4f}")
```


In [None]:
# Task 1 — Validate schema, types, and uniqueness of `df_raw_corpus`

# ==============================================================================
# Task 1: Validate schema, types, and uniqueness of df_raw_corpus
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 1: Verify presence and data types of all 15 required columns
# -------------------------------------------------------------------------------------------------------------------------------

def validate_corpus_schema(df: pd.DataFrame) -> None:
    """
    Validates the presence and semantic types of the 15 required columns in the raw corpus DataFrame.

    This function enforces the schema requirements derived from the research design. It checks for
    the existence of specific columns and validates their content types (e.g., integer-likeness
    for IDs, string format for states). It accumulates all validation errors and raises a single
    ValueError if any critical constraints are violated.

    Parameters
    ----------
    df : pd.DataFrame
        The raw corpus DataFrame containing audit report data.

    Raises
    ------
    ValueError
        If any required column is missing or if any column violates type constraints.
    """
    # List of required columns as specified in the task
    required_columns = [
        "report_id", "municipality_id", "state", "lottery", "year",
        "report_full_text", "report_summary_text", "has_summary_detected",
        "text_extraction_mode", "page_count", "image_count",
        "image_count_definition", "report_lines_count", "lines_count_definition",
        "pdf_source"
    ]

    # 1. Check for missing columns
    # Get the set of columns present in the DataFrame
    existing_columns = set(df.columns)
    # Identify any missing required columns
    missing_columns = set(required_columns) - existing_columns

    # Log extra columns for awareness (not a failure condition)
    extra_columns = existing_columns - set(required_columns)
    if extra_columns:
        logger.info(f"Extra columns detected in corpus: {extra_columns}")

    # Fail if columns are missing
    if missing_columns:
        error_msg = f"Missing required columns in df_raw_corpus: {missing_columns}"
        logger.error(error_msg)
        raise ValueError(error_msg)

    # 2. Validate semantic types
    validation_errors = []

    # Helper to check integer-likeness (handles floats like 123.0)
    def is_integer_like(series: pd.Series) -> bool:
        # Drop NaNs for type check (null checks are separate)
        clean_series = series.dropna()
        if clean_series.empty:
            return True
        # Coerce once and reuse the result so the modulo never runs on raw object data
        numeric = pd.to_numeric(clean_series, errors='coerce')
        if numeric.isna().any():
            return False
        # Check if values are effectively integers (e.g. 5.0 == 5)
        return bool((numeric % 1 == 0).all())

    # Validate report_id (must be non-null)
    if df['report_id'].isnull().any():
        validation_errors.append("Column 'report_id' contains null values.")

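    # Validate municipality_id (must be non-null, integer-like, 7 digits)
    if df['municipality_id'].isnull().any():
        validation_errors.append("Column 'municipality_id' contains null values.")
    elif not is_integer_like(df['municipality_id']):
        validation_errors.append("Column 'municipality_id' contains non-integer values.")
    elif not pd.to_numeric(df['municipality_id'], errors='coerce').between(1_000_000, 9_999_999).all():
        # IBGE municipality codes have exactly 7 digits
        validation_errors.append("Column 'municipality_id' contains values that are not 7-digit codes.")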

    # Validate state (non-null, 2 chars)
    if df['state'].isnull().any():
        validation_errors.append("Column 'state' contains null values.")
    # Check string length for non-nulls (convert to string first to handle potential object types)
    state_lengths = df['state'].dropna().astype(str).str.strip().str.len()
    if not (state_lengths == 2).all():
        validation_errors.append("Column 'state' contains values that are not 2 characters long.")

    # Validate lottery (non-null, integer-like, positive)
    if df['lottery'].isnull().any():
        validation_errors.append("Column 'lottery' contains null values.")
    elif not is_integer_like(df['lottery']):
        validation_errors.append("Column 'lottery' contains non-integer values.")
    elif (df['lottery'].dropna() <= 0).any():
        validation_errors.append("Column 'lottery' contains non-positive values.")

    # Validate year (non-null, integer-like, range 2003-2011)
    if df['year'].isnull().any():
        validation_errors.append("Column 'year' contains null values.")
    elif not is_integer_like(df['year']):
        validation_errors.append("Column 'year' contains non-integer values.")
    else:
        # Check range
        years = df['year'].dropna()
        if not ((years >= 2003) & (years <= 2011)).all():
            validation_errors.append("Column 'year' contains values outside the 2003-2011 range.")

    # Validate report_full_text (non-null)
    if df['report_full_text'].isnull().any():
        validation_errors.append("Column 'report_full_text' contains null values.")

    # Validate report_summary_text (nulls permitted, no check needed for nulls)

    # Validate has_summary_detected (non-null, boolean)
    if df['has_summary_detected'].isnull().any():
        validation_errors.append("Column 'has_summary_detected' contains null values.")
    # Check if values are boolean-like (True/False/0/1)
    if not df['has_summary_detected'].isin([True, False, 0, 1]).all():
        validation_errors.append("Column 'has_summary_detected' contains non-boolean values.")

    # Validate text_extraction_mode (non-null, controlled vocabulary)
    if df['text_extraction_mode'].isnull().any():
        validation_errors.append("Column 'text_extraction_mode' contains null values.")
    valid_modes = {'native_text', 'ocr'}
    if not df['text_extraction_mode'].isin(valid_modes).all():
        validation_errors.append(f"Column 'text_extraction_mode' contains invalid values. Allowed: {valid_modes}")

    # Validate numeric counts (page_count, image_count, report_lines_count)
    for col in ['page_count', 'image_count', 'report_lines_count']:
        if df[col].isnull().any():
            validation_errors.append(f"Column '{col}' contains null values.")
        elif not is_integer_like(df[col]):
            validation_errors.append(f"Column '{col}' contains non-integer values.")
        elif (df[col].dropna() < 0).any():
            validation_errors.append(f"Column '{col}' contains negative values.")

    # Validate definitions (non-null, non-empty string, constant)
    for col in ['image_count_definition', 'lines_count_definition']:
        if df[col].isnull().any():
            validation_errors.append(f"Column '{col}' contains null values.")
        elif (df[col].astype(str).str.strip() == "").any():
            validation_errors.append(f"Column '{col}' contains empty strings.")
        # Check constancy (uniqueness of definition)
        unique_defs = df[col].dropna().unique()
        if len(unique_defs) > 1:
            validation_errors.append(f"Column '{col}' is not constant across the dataset. Found: {unique_defs}")

    # Validate pdf_source (non-null, non-empty)
    if df['pdf_source'].isnull().any():
        validation_errors.append("Column 'pdf_source' contains null values.")
    elif (df['pdf_source'].astype(str).str.strip() == "").any():
        validation_errors.append("Column 'pdf_source' contains empty strings.")

    # Raise error if any validations failed
    if validation_errors:
        error_message = "Schema validation failed for df_raw_corpus:\n" + "\n".join(validation_errors)
        logger.error(error_message)
        raise ValueError(error_message)

    logger.info("Schema validation passed for df_raw_corpus.")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 2: Validate key uniqueness and referential consistency
# -------------------------------------------------------------------------------------------------------------------------------

def validate_corpus_uniqueness(df: pd.DataFrame) -> None:
    """
    Validates the uniqueness of primary and composite keys in the raw corpus DataFrame.

    This function ensures that `report_id` is globally unique, which is critical for
    downstream aggregation. It also checks for duplicate (municipality_id, lottery, year)
    tuples, which would indicate data duplication or ambiguous audit records.

    Parameters
    ----------
    df : pd.DataFrame
        The raw corpus DataFrame.

    Raises
    ------
    ValueError
        If `report_id` is not unique or if unjustified composite key duplicates exist.
    """
    # 1. Validate global uniqueness of report_id
    # Convert to string to handle potential mixed types and ensure strict uniqueness
    report_ids = df['report_id'].astype(str)
    if not report_ids.is_unique:
        duplicates = report_ids[report_ids.duplicated()].unique()
        error_msg = f"Duplicate report_id values found: {duplicates}"
        logger.error(error_msg)
        raise ValueError(error_msg)

    # 2. Validate composite key uniqueness (municipality_id, lottery, year)
    # These define the audit event. Duplicates here imply multiple reports for the same audit event.
    composite_cols = ['municipality_id', 'lottery', 'year']

    # Check for duplicates
    if df.duplicated(subset=composite_cols).any():
        duplicate_rows = df[df.duplicated(subset=composite_cols, keep=False)]
        # Log the duplicates for inspection
        logger.error(f"Duplicate composite keys found for columns {composite_cols}.")
        logger.error(f"Duplicate rows sample:\n{duplicate_rows.head()}")

        # Hard fail: the schema provides no justification for duplicate audit events
        raise ValueError(f"Duplicate composite keys found in {composite_cols}. This implies data duplication or ambiguous audits.")

    # 3. Cross-validate state vs municipality_id (Conditional)
    # Note: Since no external IBGE mapping is provided in the function signature,
    # we perform a basic consistency check: same municipality_id should have same state.

    # Group by municipality_id and count unique states
    state_counts = df.groupby('municipality_id')['state'].nunique()
    inconsistent_munis = state_counts[state_counts > 1]

    if not inconsistent_munis.empty:
        error_msg = f"Inconsistent 'state' values found for municipality_ids: {inconsistent_munis.index.tolist()}"
        logger.error(error_msg)
        # This is a data integrity error (a muni cannot move states)
        raise ValueError(error_msg)

    logger.info("Uniqueness and referential consistency checks passed.")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 3: Validate logical consistency between lottery round and summary presence
# -------------------------------------------------------------------------------------------------------------------------------

def validate_lottery_summary_consistency(df: pd.DataFrame) -> None:
    """
    Validates the logical consistency between the lottery round number and the presence of
    summary text, enforcing the extraction regime logic.

    Rules:
    1. If lottery >= 8 AND has_summary_detected is True, report_summary_text should be present.
       (Violation is a Warning, as fallback is allowed).
    2. If lottery < 8, report_full_text must be non-empty.
       (Violation is a Critical Error, as extraction depends on full text).

    Parameters
    ----------
    df : pd.DataFrame
        The raw corpus DataFrame.

    Raises
    ------
    ValueError
        If critical consistency rules (Rule 2) are violated.
    """
    # Create a copy to avoid SettingWithCopy warnings on temporary columns
    df_check = df.copy()

    # Ensure text columns are treated as strings for length checks
    df_check['report_summary_text'] = df_check['report_summary_text'].astype(str)
    df_check['report_full_text'] = df_check['report_full_text'].astype(str)

    # Rule 1: Lottery >= 8 consistency
    # Condition: Lottery >= 8 AND Summary Detected
    mask_summary_expected = (df_check['lottery'] >= 8) & (df_check['has_summary_detected'] == True)

    # Check: Is summary text null or empty/whitespace?
    # Note: 'nan' string check handles string conversion of np.nan
    mask_summary_missing = (
        (df_check['report_summary_text'].str.strip() == "") |
        (df_check['report_summary_text'].str.lower() == 'nan') |
        (df_check['report_summary_text'].str.lower() == 'none')
    )

    # Identify violations
    violations_rule_1 = df_check[mask_summary_expected & mask_summary_missing]

    if not violations_rule_1.empty:
        count = len(violations_rule_1)
        sample_ids = violations_rule_1['report_id'].head(5).tolist()
        logger.warning(
            f"Rule 1 Violation: {count} reports with lottery >= 8 and summary detected "
            f"have missing summary text. Fallback extraction will be required. "
            f"Sample IDs: {sample_ids}"
        )

    # Rule 2: Lottery < 8 consistency
    # Condition: Lottery < 8
    mask_early_lottery = (df_check['lottery'] < 8)

    # Check: Is full text missing/empty?
    mask_fulltext_missing = (
        (df_check['report_full_text'].str.strip() == "") |
        (df_check['report_full_text'].str.lower() == 'nan') |
        (df_check['report_full_text'].str.lower() == 'none')
    )

    # Identify violations
    violations_rule_2 = df_check[mask_early_lottery & mask_fulltext_missing]

    if not violations_rule_2.empty:
        count = len(violations_rule_2)
        sample_ids = violations_rule_2['report_id'].head(5).tolist()
        error_msg = (
            f"Rule 2 Violation: {count} reports with lottery < 8 have missing full text. "
            f"Extraction is impossible for these records. Sample IDs: {sample_ids}"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)

    logger.info("Lottery/Summary logical consistency checks completed.")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_corpus_inputs(df_raw_corpus: pd.DataFrame) -> None:
    """
    Orchestrator function for Task 1: Validates the schema, types, uniqueness, and
    logical consistency of the raw corpus DataFrame.

    This function executes the validation steps in a strict sequence:
    1. Schema and Type Validation: Ensures all 15 required columns exist and contain valid data types.
    2. Uniqueness Validation: Ensures report_id is globally unique and composite keys are consistent.
    3. Logical Consistency: Checks alignment between lottery rounds and text availability.

    Parameters
    ----------
    df_raw_corpus : pd.DataFrame
        The raw corpus DataFrame to be validated.

    Raises
    ------
    ValueError
        If any critical validation step fails.
    """
    logger.info("Starting Task 1: Validate schema, types, and uniqueness of df_raw_corpus.")

    try:
        # Step 1: Schema and Types
        validate_corpus_schema(df_raw_corpus)

        # Step 2: Uniqueness
        validate_corpus_uniqueness(df_raw_corpus)

        # Step 3: Logical Consistency
        validate_lottery_summary_consistency(df_raw_corpus)

        logger.info("Task 1 completed successfully: df_raw_corpus is valid.")

    except ValueError as e:
        logger.critical(f"Task 1 Failed: {e}")
        raise
    except Exception as e:
        logger.critical(f"Task 1 Failed with unexpected error: {e}")
        raise
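
The two lottery rules enforced in Task 1 can be illustrated on a toy frame. This is a minimal sketch rather than the pipeline's own code: the column names follow the functions above, the rows are invented, and `is_blank` is a hypothetical helper that treats nulls, blank strings, and stringified nulls alike.

```python
import pandas as pd


def is_blank(text: pd.Series) -> pd.Series:
    """Hypothetical helper: True where text is null, empty, or a stringified null."""
    normalized = text.astype("string").str.strip().str.lower()
    return normalized.isna() | normalized.isin(["", "nan", "none"])


# Invented rows; only the column names come from the pipeline.
toy = pd.DataFrame({
    "report_id": ["r1", "r2", "r3", "r4"],
    "lottery": [5, 9, 12, 3],
    "has_summary_detected": [False, True, True, False],
    "report_summary_text": [None, "summary text", "   ", None],
    "report_full_text": ["full text", "full text", "full text", ""],
})

# Rule 1 (warning): a summary was detected from lottery 8 onward, but its text is blank.
rule_1 = toy[
    (toy["lottery"] >= 8)
    & toy["has_summary_detected"]
    & is_blank(toy["report_summary_text"])
]

# Rule 2 (hard error): early lotteries depend on the full text, which must be present.
rule_2 = toy[(toy["lottery"] < 8) & is_blank(toy["report_full_text"])]

print("Rule 1 violations:", rule_1["report_id"].tolist())
print("Rule 2 violations:", rule_2["report_id"].tolist())
```

Rule 1 only warns because a fallback extraction from the full text is still possible, whereas a Rule 2 violation leaves nothing to extract from.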


In [None]:
# Task 2 — Validate schema, types, and join feasibility of df_validation_raw

# ==============================================================================
# Task 2: Validate schema, types, and join feasibility of df_validation_raw
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 1: Verify presence and data types of all required columns
# -------------------------------------------------------------------------------------------------------------------------------

def validate_validation_schema(df: pd.DataFrame) -> None:
    """
    Validates the schema and semantic types of the validation DataFrame.

    This function ensures that `df_validation_raw` contains all necessary columns for
    reproducing Tables 1, 2, and 3 of the research paper. It enforces type constraints
    (e.g., integer-like IDs, numeric covariates) and checks for the existence of
    critical identifiers and outcome variables.

    Parameters
    ----------
    df : pd.DataFrame
        The raw validation DataFrame containing human-coded outcomes and covariates.

    Raises
    ------
    ValueError
        If required columns are missing or type constraints are violated.
    """
    # Define required columns
    # Note: population_log OR population_total is required; handled in logic below
    required_base = [
        "municipality_id", "state", "year",
        "ff_corruption_count", "gt_corruption_count", "cgu_severe_count",
        "literacy_rate", "gdp_per_capita", "urban_population_share",
        "distance_to_capital_km", "has_radio_fm", "covariate_reference_year"
    ]

    # 1. Check for missing columns
    existing_columns = set(df.columns)
    missing_base = set(required_base) - existing_columns

    # Handle population requirement (need at least one)
    has_pop_log = "population_log" in existing_columns
    has_pop_total = "population_total" in existing_columns

    if missing_base:
        error_msg = f"Missing required columns in df_validation_raw: {missing_base}"
        logger.error(error_msg)
        raise ValueError(error_msg)

    if not (has_pop_log or has_pop_total):
        error_msg = "Missing population data: either 'population_log' or 'population_total' must be present."
        logger.error(error_msg)
        raise ValueError(error_msg)

    # 2. Validate semantic types
    validation_errors = []

    # Helper for integer-likeness (reused concept from Task 1 for modularity)
    def is_integer_like(series: pd.Series) -> bool:
        clean = series.dropna()
        if clean.empty:
            return True
        # Coerce once and test the coerced values, so numeric strings such as "2000"
        # do not break the modulo check.
        numeric = pd.to_numeric(clean, errors='coerce')
        if numeric.isna().any():
            return False
        return bool((numeric % 1 == 0).all())

    # Validate municipality_id (non-null, integer-like; zero-padding to 7 digits happens at canonicalization)
    if df['municipality_id'].isnull().any():
        validation_errors.append("Column 'municipality_id' contains null values.")
    elif not is_integer_like(df['municipality_id']):
        validation_errors.append("Column 'municipality_id' contains non-integer values.")

    # Validate state (non-null, 2 chars)
    if df['state'].isnull().any():
        validation_errors.append("Column 'state' contains null values.")
    else:
        state_lens = df['state'].astype(str).str.strip().str.len()
        if not (state_lens == 2).all():
            validation_errors.append("Column 'state' contains values that are not 2 characters long.")

    # Validate year (integer-like if present)
    # Note: Year might be null if the row is purely for time-invariant covariates,
    # but schema implies it's an audit year key. We check integer-likeness for non-nulls.
    if not is_integer_like(df['year']):
        validation_errors.append("Column 'year' contains non-integer values.")

    # Validate outcomes (float-like, NaNs permitted)
    for col in ['ff_corruption_count', 'gt_corruption_count', 'cgu_severe_count']:
        # Check if non-null values are numeric
        clean = df[col].dropna()
        if not pd.to_numeric(clean, errors='coerce').notna().all():
            validation_errors.append(f"Column '{col}' contains non-numeric values.")

    # Validate covariates (float-like)
    covariates = [
        "literacy_rate", "gdp_per_capita", "urban_population_share",
        "distance_to_capital_km"
    ]
    if has_pop_log: covariates.append("population_log")
    if has_pop_total: covariates.append("population_total")

    for col in covariates:
        clean = df[col].dropna()
        if not pd.to_numeric(clean, errors='coerce').notna().all():
            validation_errors.append(f"Column '{col}' contains non-numeric values.")

    # Validate has_radio_fm (boolean-like: 0/1/True/False)
    clean_radio = df['has_radio_fm'].dropna()
    if not clean_radio.isin([0, 1, True, False]).all():
        validation_errors.append("Column 'has_radio_fm' contains values other than 0/1/True/False.")

    # Validate covariate_reference_year (integer-like, must contain 2000)
    if not is_integer_like(df['covariate_reference_year']):
        validation_errors.append("Column 'covariate_reference_year' contains non-integer values.")
    elif not pd.to_numeric(df['covariate_reference_year'], errors='coerce').eq(2000).any():
        # Hard error rather than a warning: Table 3 uses year-2000 covariates and cannot be reproduced without them.
        validation_errors.append("Column 'covariate_reference_year' does not contain the required year 2000.")

    if validation_errors:
        error_msg = "Schema validation failed for df_validation_raw:\n" + "\n".join(validation_errors)
        logger.error(error_msg)
        raise ValueError(error_msg)

    logger.info("Schema validation passed for df_validation_raw.")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 2: Validate join feasibility with corpus table
# -------------------------------------------------------------------------------------------------------------------------------

def validate_join_feasibility(df_validation: pd.DataFrame, df_corpus: pd.DataFrame) -> None:
    """
    Validates that the validation DataFrame can be successfully joined with the corpus DataFrame.

    This function checks for the existence of a valid join key (canonical municipality_id)
    and verifies that the intersection of keys is sufficient for analysis. It also checks
    for potential many-to-many join issues by analyzing key uniqueness.

    Parameters
    ----------
    df_validation : pd.DataFrame
        The validation DataFrame.
    df_corpus : pd.DataFrame
        The raw corpus DataFrame.

    Raises
    ------
    ValueError
        If the intersection of municipality IDs is zero or if join keys are fundamentally incompatible.
    """
    # 1. Canonicalize keys for comparison (temporary copies)
    # Ensure 7-digit zero-padded strings
    muni_corpus = df_corpus['municipality_id'].dropna().astype(int).astype(str).str.zfill(7)
    muni_val = df_validation['municipality_id'].dropna().astype(int).astype(str).str.zfill(7)

    # 2. Check Intersection
    common_munis = set(muni_corpus) & set(muni_val)
    intersection_count = len(common_munis)

    if intersection_count == 0:
        error_msg = "Zero intersection between corpus and validation municipality IDs. Check ID formats."
        logger.error(error_msg)
        raise ValueError(error_msg)

    logger.info(f"Found {intersection_count} common municipalities between corpus and validation data.")

    # 3. Check for Many-to-Many Risk
    # The corpus naturally has multiple reports per municipality (across years/lotteries).
    # The validation data structure determines the join strategy.

    # Check uniqueness of municipality_id in validation data
    val_muni_is_unique = muni_val.is_unique

    # Check uniqueness of (municipality_id, year) in validation data
    # (Use original df columns for composite check)
    val_composite_is_unique = df_validation.set_index(['municipality_id', 'year']).index.is_unique

    if val_muni_is_unique:
        logger.info("Validation data is unique at municipality level. Safe for Many-to-One join.")
    elif val_composite_is_unique:
        logger.info("Validation data is unique at (municipality, year) level. Join must use composite key.")
    else:
        # If neither is unique, we have duplicates in validation data that could cause row inflation
        logger.warning(
            "Validation data is NOT unique at municipality OR (municipality, year) level. "
            "Risk of Many-to-Many join inflation. Inspect duplicates in df_validation_raw."
        )
        # We don't hard fail here, but we flag it for the join implementation task (Task 11)

    # 4. Check coverage for specific outcomes
    # How many corpus municipalities also appear in the FF/GT validation data?
    # We simulate a join on municipality_id (most optimistic coverage)

    # Filter validation to rows with a non-null outcome, then canonicalize the IDs
    def ids_with_outcome(outcome_col: str) -> set:
        ids = df_validation.loc[df_validation[outcome_col].notna(), 'municipality_id'].dropna()
        return set(ids.astype(int).astype(str).str.zfill(7))

    ff_coverage = len(set(muni_corpus) & ids_with_outcome('ff_corruption_count'))
    gt_coverage = len(set(muni_corpus) & ids_with_outcome('gt_corruption_count'))

    logger.info(f"Corpus municipalities with FF data: {ff_coverage}.")
    logger.info(f"Corpus municipalities with GT data: {gt_coverage}.")

    if ff_coverage < 10 or gt_coverage < 10:
        logger.warning("Very low overlap with FF/GT validation data. Check if datasets cover the same geographic/temporal scope.")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 3: Validate covariate completeness for Table 3
# -------------------------------------------------------------------------------------------------------------------------------

def validate_covariate_completeness(df: pd.DataFrame) -> None:
    """
    Validates the completeness of covariates required for Table 3 reproduction.

    This function filters the validation data to the reference year 2000 (as specified
    in the paper's Table 3 notes) and calculates the missingness rate for each
    required covariate.

    Parameters
    ----------
    df : pd.DataFrame
        The validation DataFrame.

    Raises
    ------
    ValueError
        If the reference year 2000 is missing from the data.
    """
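    # 1. Filter to reference year 2000 (coerce first so 2000, 2000.0, and "2000" all match)
    ref_year = pd.to_numeric(df['covariate_reference_year'], errors='coerce')
    df_2000 = df[ref_year == 2000]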

    if df_2000.empty:
        error_msg = "No data found for covariate_reference_year == 2000. Cannot reproduce Table 3."
        logger.error(error_msg)
        raise ValueError(error_msg)

    n_samples = len(df_2000)
    logger.info(f"Found {n_samples} observations for Table 3 (Year 2000).")

    # 2. Check missingness for required covariates
    covariates = [
        "literacy_rate", "gdp_per_capita", "urban_population_share",
        "distance_to_capital_km", "has_radio_fm"
    ]
    # Add population variable (prefer log, fallback to total)
    if "population_log" in df.columns:
        covariates.append("population_log")
    elif "population_total" in df.columns:
        covariates.append("population_total")

    missing_stats = {}
    for col in covariates:
        n_missing = df_2000[col].isnull().sum()
        pct_missing = (n_missing / n_samples) * 100
        missing_stats[col] = pct_missing

        if pct_missing > 10:
            logger.warning(f"High missingness for Table 3 covariate '{col}': {pct_missing:.2f}%")

    logger.info(f"Table 3 Covariate Missingness (Year 2000): {missing_stats}")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_validation_inputs(df_validation_raw: pd.DataFrame, df_raw_corpus: pd.DataFrame) -> None:
    """
    Orchestrator function for Task 2: Validates the schema, types, and join feasibility
    of the validation DataFrame.

    Executes validation steps in sequence:
    1. Schema Validation: Checks for required columns and types.
    2. Join Feasibility: Verifies overlap with corpus and checks for join risks.
    3. Covariate Completeness: Checks data availability for Table 3 (Year 2000).

    Parameters
    ----------
    df_validation_raw : pd.DataFrame
        The raw validation DataFrame.
    df_raw_corpus : pd.DataFrame
        The raw corpus DataFrame (for join checking).

    Raises
    ------
    ValueError
        If critical validation steps fail.
    """
    logger.info("Starting Task 2: Validate schema, types, and join feasibility of df_validation_raw.")

    try:
        # Step 1: Schema and Types
        validate_validation_schema(df_validation_raw)

        # Step 2: Join Feasibility
        validate_join_feasibility(df_validation_raw, df_raw_corpus)

        # Step 3: Covariate Completeness
        validate_covariate_completeness(df_validation_raw)

        logger.info("Task 2 completed successfully: df_validation_raw is valid.")

    except ValueError as e:
        logger.critical(f"Task 2 Failed: {e}")
        raise
    except Exception as e:
        logger.critical(f"Task 2 Failed with unexpected error: {e}")
        raise
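
The join-feasibility logic in Task 2 comes down to three questions: do the canonical keys overlap, at what level is the validation table unique, and does the resulting join inflate rows? The sketch below works through them on invented data. `canon_id` and `join_key_level` are hypothetical helpers, not functions from the pipeline, and the use of `validate="many_to_one"` in the merge is an assumption about how the later join task could guard itself.

```python
import pandas as pd


def canon_id(ids: pd.Series) -> pd.Series:
    """Hypothetical helper: 7-digit zero-padded string keys; unparseable IDs stay NaN."""
    nums = pd.to_numeric(ids, errors="coerce")
    return nums.map(lambda v: f"{int(v):07d}", na_action="ignore")


def join_key_level(df: pd.DataFrame, muni_col: str = "muni_key", year_col: str = "year") -> str:
    """Hypothetical helper: report the coarsest key at which the table is unique."""
    if df[muni_col].is_unique:
        return "municipality"
    if not df.duplicated([muni_col, year_col]).any():
        return "municipality_year"
    return "ambiguous"


# Invented rows with deliberately mixed ID representations (int, str, float).
corpus = pd.DataFrame({
    "municipality_id": [1100015, 1100015, "1100023", 3550308.0],
    "year": [2005, 2007, 2006, 2004],
})
validation = pd.DataFrame({
    "municipality_id": ["1100015", "1100015", "3550308", "9999999"],
    "year": [2005, 2007, 2004, 2004],
    "ff_corruption_count": [2.0, None, 1.0, 0.0],
})

corpus["muni_key"] = canon_id(corpus["municipality_id"])
validation["muni_key"] = canon_id(validation["municipality_id"])

# Overlap of canonical keys between the two tables.
common = set(corpus["muni_key"].dropna()) & set(validation["muni_key"].dropna())
level = join_key_level(validation)

# validate="many_to_one" makes pandas raise if the right-hand keys are not unique,
# which is the row-inflation risk the feasibility check warns about.
merged = corpus[["muni_key", "year"]].merge(
    validation[["muni_key", "year", "ff_corruption_count"]],
    on=["muni_key", "year"],
    how="left",
    validate="many_to_one",
)

print(sorted(common), level, len(merged))
```

Because the toy validation table repeats a municipality across years, the composite key is the only safe join key here, and the left join keeps the corpus row count unchanged.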


In [None]:
# Task 3 — Validate language, raw_lexicon_list, and study_configuration

# ==============================================================================
# Task 3: Validate language, raw_lexicon_list, and study_configuration
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 1: Validate language parameter
# -------------------------------------------------------------------------------------------------------------------------------

def validate_language_parameter(language: str) -> str:
    """
    Validates the language parameter against allowed values and enforces replication fidelity.

    Ensures the language is either "English" or "Portuguese". For strict replication of the
    Brazilian audit paper, "Portuguese" is expected. Emits warnings if "English" is selected,
    as this implies translation artifacts not present in the original study.

    Parameters
    ----------
    language : str
        The language of the corpus and lexicon.

    Returns
    -------
    str
        The validated, title-cased language string.

    Raises
    ------
    ValueError
        If the language is not in the allowed set.
    """
    allowed_languages = {"English", "Portuguese"}

    # Normalize input
    lang_normalized = language.strip().title()

    if lang_normalized not in allowed_languages:
        error_msg = f"Invalid language '{language}'. Allowed values: {allowed_languages}"
        logger.error(error_msg)
        raise ValueError(error_msg)

    # Fidelity check
    if lang_normalized == "English":
        logger.warning(
            "Language set to 'English'. The original study uses Portuguese text and dictionaries. "
            "Results will not match the paper unless the corpus has been perfectly translated."
        )
    else:
        logger.info("Language set to 'Portuguese', consistent with the original study.")

    return lang_normalized


# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 2: Validate lexicon structure and content
# -------------------------------------------------------------------------------------------------------------------------------

def validate_lexicon_content(raw_lexicon_list: List[str], dictionary_config: Dict[str, Any]) -> None:
    """
    Validates the structure and semantic content of the raw lexicon list.

    Ensures the lexicon is a non-empty list of strings and warns if it contains no known
    severe irregularity term from the study (Appendix A2). Logs a reminder to verify
    consistency with the external dictionary resource when one is configured.

    Parameters
    ----------
    raw_lexicon_list : List[str]
        The list of raw n-grams/tokens for the dictionary.
    dictionary_config : Dict[str, Any]
        The dictionary configuration section from study parameters.

    Raises
    ------
    ValueError
        If the lexicon is not a list, is empty, or contains non-string entries.
    """
    # 1. Structure check
    if not isinstance(raw_lexicon_list, list):
        raise ValueError("raw_lexicon_list must be a list.")

    if not raw_lexicon_list:
        raise ValueError("raw_lexicon_list is empty.")

    # Check for non-string entries
    non_strings = [x for x in raw_lexicon_list if not isinstance(x, str)]
    if non_strings:
        raise ValueError(f"raw_lexicon_list contains non-string entries: {non_strings[:5]}...")

    # 2. Content check (Sanity check for known terms)
    # Terms from Appendix A2 (stemmed or raw)
    known_terms = {
        "empresa fantasma", "empresa inexistente", "fraud", "conluio", "fals",
        "simulacao licitat", "montagem licitat"
    }

    # Check whether any lexicon entry contains a known term as a substring.
    # The match is deliberately loose because the input list may be stemmed or unstemmed.
    # Note: known_terms are unaccented, so accented entries (e.g., "simulação") will not match.
    found_known_term = any(
        known in term.lower()
        for term in raw_lexicon_list
        for known in known_terms
    )

    if not found_known_term:
        logger.warning(
            "raw_lexicon_list does not appear to contain known severe terms from Appendix A2 "
            f"(e.g., {known_terms}). Ensure the dictionary is correct."
        )
    else:
        logger.info("raw_lexicon_list contains expected semantic terms.")

    # 3. External resource check
    dict_url = dictionary_config.get("dictionary_url")
    if dict_url:
        logger.info(f"Using lexicon list provided in memory. Verify consistency with external resource: {dict_url}")

    logger.info(f"Lexicon validation passed. Size: {len(raw_lexicon_list)} entries.")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 3: Validate study_configuration coherence
# -------------------------------------------------------------------------------------------------------------------------------

def validate_study_config_coherence(config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Validates the internal coherence of the study configuration dictionary against
    the research design specifications.

    Enforces critical parameters for replication:
    - Parsing: lottery cutoff = 8.
    - PCA: eigenvalue threshold = 1.0, exact 5 input features.
    - Econometrics: state fixed effects, robust SEs.
    - Dictionary: match mode resolution.

    Parameters
    ----------
    config : Dict[str, Any]
        The full study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        A validated 'run manifest' dictionary containing the active configuration.

    Raises
    ------
    ValueError
        If any critical parameter violates the research design.
    """
    pipeline_params = config.get("pipeline_parameters", {})

    # 1. Parsing Config
    parsing = pipeline_params.get("parsing_config", {})
    if parsing.get("lottery_cutoff") != 8:
        raise ValueError(f"Invalid lottery_cutoff: {parsing.get('lottery_cutoff')}. Must be 8 per Section 2.")

    # Check markers
    start_marker = parsing.get("regex_start_marker")
    if not start_marker or "Constatação" not in start_marker:
        logger.warning(f"regex_start_marker '{start_marker}' may not match Appendix A1.3 requirements.")

    # 2. PCA Config
    pca = pipeline_params.get("pca_config", {})
    if pca.get("eigenvalue_threshold") != 1.0:
        raise ValueError("PCA eigenvalue_threshold must be 1.0 (Kaiser criterion).")

    required_pca_features = [
        "image_count", "severe_irregularities_count", "page_count",
        "report_lines_count", "total_irregularities_count"
    ]
    configured_features = pca.get("pca_input_features", [])
    if configured_features != required_pca_features:
        raise ValueError(
            f"PCA input features mismatch.\nExpected: {required_pca_features}\nFound: {configured_features}"
        )

    # 3. Econometrics Config
    metrics = pipeline_params.get("econometrics_config", {})
    if metrics.get("validation_fixed_effects") != "state":
        raise ValueError("validation_fixed_effects must be 'state' per Equation (1).")

    if not metrics.get("robust_se_type"):
        # Enforce explicit setting
        raise ValueError("robust_se_type must be explicitly set (e.g., 'HC1').")

    # 4. Dictionary Config
    dict_conf = pipeline_params.get("dictionary_config", {})
    match_mode = dict_conf.get("match_mode")
    if match_mode == "regex_or_ngram_search":
        # Resolve ambiguity: Default to n-gram search for fidelity unless regex is strictly needed
        logger.info("Resolving ambiguous match_mode 'regex_or_ngram_search' to 'ngram_search' for primary pipeline.")
        dict_conf["resolved_match_mode"] = "ngram_search"
    elif match_mode in ("ngram_search", "regex_search"):
        # Record explicit modes too, so resolved_match_mode is always available downstream
        dict_conf["resolved_match_mode"] = match_mode
    else:
        raise ValueError(f"Unknown match_mode: {match_mode}")

    # 5. NLP Config
    nlp = pipeline_params.get("nlp_config", {})
    if nlp.get("stemmer_algorithm") != "nltk.stem.PorterStemmer":
        logger.warning("Stemmer is not PorterStemmer. This deviates from Appendix A4.")

    logger.info("Study configuration coherence check passed.")

    # Return the validated config as the run manifest
    return config


# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_config_inputs(
    language: str,
    raw_lexicon_list: List[str],
    study_configuration: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrator function for Task 3: Validates the language, lexicon, and study configuration.

    Executes validation steps in sequence:
    1. Language Validation: Checks against allowed set.
    2. Lexicon Validation: Checks structure and semantic content.
    3. Configuration Coherence: Enforces research design parameters.

    Parameters
    ----------
    language : str
        The language of the study.
    raw_lexicon_list : List[str]
        The list of raw dictionary terms.
    study_configuration : Dict[str, Any]
        The full study configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        The validated study configuration (run manifest).

    Raises
    ------
    ValueError
        If any validation step fails.
    """
    logger.info("Starting Task 3: Validate language, raw_lexicon_list, and study_configuration.")

    try:
        # Step 1: Language
        validated_lang = validate_language_parameter(language)

        # Step 2: Lexicon
        dict_config = study_configuration.get("pipeline_parameters", {}).get("dictionary_config", {})
        validate_lexicon_content(raw_lexicon_list, dict_config)

        # Step 3: Config Coherence
        run_manifest = validate_study_config_coherence(study_configuration)

        # Inject validated language into manifest for downstream reference
        run_manifest["validated_language"] = validated_lang

        logger.info("Task 3 completed successfully: Configuration is valid.")
        return run_manifest

    except ValueError as e:
        logger.critical(f"Task 3 Failed: {e}")
        raise
    except Exception as e:
        logger.critical(f"Task 3 Failed with unexpected error: {e}")
        raise
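
For orientation, a `study_configuration` that satisfies the hard checks in `validate_study_config_coherence` could look like the sketch below. The key names and required values are taken from the validator above; the `regex_start_marker` value is only a placeholder, and `failed_checks` is a hypothetical standalone mirror of the checks, included so the example runs on its own.

```python
import copy
from typing import Any, Dict, List

REQUIRED_PCA_FEATURES: List[str] = [
    "image_count", "severe_irregularities_count", "page_count",
    "report_lines_count", "total_irregularities_count",
]

study_configuration: Dict[str, Any] = {
    "pipeline_parameters": {
        "parsing_config": {
            "lottery_cutoff": 8,
            "regex_start_marker": "Constatação",  # placeholder, not the paper's actual regex
        },
        "pca_config": {
            "eigenvalue_threshold": 1.0,
            "pca_input_features": list(REQUIRED_PCA_FEATURES),
        },
        "econometrics_config": {
            "validation_fixed_effects": "state",
            "robust_se_type": "HC1",
        },
        "dictionary_config": {"match_mode": "ngram_search"},
        "nlp_config": {"stemmer_algorithm": "nltk.stem.PorterStemmer"},
    }
}


def failed_checks(config: Dict[str, Any]) -> List[str]:
    """Hypothetical mirror of the validator's hard checks; returns the names that fail."""
    params = config.get("pipeline_parameters", {})
    pca = params.get("pca_config", {})
    metrics = params.get("econometrics_config", {})
    checks = {
        "lottery_cutoff": params.get("parsing_config", {}).get("lottery_cutoff") == 8,
        "eigenvalue_threshold": pca.get("eigenvalue_threshold") == 1.0,
        # List equality is order-sensitive, like the `!=` comparison in the validator.
        "pca_input_features": pca.get("pca_input_features", []) == REQUIRED_PCA_FEATURES,
        "validation_fixed_effects": metrics.get("validation_fixed_effects") == "state",
        "robust_se_type": bool(metrics.get("robust_se_type")),
        "match_mode": params.get("dictionary_config", {}).get("match_mode")
        in {"regex_or_ngram_search", "ngram_search", "regex_search"},
    }
    return [name for name, ok in checks.items() if not ok]


# Same five features in a different order: rejected, because lists compare positionally.
reordered = copy.deepcopy(study_configuration)
reordered["pipeline_parameters"]["pca_config"]["pca_input_features"].reverse()

print(failed_checks(study_configuration))
print(failed_checks(reordered))
```

One practical consequence is that the PCA feature list must match the expected order exactly; a configuration holding the same five names in a different order fails validation.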


In [None]:
# Task 4 — Cleanse identifiers and categorical fields in both DataFrames

# ==============================================================================
# Task 4: Cleanse identifiers and categorical fields in both DataFrames
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 1: Normalize state codes and identifiers
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_identifiers(df: pd.DataFrame, id_col: str = 'municipality_id', state_col: str = 'state') -> pd.DataFrame:
    """
    Canonicalizes municipality identifiers and state codes.

    Creates 'municipality_id_canon' (7-digit zero-padded string) and 'state_canon'
    (2-char uppercase string). Preserves original columns.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame (corpus or validation).
    id_col : str
        Name of the municipality ID column.
    state_col : str
        Name of the state column.

    Returns
    -------
    pd.DataFrame
        DataFrame with added canonical columns.
    """
    df_out = df.copy()

    # 1. Canonicalize Municipality ID
    # Handle float representations (e.g. 1234567.0) and numeric strings:
    # coerce to numeric, cast to int (truncating any fractional part), then zero-pad.

    # Helper to safely convert to canonical string
    def to_canon_id(series):
        # Force numeric, coerce errors to NaN
        nums = pd.to_numeric(series, errors='coerce')
        # Temporarily fill NaNs with -1 so the integer cast succeeds;
        # these rows are masked back to null below
        filled = nums.fillna(-1).astype(int).astype(str)
        # Pad with zeros to 7 digits
        padded = filled.str.zfill(7)
        # Restore NaNs
        return padded.where(nums.notna(), other=None)

    df_out[f'{id_col}_canon'] = to_canon_id(df_out[id_col])

    # 2. Canonicalize State
    # Strip whitespace, uppercase
    if state_col in df_out.columns:
        df_out[f'{state_col}_canon'] = df_out[state_col].astype(str).str.strip().str.upper()
        # Handle "NAN" or "NONE" strings resulting from null conversion
        mask_null = df_out[state_col].isnull() | df_out[f'{state_col}_canon'].isin(['NAN', 'NONE', ''])
        df_out.loc[mask_null, f'{state_col}_canon'] = None

        # Validate length (2 chars)
        mask_invalid_len = df_out[f'{state_col}_canon'].str.len() != 2
        df_out.loc[mask_invalid_len, f'{state_col}_canon'] = None

    return df_out


# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 2: Normalize categorical fields
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_categoricals(df_corpus: pd.DataFrame, df_validation: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Normalizes categorical fields to controlled vocabularies.

    - text_extraction_mode -> {'native_text', 'ocr'}
    - has_summary_detected -> boolean
    - has_radio_fm -> {0, 1}

    Parameters
    ----------
    df_corpus : pd.DataFrame
        Raw corpus DataFrame.
    df_validation : pd.DataFrame
        Raw validation DataFrame.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame]
        Normalized corpus and validation DataFrames.
    """
    df_c = df_corpus.copy()
    df_v = df_validation.copy()

    # 1. Normalize text_extraction_mode
    # Map various inputs to controlled vocabulary
    mode_map = {
        'native_text': 'native_text', 'native': 'native_text', 'text': 'native_text',
        'ocr': 'ocr', 'scanned': 'ocr', 'image': 'ocr'
    }

    # Normalize input string first
    raw_modes = df_c['text_extraction_mode'].astype(str).str.lower().str.strip()
    df_c['text_extraction_mode'] = raw_modes.map(mode_map)

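    # Log unmapped values (this also counts rows whose mode was originally null)
    unmapped_mask = df_c['text_extraction_mode'].isnull()
    if unmapped_mask.any():
        bad_values = raw_modes[unmapped_mask].unique().tolist()[:5]
        logger.warning(
            f"Found {int(unmapped_mask.sum())} rows with invalid text_extraction_mode "
            f"(e.g., {bad_values}). Setting to None."
        )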

    # 2. Normalize has_summary_detected
    # Coerce to boolean
    # Handle string 'true'/'false', 1/0
    def to_bool(val):
        if pd.isna(val): return None
        if isinstance(val, bool): return val
        s = str(val).lower().strip()
        if s in ['true', '1', '1.0', 'yes']: return True
        if s in ['false', '0', '0.0', 'no']: return False
        return None

    df_c['has_summary_detected'] = df_c['has_summary_detected'].apply(to_bool)

    # 3. Normalize has_radio_fm
    # Coerce to 0/1 integer
    def to_int_bool(val):
        if pd.isna(val): return None
        if isinstance(val, (bool, np.bool_)): return int(val)
        try:
            f = float(val)
            return int(f) if f in [0.0, 1.0] else None
        except (ValueError, TypeError):
            s = str(val).lower().strip()
            if s in ['true', 'yes']: return 1
            if s in ['false', 'no']: return 0
            return None

    df_v['has_radio_fm'] = df_v['has_radio_fm'].apply(to_int_bool)

    return df_c, df_v


# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 3: Handle missing or malformed identifiers
# -------------------------------------------------------------------------------------------------------------------------------

def quarantine_invalid_rows(df: pd.DataFrame, id_cols: list, context: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Splits DataFrame into clean and quarantined subsets based on identifier validity.

    Rows with null values in any of the specified `id_cols` are moved to the quarantine DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame.
    id_cols : list
        List of column names that must be non-null (e.g., ['report_id', 'municipality_id_canon']).
    context : str
        Label for logging (e.g., "Corpus", "Validation").

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame]
        (df_clean, df_quarantined)
    """
    # Check for nulls in required columns
    mask_valid = df[id_cols].notna().all(axis=1)

    df_clean = df[mask_valid].copy()
    df_quarantined = df[~mask_valid].copy()

    if not df_quarantined.empty:
        count = len(df_quarantined)
        logger.warning(f"[{context}] Quarantined {count} rows due to missing/invalid identifiers in {id_cols}.")
        # Log sample of issues
        sample = df_quarantined[id_cols].head()
        logger.info(f"[{context}] Quarantine sample:\n{sample}")

        # Add reason column
        df_quarantined['quarantine_reason'] = 'Missing identifiers: ' + str(id_cols)

    return df_clean, df_quarantined


# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def cleanse_identifiers(df_raw_corpus: pd.DataFrame, df_validation_raw: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Orchestrator function for Task 4: Cleanse identifiers and categorical fields.

    Executes:
    1. Identifier Normalization (Canonicalization).
    2. Categorical Normalization.
    3. Quarantine of rows with invalid identifiers.

    Parameters
    ----------
    df_raw_corpus : pd.DataFrame
        Raw corpus DataFrame.
    df_validation_raw : pd.DataFrame
        Raw validation DataFrame.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]
        (df_corpus_clean, df_validation_clean, df_corpus_quarantined, df_validation_quarantined)
    """
    logger.info("Starting Task 4: Cleanse identifiers and categorical fields.")

    try:
        # Step 1: Normalize Identifiers
        df_c_norm = normalize_identifiers(df_raw_corpus, id_col='municipality_id', state_col='state')
        df_v_norm = normalize_identifiers(df_validation_raw, id_col='municipality_id', state_col='state')

        # Step 2: Normalize Categoricals
        df_c_cat, df_v_cat = normalize_categoricals(df_c_norm, df_v_norm)

        # Step 3: Quarantine
        # Corpus requires report_id, municipality_id_canon, state_canon
        # Note: state_canon is required for FE later.
        corpus_req_cols = ['report_id', 'municipality_id_canon', 'state_canon']
        df_c_clean, df_c_quar = quarantine_invalid_rows(df_c_cat, corpus_req_cols, "Corpus")

        # Validation requires municipality_id_canon, state_canon
        val_req_cols = ['municipality_id_canon', 'state_canon']
        df_v_clean, df_v_quar = quarantine_invalid_rows(df_v_cat, val_req_cols, "Validation")

        logger.info(f"Task 4 completed. Retained {len(df_c_clean)} corpus rows, {len(df_v_clean)} validation rows.")

        return df_c_clean, df_v_clean, df_c_quar, df_v_quar

    except Exception as e:
        logger.critical(f"Task 4 Failed with unexpected error: {str(e)}")
        raise e


# ==============================================================================
# Task 5: Cleanse text fields without destroying evidence
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 1: Enforce UTF-8 validity and normalize line endings
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_text_encoding(df: pd.DataFrame) -> pd.DataFrame:
    """
    Normalizes text encoding and line endings while preserving original content.

    - Creates '_original' copies of text columns.
    - Enforces UTF-8 validity (replacing errors with U+FFFD).
    - Normalizes line endings (CRLF and lone CR become LF).

    Parameters
    ----------
    df : pd.DataFrame
        Input corpus DataFrame.

    Returns
    -------
    pd.DataFrame
        DataFrame with normalized text columns and preserved originals.
    """
    df_out = df.copy()
    text_cols = ['report_full_text', 'report_summary_text']

    for col in text_cols:
        if col not in df_out.columns:
            continue

        # 1. Preserve Evidence
        df_out[f'{col}_original'] = df_out[col]

        # 2. Enforce UTF-8 and Normalize Line Endings
        def to_valid_text(val):
            # Preserve nulls as None so downstream null checks still work
            if pd.isna(val):
                return None
            # Raw bytes: decode as UTF-8, replacing invalid sequences with U+FFFD
            if isinstance(val, (bytes, bytearray)):
                return bytes(val).decode('utf-8', errors='replace')
            # Python 3 str objects are already valid Unicode
            return str(val)

        s = df_out[col].apply(to_valid_text).astype(object)

        # Normalize line endings: CRLF first, then any remaining lone CR.
        # Literal (non-regex) replacement is requested explicitly, since the
        # default of `regex` in Series.str.replace changed across pandas versions.
        s = s.str.replace('\r\n', '\n', regex=False).str.replace('\r', '\n', regex=False)

        df_out[col] = s

    return df_out


# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 2: Handle null and empty text fields
# -------------------------------------------------------------------------------------------------------------------------------

def validate_text_content(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Validates text content and quarantines rows with critical text failures.

    - Sets 'text_extraction_valid' flag.
    - Quarantines rows where 'report_full_text' is empty/null.
    - Retains rows with missing summaries (valid for early lotteries).

    Parameters
    ----------
    df : pd.DataFrame
        Normalized corpus DataFrame.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame]
        (df_clean, df_quarantined)
    """
    df_out = df.copy()

    # 1. Check Full Text Validity
    # Valid if not null AND not whitespace-only
    # Handle potential non-string types gracefully
    # Nulls can surface as the strings 'nan', 'None' or '<NA>' after string coercion
    full_text = df_out['report_full_text'].astype(str).str.strip()
    is_valid = ~full_text.isin(['', 'nan', 'None', '<NA>'])

    df_out['text_extraction_valid'] = is_valid

    # 2. Quarantine Invalid Rows
    # Critical failure: No full text
    mask_critical_failure = ~is_valid

    df_clean = df_out[~mask_critical_failure].copy()
    df_quarantined = df_out[mask_critical_failure].copy()

    if not df_quarantined.empty:
        count = len(df_quarantined)
        logger.warning(f"Quarantined {count} rows due to empty/missing 'report_full_text'.")
        df_quarantined['quarantine_reason'] = 'Missing report_full_text'

    return df_clean, df_quarantined


# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 3: Log data quality metrics
# -------------------------------------------------------------------------------------------------------------------------------

def compute_text_quality_metrics(df_clean: pd.DataFrame, df_quarantined: pd.DataFrame) -> Dict[str, Any]:
    """
    Computes data quality metrics for text fields.

    Metrics:
    - Count of empty full text (from quarantine).
    - Count of missing summaries for lottery >= 8.
    - Distribution of text_extraction_mode.

    Parameters
    ----------
    df_clean : pd.DataFrame
        Cleansed corpus DataFrame.
    df_quarantined : pd.DataFrame
        Quarantined corpus DataFrame.

    Returns
    -------
    Dict[str, Any]
        Dictionary of QA metrics.
    """
    metrics = {}

    # 1. Empty Full Text Count
    metrics['count_empty_full_text'] = len(df_quarantined)

    # 2. Missing Summaries (Lottery >= 8)
    # Check in clean data
    if not df_clean.empty:
        mask_late_lottery = df_clean['lottery'] >= 8
        # Check for null or empty summary
        summary_text = df_clean['report_summary_text'].astype(str).replace('nan', '').replace('None', '')
        mask_missing_summary = summary_text.str.strip().str.len() == 0

        count_missing_summary_late = len(df_clean[mask_late_lottery & mask_missing_summary])
        metrics['count_missing_summary_lottery_ge_8'] = count_missing_summary_late

        # 3. Extraction Mode Distribution
        mode_dist = df_clean['text_extraction_mode'].value_counts().to_dict()
        metrics['text_extraction_mode_distribution'] = mode_dist
    else:
        metrics['count_missing_summary_lottery_ge_8'] = 0
        metrics['text_extraction_mode_distribution'] = {}

    logger.info(f"Text Quality Metrics: {metrics}")
    return metrics


# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def cleanse_text_fields(df_raw_corpus: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrator function for Task 5: Cleanse text fields.

    Executes:
    1. Text Normalization (Encoding, Line Endings).
    2. Content Validation & Quarantine.
    3. Metric Computation.

    Parameters
    ----------
    df_raw_corpus : pd.DataFrame
        Input corpus DataFrame (from Task 4).

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame, Dict[str, Any]]
        (df_clean, df_quarantined, metrics_dict)
    """
    logger.info("Starting Task 5: Cleanse text fields.")

    try:
        # Step 1: Normalize
        df_norm = normalize_text_encoding(df_raw_corpus)

        # Step 2: Validate & Quarantine
        df_clean, df_quar = validate_text_content(df_norm)

        # Step 3: Metrics
        metrics = compute_text_quality_metrics(df_clean, df_quar)

        logger.info(f"Task 5 completed. Retained {len(df_clean)} rows.")
        return df_clean, df_quar, metrics

    except Exception as e:
        logger.critical(f"Task 5 Failed with unexpected error: {str(e)}")
        raise e


# ==============================================================================
# Task 6: Cleanse numeric metadata fields
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 1: Validate and coerce numeric types
# -------------------------------------------------------------------------------------------------------------------------------

def validate_numeric_types(df: pd.DataFrame) -> pd.DataFrame:
    """
    Validates and coerces numeric metadata columns to integers.

    Enforces:
    - Integer-likeness (no fractional parts).
    - Non-negativity.
    - Non-nullness.

    Parameters
    ----------
    df : pd.DataFrame
        Input corpus DataFrame.

    Returns
    -------
    pd.DataFrame
        DataFrame with validated integer columns.

    Raises
    ------
    ValueError
        If validation fails (non-integer, negative, or null values).
    """
    df_out = df.copy()
    numeric_cols = ['page_count', 'image_count', 'report_lines_count']

    for col in numeric_cols:
        # 1. Check Nulls
        if df_out[col].isnull().any():
            raise ValueError(f"Column '{col}' contains null values.")

        # 2. Check Integer-likeness
        # Coerce to numeric, raising error on non-numeric strings
        series = pd.to_numeric(df_out[col], errors='raise')

        # Check for fractional parts
        if not np.all(np.mod(series, 1) == 0):
            raise ValueError(f"Column '{col}' contains non-integer values (fractional parts).")

        # 3. Check Non-negativity
        if (series < 0).any():
            raise ValueError(f"Column '{col}' contains negative values.")

        # 4. Cast to Integer
        df_out[col] = series.astype(int)

    return df_out


# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 2: Validate definition consistency
# -------------------------------------------------------------------------------------------------------------------------------

def validate_definition_consistency(df: pd.DataFrame) -> Dict[str, str]:
    """
    Validates that counting definitions are constant across the dataset.

    Parameters
    ----------
    df : pd.DataFrame
        Input corpus DataFrame.

    Returns
    -------
    Dict[str, str]
        Dictionary of accepted definitions.

    Raises
    ------
    ValueError
        If definitions vary across rows.
    """
    definitions = {}
    def_cols = ['image_count_definition', 'lines_count_definition']

    for col in def_cols:
        # Normalize strings (strip whitespace)
        normalized = df[col].astype(str).str.strip()

        # Check uniqueness
        unique_defs = normalized.unique()

        if len(unique_defs) == 0:
            raise ValueError(f"Column '{col}' is empty.")

        if len(unique_defs) > 1:
            raise ValueError(f"Column '{col}' has multiple definitions: {unique_defs}")

        definitions[col] = unique_defs[0]
        logger.info(f"Accepted definition for {col}: {unique_defs[0]}")

    return definitions


# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 3: Handle outliers and implausible values
# -------------------------------------------------------------------------------------------------------------------------------

def flag_outliers(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Flags outliers and computes summary statistics for numeric fields.

    Flags:
    - page_count > 1000
    - report_lines_count == 0 (if full text exists)

    Parameters
    ----------
    df : pd.DataFrame
        Validated corpus DataFrame.

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, Any]]
        (DataFrame with flag columns, Statistics dictionary)
    """
    df_out = df.copy()
    stats = {}

    # 1. Compute Summary Statistics
    numeric_cols = ['page_count', 'image_count', 'report_lines_count']
    for col in numeric_cols:
        desc = df_out[col].describe()
        stats[col] = {
            'min': int(desc['min']),
            'median': int(desc['50%']),
            'max': int(desc['max']),
            'mean': float(desc['mean'])
        }

    # 2. Flag Page Count Outliers
    df_out['flag_page_count_outlier'] = df_out['page_count'] > 1000
    if df_out['flag_page_count_outlier'].any():
        count = df_out['flag_page_count_outlier'].sum()
        logger.warning(f"Flagged {count} rows with page_count > 1000.")

    # 3. Flag Zero Lines with Text
    # Condition: lines == 0 AND text_extraction_valid == True
    # Note: text_extraction_valid was created in Task 5
    if 'text_extraction_valid' in df_out.columns:
        mask_zero_lines = (df_out['report_lines_count'] == 0) & (df_out['text_extraction_valid'])
        df_out['flag_zero_lines_with_text'] = mask_zero_lines

        if mask_zero_lines.any():
            count = mask_zero_lines.sum()
            logger.warning(f"Flagged {count} rows with 0 lines but valid text.")
    else:
        logger.warning("Skipping zero-line check: 'text_extraction_valid' column missing.")

    return df_out, stats


# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def cleanse_numeric_fields(df_raw_corpus: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrator function for Task 6: Cleanse numeric metadata fields.

    Executes:
    1. Type Validation & Coercion.
    2. Definition Consistency Check.
    3. Outlier Flagging & Statistics.

    Parameters
    ----------
    df_raw_corpus : pd.DataFrame
        Input corpus DataFrame (from Task 5).

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, Any]]
        (df_clean, numeric_stats_dict)
    """
    logger.info("Starting Task 6: Cleanse numeric metadata fields.")

    try:
        # Step 1: Validate Types
        df_typed = validate_numeric_types(df_raw_corpus)

        # Step 2: Validate Definitions
        definitions = validate_definition_consistency(df_typed)

        # Step 3: Outliers & Stats
        df_clean, stats = flag_outliers(df_typed)

        # Merge definitions into stats for the manifest
        stats['definitions'] = definitions

        logger.info("Task 6 completed successfully.")
        return df_clean, stats

    except ValueError as e:
        logger.critical(f"Task 6 Failed: {str(e)}")
        raise e
    except Exception as e:
        logger.critical(f"Task 6 Failed with unexpected error: {str(e)}")
        raise e


# ==============================================================================
# Task 7: Extract irregularity text units from each report (parsing algorithm)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 2: Define sub-procedures for extraction
# -------------------------------------------------------------------------------------------------------------------------------

def parse_enumerated_summary(text: str) -> List[str]:
    """
    Parses a summary section into individual irregularity descriptions based on enumeration.

    Splits text by patterns like "1. ", "1.1 ", "2.3.1 ".
    Trims whitespace and filters empty strings.

    Parameters
    ----------
    text : str
        The summary text.

    Returns
    -------
    List[str]
        List of extracted irregularity descriptions.
    """
    if not text:
        return []

    # Split on enumeration markers: start of text or a newline, optional whitespace,
    # digits with optional dotted sub-levels (1, 1.1, 2.3.1), an optional '.' or ')',
    # then at least one whitespace character.
    pattern = r'(?:^|\n)\s*\d+(?:\.\d+)*[.)]?\s+'

    # Split
    parts = re.split(pattern, text)

    # Filter empty and strip
    irregularities = [p.strip() for p in parts if p.strip()]

    return irregularities
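

# Illustrative sketch (not part of the pipeline): a self-contained look at how the
# enumeration pattern above behaves. The sample summary text is hypothetical.
# Caveat: because the trailing '.' or ')' is optional, a line that merely begins with a
# number followed by whitespace (e.g. a year) is also treated as an enumeration marker.
def _demo_enumeration_split() -> list:
    import re

    sample = (
        "1. Ausencia de licitacao.\n"
        "1.1 Notas fiscais inidoneas.\n"
        "2) Obra nao concluida."
    )
    pattern = r'(?:^|\n)\s*\d+(?:\.\d+)*[.)]?\s+'
    # The text before the first marker is empty and is dropped by the filter
    return [p.strip() for p in re.split(pattern, sample) if p.strip()]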


def regex_extract_between_markers(
    text: str,
    start_marker: str,
    end_markers: List[str],
    flags: int = re.DOTALL | re.IGNORECASE
) -> List[str]:
    """
    Extracts text segments located between a start marker and any end marker.

    Implements: start_marker(.+?)(end_marker_1|end_marker_2|...)

    Parameters
    ----------
    text : str
        The full report text.
    start_marker : str
        The start delimiter (regex safe string).
    end_markers : List[str]
        List of end delimiters.
    flags : int
        Regex flags (default DOTALL | IGNORECASE).

    Returns
    -------
    List[str]
        List of extracted segments (captured groups).
    """
    if not text:
        return []

    # Markers are treated as literal phrases, so escape them before building the pattern.
    s_esc = re.escape(start_marker)
    e_esc = "|".join(re.escape(m) for m in end_markers)

    # Pattern: start marker + minimal (non-greedy) capture + any end marker.
    # Note: the end marker is consumed by the match, so it cannot also serve as the
    # start marker of the next segment.
    pattern = f"{s_esc}(.+?)(?:{e_esc})"

    matches = re.findall(pattern, text, flags=flags)

    return [m.strip() for m in matches if m.strip()]
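

# Illustrative sketch (not part of the pipeline): the non-greedy between-markers pattern
# applied to a made-up report fragment. The marker phrases and the sample text are
# hypothetical; the real markers come from the parsing configuration.
def _demo_between_markers() -> list:
    import re

    text = (
        "Constatacao: falta de recibos. Evidencia: processo 12. "
        "Constatacao: obra parada. Recomendacao: apurar."
    )
    start = re.escape("Constatacao:")
    ends = "|".join(re.escape(m) for m in ["Evidencia:", "Recomendacao:"])
    # (.+?) stops at the first end marker that follows each start marker
    matches = re.findall(f"{start}(.+?)(?:{ends})", text, flags=re.DOTALL | re.IGNORECASE)
    return [m.strip() for m in matches if m.strip()]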


def find_paragraphs_starting_with(text: str, markers: List[str]) -> List[str]:
    """
    Identifies paragraphs that begin with any of the specified markers.

    Parameters
    ----------
    text : str
        The full report text.
    markers : List[str]
        List of marker phrases.

    Returns
    -------
    List[str]
        List of matching paragraphs.
    """
    if not text:
        return []

    # Line endings were already normalized to \n in Task 5.
    # Split into paragraphs on blank lines (double newline, allowing stray whitespace)
    paragraphs = re.split(r'\n\s*\n', text)

    matching_paragraphs = []
    for p in paragraphs:
        p_clean = p.strip()
        if not p_clean:
            continue

        # Check if starts with any marker (case insensitive)
        for m in markers:
            if p_clean.lower().startswith(m.lower()):
                matching_paragraphs.append(p_clean)
                break

    return matching_paragraphs


def extract_first_sentence(text: str, terminators: List[str]) -> str:
    """
    Extracts the first sentence from a text block based on terminators.

    Parameters
    ----------
    text : str
        The paragraph text.
    terminators : List[str]
        List of sentence terminator characters (e.g., ['.', ';']).

    Returns
    -------
    str
        The first sentence (including the terminator).
    """
    if not text:
        return ""

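    # Find the earliest occurrence of any terminator
    min_idx = len(text)
    end_idx = None

    for t in terminators:
        if not t:
            # An empty terminator would match at index 0; skip it
            continue
        idx = text.find(t)
        if idx != -1 and idx < min_idx:
            min_idx = idx
            # Account for the terminator's length so multi-character terminators are kept whole
            end_idx = idx + len(t)

    if end_idx is not None:
        # Return up to and including the terminator
        return text[:end_idx].strip()

    # Return full text if no terminator found
    return text.strip()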


# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 1: Implement the two-regime extraction logic
# -------------------------------------------------------------------------------------------------------------------------------

def extract_irregularities_from_row(
    row: pd.Series,
    config: Dict[str, Any]
) -> Tuple[List[str], str]:
    """
    Applies the extraction logic to a single report row.

    Logic:
    1. If lottery >= cutoff AND summary detected AND summary text exists:
       -> Parse Summary.
    2. Else:
       -> Regex Extract.
       -> If Regex yields nothing:
          -> Fallback (Paragraph First Sentence).

    Parameters
    ----------
    row : pd.Series
        A row from df_raw_corpus.
    config : Dict[str, Any]
        Parsing configuration dictionary.

    Returns
    -------
    Tuple[List[str], str]
        (List of irregularity strings, Extraction Source label)
    """
    # Unpack config
    parsing_conf = config['pipeline_parameters']['parsing_config']
    cutoff = parsing_conf['lottery_cutoff']
    regex_start = parsing_conf['regex_start_marker']
    regex_ends = parsing_conf['regex_end_markers']
    para_starts = parsing_conf['paragraph_start_markers']
    terminators = parsing_conf['sentence_terminators']

    # Row values
    lottery = row['lottery']
    # Treat a missing flag (None/NaN) as False; bool(NaN) would otherwise evaluate to True
    has_summary_raw = row['has_summary_detected']
    has_summary = bool(has_summary_raw) if pd.notna(has_summary_raw) else False
    summary_text = str(row['report_summary_text']) if pd.notna(row['report_summary_text']) else ""
    full_text = str(row['report_full_text']) if pd.notna(row['report_full_text']) else ""

    # Regime 1: Summary Parsing
    # Check if summary text is effectively present (not just whitespace or a null placeholder)
    summary_valid = summary_text.strip().lower() not in ('', 'nan', 'none')

    if lottery >= cutoff and has_summary and summary_valid:
        irregularities = parse_enumerated_summary(summary_text)
        if irregularities:
            return irregularities, "summary_parsed"
        # Summary present but nothing parsed: report 'summary_parsed_empty'
        # instead of falling back to full-text extraction
        return [], "summary_parsed_empty"

    # Regime 2: Full Text Extraction
    irregularities = regex_extract_between_markers(full_text, regex_start, regex_ends)

    if irregularities:
        return irregularities, "regex_between_markers"

    # Fallback
    paragraphs = find_paragraphs_starting_with(full_text, para_starts)
    fallback_irregularities = []
    for p in paragraphs:
        sent = extract_first_sentence(p, terminators)
        if sent:
            fallback_irregularities.append(sent)

    if fallback_irregularities:
        return fallback_irregularities, "fallback_first_sentence"

    return [], "failed_no_extraction"


# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 3: Materialize the irregularity-level table
# -------------------------------------------------------------------------------------------------------------------------------

def materialize_irregularities(
    df_corpus: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Applies extraction to the entire corpus and materializes the irregularity-level DataFrame.

    Parameters
    ----------
    df_corpus : pd.DataFrame
        The cleansed corpus DataFrame.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        DataFrame with one row per extracted irregularity.
        Columns: report_id, municipality_id_canon, lottery, state_canon, year,
                 irregularity_index, irregularity_text_raw, extraction_source.
    """
    records = []

    for idx, row in df_corpus.iterrows():
        irregs, source = extract_irregularities_from_row(row, config)

        if not irregs:
            # Log warning for empty extraction
            logger.warning(f"Report {row['report_id']}: No irregularities extracted (Source: {source})")
            continue

        for i, text in enumerate(irregs):
            records.append({
                'report_id': row['report_id'],
                'municipality_id_canon': row['municipality_id_canon'],
                'lottery': row['lottery'],
                'state_canon': row['state_canon'],
                'year': row['year'],
                'irregularity_index': i + 1,
                'irregularity_text_raw': text,
                'extraction_source': source
            })

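    # Pass columns explicitly so an empty result still has the expected schema
    columns = [
        'report_id', 'municipality_id_canon', 'lottery', 'state_canon', 'year',
        'irregularity_index', 'irregularity_text_raw', 'extraction_source'
    ]
    df_irreg = pd.DataFrame(records, columns=columns)

    # Ensure types
    if not df_irreg.empty:
        df_irreg['irregularity_index'] = df_irreg['irregularity_index'].astype(int)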

    return df_irreg


# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def extract_irregularities(
    df_raw_corpus: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrator function for Task 7: Extract irregularity text units.

    Executes:
    1. Row-wise extraction using the two-regime algorithm.
    2. Materialization of the irregularity-level DataFrame.

    Parameters
    ----------
    df_raw_corpus : pd.DataFrame
        Cleansed corpus DataFrame.
    study_configuration : Dict[str, Any]
        Study configuration dictionary.

    Returns
    -------
    pd.DataFrame
        The irregularity-level DataFrame.
    """
    logger.info("Starting Task 7: Extract irregularity text units.")

    try:
        # Step 1 & 2 & 3 combined in materialization loop
        df_irregularities = materialize_irregularities(df_raw_corpus, study_configuration)

        # Validation
        n_reports = df_raw_corpus['report_id'].nunique()
        n_reports_with_irregs = df_irregularities['report_id'].nunique()
        n_irregs = len(df_irregularities)

        logger.info(f"Extraction complete. Found {n_irregs} irregularities across {n_reports_with_irregs} reports.")
        logger.info(f"Reports with zero extracted irregularities: {n_reports - n_reports_with_irregs}")

        if not df_irregularities.empty:
            source_dist = df_irregularities['extraction_source'].value_counts().to_dict()
            logger.info(f"Extraction Source Distribution: {source_dist}")

        return df_irregularities

    except Exception as e:
        logger.critical(f"Task 7 Failed with unexpected error: {str(e)}")
        raise e


# ==============================================================================
# Task 8: Normalize text and lexicon into shared matching space
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 1: Define the normalization function phi(.) exactly as configured
# -------------------------------------------------------------------------------------------------------------------------------

def get_normalization_function(config: Dict[str, Any]):
    """
    Factory that returns the normalization function phi(s) configured for the study.

    Pipeline:
    1. NFD Normalization (Unicode decomposition).
    2. Remove Accents (Strip combining marks).
    3. Lowercase.
    4. Remove Punctuation.
    5. Tokenize (Split by whitespace).
    6. Stem (PorterStemmer).

    Parameters
    ----------
    config : Dict[str, Any]
        Study configuration dictionary.

    Returns
    -------
    Callable[[str], List[str]]
        Function taking a string and returning a list of normalized stems.
    """
    nlp_config = config['pipeline_parameters']['nlp_config']

    # Initialize Stemmer
    # Note: Config pins 'nltk.stem.PorterStemmer'.
    # We instantiate it once here to avoid overhead.
    stemmer = PorterStemmer()

    # Compile regex for punctuation removal
    # Matches any character that is neither a word character nor whitespace.
    # Underscore is listed explicitly because \w treats it as a word character.
    punct_re = re.compile(r'[^\w\s]|_')

    def phi(text: str) -> List[str]:
        if not text:
            return []

        # 1. NFD Normalization
        # Decompose characters (e.g., 'ç' -> 'c' + U+0327 COMBINING CEDILLA)
        text_nfd = unicodedata.normalize('NFD', str(text))

        # 2. Remove Accents
        # Filter out non-spacing mark characters (Mn)
        text_no_accents = "".join(c for c in text_nfd if unicodedata.category(c) != 'Mn')

        # 3. Lowercase
        text_lower = text_no_accents.lower()

        # 4. Remove Punctuation
        # Replace punctuation with space to prevent merging words (e.g. "word1,word2" -> "word1 word2")
        # Then collapse whitespace later during tokenization
        text_no_punct = punct_re.sub(' ', text_lower)

        # 5. Tokenize
        # Split by whitespace (handles multiple spaces)
        tokens = text_no_punct.split()

        # 6. Stem
        stems = [stemmer.stem(t) for t in tokens]

        return stems

    return phi
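

# Illustrative sketch (not part of the pipeline): steps 1-5 of the normalization above
# on a sample string, using only the standard library. Stemming (step 6) is omitted here
# so the sketch runs without NLTK; the function name and the inputs are hypothetical.
def _demo_normalize_without_stemming(text: str) -> list:
    import re
    import unicodedata

    # NFD splits accented letters into a base letter plus combining marks (category Mn)
    decomposed = unicodedata.normalize('NFD', text)
    no_accents = "".join(c for c in decomposed if unicodedata.category(c) != 'Mn')
    # Replace punctuation with spaces so adjacent words are not merged, then tokenize
    return re.sub(r'[^\w\s]', ' ', no_accents.lower()).split()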


# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 2: Normalize the lexicon
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_lexicon(
    raw_lexicon_list: List[str],
    phi: Any
) -> Tuple[Set[str], Dict[str, List[str]]]:
    """
    Normalizes the raw lexicon into the shared matching space.

    Parameters
    ----------
    raw_lexicon_list : List[str]
        List of raw dictionary terms.
    phi : Callable
        The normalization function.

    Returns
    -------
    Tuple[Set[str], Dict[str, List[str]]]
        (normalized_lexicon_set, normalization_map)
        - normalized_lexicon_set: Set of space-joined normalized stems.
        - normalization_map: Mapping from normalized string to list of original raw terms.
    """
    normalized_lexicon = set()
    normalization_map = {}

    for raw_term in raw_lexicon_list:
        # Apply phi to get list of stems
        stems = phi(raw_term)

        # Join back to string for set storage/matching
        # e.g., ["empres", "fantasm"] -> "empres fantasm"
        norm_term = " ".join(stems)

        if not norm_term:
            continue

        normalized_lexicon.add(norm_term)

        if norm_term not in normalization_map:
            normalization_map[norm_term] = []
        normalization_map[norm_term].append(raw_term)

    return normalized_lexicon, normalization_map


# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 3: Normalize each irregularity text
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_irregularities(
    df_irregularities: pd.DataFrame,
    phi: Any
) -> pd.DataFrame:
    """
    Applies normalization to the irregularity text in the DataFrame.

    Adds column:
    - 'irregularity_tokens_normalized': List of normalized stems.

    Parameters
    ----------
    df_irregularities : pd.DataFrame
        DataFrame containing 'irregularity_text_raw'.
    phi : Callable
        The normalization function.

    Returns
    -------
    pd.DataFrame
        DataFrame with added normalization column.
    """
    df_out = df_irregularities.copy()

    # Apply phi row-wise
    # We use a lambda to handle potential non-string types safely, though Task 7 ensures strings.
    df_out['irregularity_tokens_normalized'] = df_out['irregularity_text_raw'].apply(lambda x: phi(str(x)) if pd.notna(x) else [])

    return df_out


# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_text_and_lexicon(
    df_irregularities: pd.DataFrame,
    raw_lexicon_list: List[str],
    study_configuration: Dict[str, Any]
) -> Tuple[pd.DataFrame, Set[str], Dict[str, List[str]]]:
    """
    Orchestrator function for Task 8: Normalize text and lexicon.

    Executes:
    1. Instantiation of normalization function phi.
    2. Normalization of the lexicon.
    3. Normalization of irregularity texts.

    Parameters
    ----------
    df_irregularities : pd.DataFrame
        Irregularity-level DataFrame.
    raw_lexicon_list : List[str]
        Raw dictionary terms.
    study_configuration : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Tuple[pd.DataFrame, Set[str], Dict[str, List[str]]]
        (df_irregularities_normalized, normalized_lexicon_set, normalization_map)
    """
    logger.info("Starting Task 8: Normalize text and lexicon.")

    try:
        # Step 1: Define phi
        phi = get_normalization_function(study_configuration)

        # Step 2: Normalize Lexicon
        norm_lexicon, norm_map = normalize_lexicon(raw_lexicon_list, phi)
        logger.info(f"Lexicon normalized. {len(raw_lexicon_list)} raw terms -> {len(norm_lexicon)} unique normalized terms.")

        # Step 3: Normalize Irregularities
        df_norm = normalize_irregularities(df_irregularities, phi)
        logger.info(f"Irregularities normalized. Processed {len(df_norm)} rows.")

        return df_norm, norm_lexicon, norm_map

    except Exception as e:
        logger.critical(f"Task 8 Failed with unexpected error: {str(e)}")
        raise e
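
The normalization steps of Task 8 can be illustrated on a short Portuguese phrase. The block below is a minimal sketch, not the pipeline's `phi`: the helper name `normalize_without_stemming` is hypothetical, and the stemming step is deliberately left out so that the example depends only on the standard library.

```python
import re
import unicodedata
from typing import List

# Same pattern as the pipeline: anything that is neither \w nor whitespace.
PUNCT_RE = re.compile(r'[^\w\s]')


def normalize_without_stemming(text: str) -> List[str]:
    """Hypothetical helper covering steps 1-5 of phi (stemming omitted)."""
    if not text:
        return []
    # 1. NFD decomposition splits accented letters into base letter + combining mark
    decomposed = unicodedata.normalize('NFD', text)
    # 2. Drop combining marks (Unicode category 'Mn')
    no_accents = "".join(c for c in decomposed if unicodedata.category(c) != 'Mn')
    # 3. Lowercase
    lowered = no_accents.lower()
    # 4. Replace punctuation with a space so adjacent words do not merge
    spaced = PUNCT_RE.sub(' ', lowered)
    # 5. Whitespace tokenization (collapses repeated spaces)
    return spaced.split()


tokens = normalize_without_stemming("Licitação, fraude!")
print(tokens)  # ['licitacao', 'fraude']
```

Because the lexicon and the irregularity texts pass through the same function, a dictionary entry and a report sentence end up in the same matching space regardless of accents, case, or punctuation.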


In [None]:
# Task 9 — Classify severe irregularities and compute report-level counts

# ==============================================================================
# Task 9: Classify severe irregularities and compute report-level counts
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 1: Define the severe classification rule (Appendix A2)
# -------------------------------------------------------------------------------------------------------------------------------

def classify_irregularity(
    tokens: List[str],
    normalized_lexicon: Set[str],
    max_ngram_size: int
) -> Tuple[int, List[str]]:
    """
    Classifies a single irregularity as severe if it contains any lexicon n-gram.

    Implements the rule: Severe = 1 if exists l in L such that l is a contiguous
    subsequence of the irregularity tokens.

    Parameters
    ----------
    tokens : List[str]
        List of normalized tokens for the irregularity.
    normalized_lexicon : Set[str]
        Set of normalized lexicon entries (space-joined strings).
    max_ngram_size : int
        Maximum token length of entries in the lexicon.

    Returns
    -------
    Tuple[int, List[str]]
        (is_severe (0 or 1), list of matched lexicon terms)
    """
    if not tokens:
        return 0, []

    matched_terms = []
    is_severe = 0

    # Generate n-grams of length 1 to max_ngram_size
    # We iterate through positions and lengths
    n_tokens = len(tokens)

    # Enumerate every contiguous candidate n-gram and test it for set membership.
    # max_ngram_size is likely small (e.g. 3-5), so generating all candidates is fast.
    for n in range(1, min(n_tokens, max_ngram_size) + 1):
        for i in range(n_tokens - n + 1):
            ngram_tokens = tokens[i : i + n]
            ngram_str = " ".join(ngram_tokens)

            if ngram_str in normalized_lexicon:
                is_severe = 1
                matched_terms.append(ngram_str)

    # Deduplicate and sort matches
    matched_terms = sorted(set(matched_terms))

    return is_severe, matched_terms


# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 2: Apply classification to all irregularities
# -------------------------------------------------------------------------------------------------------------------------------

def apply_classification(
    df_irregularities: pd.DataFrame,
    normalized_lexicon: Set[str]
) -> pd.DataFrame:
    """
    Applies the classification rule to the entire irregularities DataFrame.

    Adds columns:
    - 'is_severe': int (0 or 1)
    - 'matched_lexicon_terms': List[str]

    Parameters
    ----------
    df_irregularities : pd.DataFrame
        DataFrame with 'irregularity_tokens_normalized'.
    normalized_lexicon : Set[str]
        Normalized lexicon set.

    Returns
    -------
    pd.DataFrame
        DataFrame with classification columns.
    """
    df_out = df_irregularities.copy()

    # Determine max n-gram size from lexicon for optimization
    if not normalized_lexicon:
        max_ngram_size = 0
    else:
        max_ngram_size = max(len(term.split()) for term in normalized_lexicon)

    logger.info(f"Max lexicon n-gram size: {max_ngram_size}")

    # Apply row-wise
    # A list comprehension would be marginally faster, but .apply is clearer here
    results = df_out['irregularity_tokens_normalized'].apply(
        lambda tokens: classify_irregularity(tokens, normalized_lexicon, max_ngram_size)
    )

    # Unpack results
    df_out['is_severe'] = [res[0] for res in results]
    df_out['matched_lexicon_terms'] = [res[1] for res in results]

    return df_out


# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 3: Aggregate to report-level feature counts
# -------------------------------------------------------------------------------------------------------------------------------

def aggregate_counts(
    df_classified: pd.DataFrame,
    df_corpus: pd.DataFrame
) -> pd.DataFrame:
    """
    Aggregates irregularity counts to the report level and merges with the corpus.

    Computes:
    - total_irregularities_count
    - severe_irregularities_count

    Parameters
    ----------
    df_classified : pd.DataFrame
        Classified irregularities DataFrame.
    df_corpus : pd.DataFrame
        Base corpus DataFrame (to ensure all reports are present).

    Returns
    -------
    pd.DataFrame
        Corpus DataFrame with added count columns.
    """
    # 1. Aggregate
    # Group by report_id
    agg = df_classified.groupby('report_id').agg(
        total_irregularities_count=('irregularity_index', 'count'),
        severe_irregularities_count=('is_severe', 'sum')
    ).reset_index()

    # 2. Merge with Corpus
    # Left join to keep all reports (even those with 0 irregularities)
    df_merged = pd.merge(
        df_corpus,
        agg,
        on='report_id',
        how='left',
        validate='one_to_one'
    )

    # 3. Fill NaNs with 0
    # Reports with no extracted irregularities will have NaN counts
    cols_to_fill = ['total_irregularities_count', 'severe_irregularities_count']
    df_merged[cols_to_fill] = df_merged[cols_to_fill].fillna(0).astype(int)

    return df_merged


# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def classify_severe_and_count(
    df_irregularities: pd.DataFrame,
    normalized_lexicon: Set[str],
    df_raw_corpus: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Orchestrator function for Task 9: Classify and Count.

    Executes:
    1. Classification of each irregularity.
    2. Aggregation of counts to report level.

    Parameters
    ----------
    df_irregularities : pd.DataFrame
        Normalized irregularities.
    normalized_lexicon : Set[str]
        Normalized lexicon.
    df_raw_corpus : pd.DataFrame
        Base corpus.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame]
        (df_classified_irregularities, df_corpus_with_counts)
    """
    logger.info("Starting Task 9: Classify severe irregularities and compute counts.")

    try:
        # Step 1 & 2: Classify
        df_classified = apply_classification(df_irregularities, normalized_lexicon)

        # Step 3: Aggregate
        df_corpus_counts = aggregate_counts(df_classified, df_raw_corpus)

        # Log stats
        total_severe = df_classified['is_severe'].sum()
        total_irregs = len(df_classified)
        pct_severe = (total_severe / total_irregs * 100) if total_irregs > 0 else 0

        logger.info(f"Classification complete. {total_severe}/{total_irregs} ({pct_severe:.2f}%) irregularities classified as severe.")

        return df_classified, df_corpus_counts

    except Exception as e:
        logger.critical(f"Task 9 Failed with unexpected error: {str(e)}")
        raise e
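
The severe-classification rule of Task 9 amounts to testing every contiguous n-gram of an irregularity against the normalized lexicon. The following is a minimal, self-contained sketch of that idea; the function name `find_lexicon_matches`, the two lexicon entries, and the token list are hypothetical and chosen only for illustration.

```python
from typing import List, Set


def find_lexicon_matches(tokens: List[str], lexicon: Set[str], max_n: int) -> List[str]:
    """Return the sorted lexicon entries that occur as contiguous token n-grams."""
    matches = set()
    for n in range(1, min(len(tokens), max_n) + 1):
        for i in range(len(tokens) - n + 1):
            candidate = " ".join(tokens[i:i + n])
            if candidate in lexicon:
                matches.add(candidate)
    return sorted(matches)


# Hypothetical normalized lexicon and token list
lexicon = {"empres fantasm", "superfatur"}
max_n = max(len(term.split()) for term in lexicon)

tokens = ["pagament", "empres", "fantasm", "sem", "licit"]
matches = find_lexicon_matches(tokens, lexicon, max_n)
is_severe = int(bool(matches))
print(is_severe, matches)  # 1 ['empres fantasm']

# Non-contiguous occurrences do not count as a match
print(find_lexicon_matches(["empres", "de", "fantasm"], lexicon, max_n))  # []
```

Bounding the n-gram length by the longest lexicon entry keeps the number of candidates linear in the text length for a fixed lexicon.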


In [None]:
# Task 10 — Construct the PCA input matrix and compute the Corruption Index

# ==============================================================================
# Task 10: Construct the PCA input matrix and compute the Corruption Index
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 1: Build the PCA input matrix X
# -------------------------------------------------------------------------------------------------------------------------------

def build_pca_matrix(df: pd.DataFrame, config: Dict[str, Any]) -> Tuple[np.ndarray, List[Any]]:
    """
    Constructs the PCA input matrix X from the corpus DataFrame.

    Selects the 5 specific features in the exact order required by the study.
    Ensures no missing values.

    Parameters
    ----------
    df : pd.DataFrame
        Corpus DataFrame with counts.
    config : Dict[str, Any]
        Study configuration containing 'pca_input_features'.

    Returns
    -------
    Tuple[np.ndarray, List[Any]]
        (X matrix (n_samples, 5), list of report_ids corresponding to rows)
    """
    # Get feature list from config
    features = config['pipeline_parameters']['pca_config']['pca_input_features']

    # Validate columns exist
    missing = [f for f in features if f not in df.columns]
    if missing:
        raise ValueError(f"Missing PCA input features in DataFrame: {missing}")

    # Extract data
    # Ensure sorting by report_id for deterministic row order
    df_sorted = df.sort_values('report_id')

    # Check for nulls
    if df_sorted[features].isnull().any().any():
        raise ValueError("PCA input matrix contains null values. Imputation is not permitted.")

    X = df_sorted[features].values.astype(float)
    report_ids = df_sorted['report_id'].tolist()

    logger.info(f"PCA Matrix X constructed. Shape: {X.shape}. Features: {features}")

    return X, report_ids


# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 2: Standardize features
# -------------------------------------------------------------------------------------------------------------------------------

def standardize_matrix(X: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Standardizes the input matrix X (Z-score normalization).

    Computes column-wise mean and sample standard deviation (ddof=1).
    Z = (X - mu) / sigma

    Parameters
    ----------
    X : np.ndarray
        Input matrix (n_samples, n_features).

    Returns
    -------
    Tuple[np.ndarray, np.ndarray, np.ndarray]
        (Z matrix, mu vector, sigma vector)
    """
    # Compute mean and sample std (ddof=1)
    mu = np.mean(X, axis=0)
    sigma = np.std(X, axis=0, ddof=1)

    # Check for zero variance
    if np.any(sigma == 0):
        zero_var_indices = np.where(sigma == 0)[0]
        raise ValueError(f"PCA features at indices {zero_var_indices} have zero variance. Standardization undefined.")

    # Standardize
    Z = (X - mu) / sigma

    return Z, mu, sigma


# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 3: Fit PCA, apply Kaiser criterion, and score the index
# -------------------------------------------------------------------------------------------------------------------------------

def compute_pca(Z: np.ndarray, report_ids: List[Any]) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Performs PCA on the standardized matrix Z.

    Computes covariance matrix, eigendecomposition, and PC1 scores.
    Enforces Kaiser criterion check and sign convention.

    Parameters
    ----------
    Z : np.ndarray
        Standardized input matrix.
    report_ids : List[Any]
        List of report_ids corresponding to rows of Z.

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, Any]]
        (DataFrame with 'report_id' and 'corruption_index', PCA artifacts dict)
    """
    n_samples = Z.shape[0]

    # 1. Covariance Matrix
    # Sigma = (1 / (n-1)) * Z.T @ Z
    Sigma = (1 / (n_samples - 1)) * np.dot(Z.T, Z)

    # 2. Eigendecomposition
    # eigh for symmetric matrices returns eigenvalues in ascending order
    eigenvalues, eigenvectors = np.linalg.eigh(Sigma)

    # Sort descending
    idx = np.argsort(eigenvalues)[::-1]
    eigenvalues = eigenvalues[idx]
    eigenvectors = eigenvectors[:, idx]

    # 3. Kaiser Criterion & Explained Variance
    # Check PC1 eigenvalue
    pc1_eigenvalue = eigenvalues[0]
    if pc1_eigenvalue <= 1.0:
        logger.warning(f"PC1 Eigenvalue {pc1_eigenvalue:.4f} <= 1.0. Kaiser criterion not strictly met.")

    # Check if PC2 > 1 (Paper says only PC1 > 1)
    if len(eigenvalues) > 1 and eigenvalues[1] > 1.0:
        logger.warning(f"PC2 Eigenvalue {eigenvalues[1]:.4f} > 1.0. Deviation from paper findings.")

    explained_variance_ratio = eigenvalues / np.sum(eigenvalues)
    pc1_var_ratio = explained_variance_ratio[0]
    logger.info(f"PC1 Explained Variance: {pc1_var_ratio:.4f}")

    # 4. PC1 Loadings & Sign Convention
    pc1_loadings = eigenvectors[:, 0]

    # Paper says "All variables load positively".
    # Eigenvectors are only defined up to sign, so if the loadings sum to a negative
    # value we flip PC1 so that higher scores point in the "Corruption" direction.
    if np.sum(pc1_loadings) < 0:
        logger.info("Flipping PC1 sign to enforce positive loading convention.")
        pc1_loadings = -pc1_loadings

    # 5. Compute Scores
    # Score = Z @ loadings
    scores = np.dot(Z, pc1_loadings)

    # 6. Package Results
    df_scores = pd.DataFrame({
        'report_id': report_ids,
        'corruption_index': scores
    })

    artifacts = {
        'eigenvalues': eigenvalues.tolist(),
        'explained_variance_ratio': explained_variance_ratio.tolist(),
        'pc1_loadings': pc1_loadings.tolist(),
        'covariance_matrix': Sigma.tolist()
    }

    return df_scores, artifacts


# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def build_pca_index(
    df_corpus_with_counts: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrator function for Task 10: Build PCA Index.

    Executes:
    1. Matrix Construction.
    2. Standardization.
    3. PCA Computation.
    4. Merging index back to corpus.

    Parameters
    ----------
    df_corpus_with_counts : pd.DataFrame
        Corpus DataFrame with feature counts.
    study_configuration : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, Any]]
        (Corpus DataFrame with 'corruption_index', PCA artifacts)
    """
    logger.info("Starting Task 10: Construct PCA input matrix and compute Corruption Index.")

    try:
        # Step 1: Build Matrix
        X, report_ids = build_pca_matrix(df_corpus_with_counts, study_configuration)

        # Step 2: Standardize
        Z, mu, sigma = standardize_matrix(X)

        # Step 3: Compute PCA
        df_scores, artifacts = compute_pca(Z, report_ids)

        # Add standardization params to artifacts
        artifacts['mu'] = mu.tolist()
        artifacts['sigma'] = sigma.tolist()

        # Step 4: Merge back
        # Left join to original df (though ids should match exactly if sorted)
        df_final = pd.merge(
            df_corpus_with_counts,
            df_scores,
            on='report_id',
            how='left',
            validate='one_to_one'
        )

        logger.info("Task 10 completed. Corruption Index computed.")
        return df_final, artifacts

    except Exception as e:
        logger.critical(f"Task 10 Failed with unexpected error: {str(e)}")
        raise e
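
The Task 10 computation can be checked on synthetic data. The sketch below assumes five hypothetical features driven by one latent factor (generated with a fixed seed; none of this is study data) and follows the same steps: z-score with `ddof=1`, covariance of the standardized matrix, eigendecomposition, sign convention, and PC1 scores.

```python
import numpy as np

# Synthetic illustration only: five noisy copies of one latent factor
rng = np.random.default_rng(0)
latent = rng.normal(size=200)
X = np.column_stack([latent + 0.5 * rng.normal(size=200) for _ in range(5)])

# Standardize with the sample standard deviation (ddof=1)
mu = X.mean(axis=0)
sigma = X.std(axis=0, ddof=1)
Z = (X - mu) / sigma

# Covariance of Z; because Z is standardized this equals the correlation matrix of X
Sigma = Z.T @ Z / (Z.shape[0] - 1)

# eigh returns eigenvalues in ascending order, so reorder to descending
eigenvalues, eigenvectors = np.linalg.eigh(Sigma)
order = np.argsort(eigenvalues)[::-1]
eigenvalues, eigenvectors = eigenvalues[order], eigenvectors[:, order]

# Sign convention: make the PC1 loadings sum to a non-negative value
pc1 = eigenvectors[:, 0]
if pc1.sum() < 0:
    pc1 = -pc1

scores = Z @ pc1
print(eigenvalues.sum())  # approximately 5, the number of features
```

Since the eigenvalues of a correlation matrix sum to the number of features, an eigenvalue above 1 means the component explains more variance than a single standardized input, which is the intuition behind the Kaiser criterion. The sample variance of the PC1 scores equals the first eigenvalue.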


In [None]:
# Task 11 — Join index to validation data and construct agreement samples

# ==============================================================================
# Task 11: Join index to validation data and construct agreement samples
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 1: Merge corruption index with validation data
# -------------------------------------------------------------------------------------------------------------------------------

def merge_corpus_validation(
    df_corpus: pd.DataFrame,
    df_validation: pd.DataFrame
) -> pd.DataFrame:
    """
    Merges the corpus (with Corruption Index) and validation DataFrames.

    Prioritizes joining on ['municipality_id_canon', 'year'] if possible,
    otherwise falls back to ['municipality_id_canon'].
    Logs an error if the left join inflates the corpus row count; it does not raise.

    Parameters
    ----------
    df_corpus : pd.DataFrame
        Corpus DataFrame with 'corruption_index'.
    df_validation : pd.DataFrame
        Validation DataFrame with FF/GT/CGU data.

    Returns
    -------
    pd.DataFrame
        Merged DataFrame.
    """
    # Determine join keys
    # Only test (muni, year) uniqueness when validation actually has a 'year' column;
    # calling duplicated() on a missing column would raise a KeyError.
    has_year = 'year' in df_validation.columns
    val_muni_year_unique = has_year and not df_validation.duplicated(
        subset=['municipality_id_canon', 'year']
    ).any()
    val_muni_unique = not df_validation.duplicated(subset=['municipality_id_canon']).any()

    # Corpus always has (muni, year)
    if val_muni_year_unique:
        join_keys = ['municipality_id_canon', 'year']
        logger.info("Joining on ['municipality_id_canon', 'year']")
    elif val_muni_unique:
        join_keys = ['municipality_id_canon']
        logger.info("Joining on ['municipality_id_canon']")
    else:
        # Validation has duplicates on every candidate key, so a left join may inflate rows.
        # Validation data is assumed to be well-formed (Task 2 checked this), so we fall back
        # to the most specific key available and warn; the row-count check below reports inflation.
        join_keys = ['municipality_id_canon', 'year'] if has_year else ['municipality_id_canon']
        logger.warning("Validation data not unique on join keys. Risk of row inflation.")

    # Perform Merge
    # Suffixes: _corpus is preserved, _val for validation columns
    df_merged = pd.merge(
        df_corpus,
        df_validation,
        on=join_keys,
        how='left',
        suffixes=('', '_val'),
        validate='many_to_one' if val_muni_year_unique or val_muni_unique else None
    )

    # Check for row inflation
    if len(df_merged) != len(df_corpus):
        logger.error(f"Merge inflated rows! Corpus: {len(df_corpus)}, Merged: {len(df_merged)}")
        # In a strict pipeline, we might raise an error. Here we only log it.

    # Check State Consistency
    # state_canon from corpus vs state_canon_val
    if 'state_canon_val' in df_merged.columns:
        mask_mismatch = (
            df_merged['state_canon'].notna() &
            df_merged['state_canon_val'].notna() &
            (df_merged['state_canon'] != df_merged['state_canon_val'])
        )
        if mask_mismatch.any():
            n_mismatch = mask_mismatch.sum()
            logger.warning(f"State mismatch between corpus and validation for {n_mismatch} rows.")

    return df_merged


# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 2: Construct the strict and near agreement samples
# -------------------------------------------------------------------------------------------------------------------------------

def construct_agreement_flags(df: pd.DataFrame) -> pd.DataFrame:
    """
    Constructs boolean flags for strict and near agreement samples.

    Strict: FF == GT
    Near: |FF - GT| <= 1
    (Both must be non-null)

    Parameters
    ----------
    df : pd.DataFrame
        Merged DataFrame.

    Returns
    -------
    pd.DataFrame
        DataFrame with 'is_strict_agreement' and 'is_near_agreement' columns.
    """
    df_out = df.copy()

    ff = df_out['ff_corruption_count']
    gt = df_out['gt_corruption_count']

    # Ensure numeric
    ff = pd.to_numeric(ff, errors='coerce')
    gt = pd.to_numeric(gt, errors='coerce')

    mask_valid = ff.notna() & gt.notna()

    # Strict
    df_out['is_strict_agreement'] = False
    df_out.loc[mask_valid, 'is_strict_agreement'] = (ff[mask_valid] == gt[mask_valid])

    # Near
    df_out['is_near_agreement'] = False
    df_out.loc[mask_valid, 'is_near_agreement'] = (np.abs(ff[mask_valid] - gt[mask_valid]) <= 1)

    # Log counts
    n_strict = df_out['is_strict_agreement'].sum()
    n_near = df_out['is_near_agreement'].sum()
    logger.info(f"Agreement Samples: Strict N={n_strict}, Near N={n_near}")

    return df_out


# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 3: Compute the mean human-coded outcome for agreement samples
# -------------------------------------------------------------------------------------------------------------------------------

def compute_hc_mean(df: pd.DataFrame) -> pd.DataFrame:
    """
    Computes the mean of FF and GT corruption counts.

    hc_mean = (FF + GT) / 2
    Only computed where both are non-null.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame with FF and GT counts.

    Returns
    -------
    pd.DataFrame
        DataFrame with 'hc_mean' column.
    """
    df_out = df.copy()

    # Compute the mean of FF and GT corruption counts
    ff = pd.to_numeric(df_out['ff_corruption_count'], errors='coerce')
    gt = pd.to_numeric(df_out['gt_corruption_count'], errors='coerce')

    df_out['hc_mean'] = (ff + gt) / 2.0

    return df_out


# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def prepare_validation_samples(
    df_corpus_with_index: pd.DataFrame,
    df_validation_raw: pd.DataFrame
) -> pd.DataFrame:
    """
    Orchestrator function for Task 11: Prepare validation samples.

    Executes:
    1. Merge corpus and validation data.
    2. Construct agreement flags.
    3. Compute mean human-coded outcome.

    Parameters
    ----------
    df_corpus_with_index : pd.DataFrame
        Corpus DataFrame with Corruption Index.
    df_validation_raw : pd.DataFrame
        Validation DataFrame.

    Returns
    -------
    pd.DataFrame
        Analysis DataFrame ready for regression.
    """
    logger.info("Starting Task 11: Prepare validation samples.")

    try:
        # Step 1: Merge
        df_merged = merge_corpus_validation(df_corpus_with_index, df_validation_raw)

        # Step 2: Agreement Flags
        df_flags = construct_agreement_flags(df_merged)

        # Step 3: HC Mean
        df_analysis = compute_hc_mean(df_flags)

        logger.info("Task 11 completed. Analysis DataFrame prepared.")
        return df_analysis

    except Exception as e:
        logger.critical(f"Task 11 Failed with unexpected error: {str(e)}")
        raise e
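
The agreement samples of Task 11 can be illustrated on a toy table. The sketch below uses four hypothetical rows (not study data) with the same column names as the pipeline and shows how missing counts are excluded from both samples.

```python
import pandas as pd

# Hypothetical human-coded counts; the last row has a missing FF value
df = pd.DataFrame({
    'ff_corruption_count': [2, 3, 0, None],
    'gt_corruption_count': [2, 4, 3, 1],
})

ff = pd.to_numeric(df['ff_corruption_count'], errors='coerce')
gt = pd.to_numeric(df['gt_corruption_count'], errors='coerce')
both_present = ff.notna() & gt.notna()

# Strict: FF == GT; Near: |FF - GT| <= 1; both require non-null counts
df['is_strict_agreement'] = both_present & (ff == gt)
df['is_near_agreement'] = both_present & ((ff - gt).abs() <= 1)

# Mean human-coded outcome; NaN wherever either count is missing
df['hc_mean'] = (ff + gt) / 2.0

print(df['is_strict_agreement'].tolist())  # [True, False, False, False]
print(df['is_near_agreement'].tolist())    # [True, True, False, False]
```

Every strict-agreement row is also a near-agreement row, so the strict sample is always a subset of the near sample.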


In [None]:
# Task 12 — Reproduce Table 1: validation regressions against FF/GT

# ==============================================================================
# Task 12: Reproduce Table 1: validation regressions against FF/GT
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 1: Standardize the Corruption Index for interpretability
# -------------------------------------------------------------------------------------------------------------------------------

def standardize_series(series: pd.Series) -> pd.Series:
    """
    Standardizes a pandas Series (Z-score) using sample standard deviation (ddof=1).

    Parameters
    ----------
    series : pd.Series
        Input numeric series.

    Returns
    -------
    pd.Series
        Standardized series (mean ~0, std ~1).
    """
    if series.dropna().empty:
        return series

    # Standardize a pandas Series (Z-score) using sample standard deviation (ddof=1)
    mu = series.mean()
    sigma = series.std(ddof=1)

    if sigma == 0:
        logger.warning("Standardization skipped: Zero variance.")
        return series - mu

    return (series - mu) / sigma


# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 2: Estimate Equation (1) with state fixed effects and robust SE
# -------------------------------------------------------------------------------------------------------------------------------

def estimate_validation_model(
    df: pd.DataFrame,
    outcome_col: str,
    predictor_col: str,
    fe_col: str,
    robust_type: str = 'HC1'
) -> Dict[str, Any]:
    """
    Estimates the validation regression model: Outcome ~ Predictor + FE.

    Standardizes the predictor within the estimation sample.
    Uses HC1 robust standard errors.

    Parameters
    ----------
    df : pd.DataFrame
        Estimation sample DataFrame.
    outcome_col : str
        Name of outcome variable.
    predictor_col : str
        Name of predictor variable (Corruption Index).
    fe_col : str
        Name of fixed effect variable (State).
    robust_type : str
        Type of robust covariance (default 'HC1').

    Returns
    -------
    Dict[str, Any]
        Dictionary containing regression results and diagnostics.
    """
    # 1. Prepare Data
    # Drop missing
    cols = [outcome_col, predictor_col, fe_col]
    data = df[cols].dropna().copy()

    if data.empty:
        return {'N': 0, 'error': 'Empty sample'}

    # 2. Standardize Predictor (Sample-Specific)
    data['X_std'] = standardize_series(data[predictor_col])

    # 3. Define Formula
    # Outcome ~ X_std + C(State)
    # We rely on patsy to handle the dummy encoding (LSDV)
    formula = f"{outcome_col} ~ X_std + C({fe_col})"

    # 4. Fit Model
    try:
        model = smf.ols(formula, data=data)
        results = model.fit(cov_type=robust_type)

        # 5. Extract Metrics
        # Predictor is 'X_std'
        beta = results.params['X_std']
        se = results.bse['X_std']
        t_stat = results.tvalues['X_std']
        p_val = results.pvalues['X_std']
        r2 = results.rsquared
        n_obs = int(results.nobs)

        return {
            'beta': beta,
            'se': se,
            't_stat': t_stat,
            'p_value': p_val,
            'R2': r2,
            'N': n_obs,
            'sample_ids': data.index.tolist()  # Index labels of the rows actually used (after dropna); assumes index is report_id or unique
        }

    except Exception as e:
        logger.error(f"Regression failed: {str(e)}")
        return {'N': 0, 'error': str(e)}


# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 3: Record and validate results against paper targets
# -------------------------------------------------------------------------------------------------------------------------------

def validate_results(results: Dict[str, Any], targets: Dict[str, Any], spec_name: str) -> None:
    """
    Validates regression results against paper targets.

    Parameters
    ----------
    results : Dict[str, Any]
        Computed regression results.
    targets : Dict[str, Any]
        Target values (N, R2, Beta).
    spec_name : str
        Name of the specification for logging.
    """
    if 'error' in results:
        logger.warning(f"[{spec_name}] Validation skipped due to error.")
        return

    # Check N
    if 'N' in targets:
        diff_n = results['N'] - targets['N']
        if diff_n != 0:
            logger.warning(f"[{spec_name}] Sample size mismatch. Got {results['N']}, Expected {targets['N']}.")

    # Check R2
    if 'R2' in targets:
        diff_r2 = abs(results['R2'] - targets['R2'])
        if diff_r2 > 0.05:
            logger.warning(f"[{spec_name}] R2 deviation > 0.05. Got {results['R2']:.3f}, Expected {targets['R2']:.3f}.")
        else:
            logger.info(f"[{spec_name}] R2 match ({results['R2']:.3f} vs {targets['R2']:.3f}).")

    # Check Beta
    if 'Beta' in targets:
        diff_beta = abs(results['beta'] - targets['Beta'])
        if diff_beta > 0.2:  # Allow some variance due to standardization/sample diffs
            logger.warning(f"[{spec_name}] Beta deviation > 0.2. Got {results['beta']:.3f}, Expected {targets['Beta']:.3f}.")
        else:
            logger.info(f"[{spec_name}] Beta match ({results['beta']:.3f} vs {targets['Beta']:.3f}).")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_table1_regressions(
    df_analysis: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrator function for Task 12: Reproduce Table 1.

    Runs 4 specifications:
    1. Strict Agreement (FF=GT)
    2. Near Agreement (|FF-GT|<=1)
    3. Full FF
    4. Full GT

    Parameters
    ----------
    df_analysis : pd.DataFrame
        Analysis DataFrame with agreement flags and outcomes.
    study_configuration : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Dict[str, Any]
        Dictionary of results for all specifications.
    """
    logger.info("Starting Task 12: Reproduce Table 1 regressions.")

    fe_col = 'state_canon' # Assuming this is the column name for state

    # Ensure state column exists, fallback to 'state' if 'state_canon' missing
    if fe_col not in df_analysis.columns and 'state' in df_analysis.columns:
        fe_col = 'state'

    specs = {
        'Strict Agreement': {
            'mask': df_analysis['is_strict_agreement'] == True,
            'outcome': 'hc_mean',
            'targets': {'N': 34, 'R2': 0.726, 'Beta': 1.492}
        },
        'Near Agreement': {
            'mask': df_analysis['is_near_agreement'] == True,
            'outcome': 'hc_mean',
            'targets': {'N': 99, 'R2': 0.714, 'Beta': 1.276}
        },
        'Full FF': {
            'mask': df_analysis['ff_corruption_count'].notna(),
            'outcome': 'ff_corruption_count',
            'targets': {'R2': 0.382, 'Beta': 0.628} # From Table 1 Col 3
        },
        'Full GT': {
            'mask': df_analysis['gt_corruption_count'].notna(),
            'outcome': 'gt_corruption_count',
            'targets': {'R2': 0.488, 'Beta': 2.120} # From Table 1 Col 4
        }
    }

    all_results = {}

    for name, spec in specs.items():
        logger.info(f"Running Spec: {name}")

        # Filter Data
        df_spec = df_analysis[spec['mask']].copy()

        # Run Regression
        res = estimate_validation_model(
            df_spec,
            outcome_col=spec['outcome'],
            predictor_col='corruption_index',
            fe_col=fe_col
        )

        # Validate
        validate_results(res, spec['targets'], name)

        all_results[name] = res

    return all_results


In [None]:
# Task 13 — Reproduce Table 2: validation regressions against CGU

# ==============================================================================
# Task 13: Reproduce Table 2: validation regressions against CGU
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 1: Construct the CGU estimation sample
# -------------------------------------------------------------------------------------------------------------------------------

def prepare_cgu_samples(df: pd.DataFrame) -> Dict[str, pd.DataFrame]:
    """
    Constructs estimation samples for CGU levels and log specifications.

    Levels sample: Rows with non-null CGU counts.
    Log sample: Rows with positive CGU counts (drops zeros).

    Parameters
    ----------
    df : pd.DataFrame
        Analysis DataFrame.

    Returns
    -------
    Dict[str, pd.DataFrame]
        Dictionary with 'levels' and 'log' DataFrames.
    """
    # Base: Non-null CGU
    mask_valid = df['cgu_severe_count'].notna()
    df_levels = df[mask_valid].copy()

    # Log: Positive CGU
    # Ensure numeric
    cgu_vals = pd.to_numeric(df_levels['cgu_severe_count'], errors='coerce')
    mask_pos = cgu_vals > 0
    df_log = df_levels[mask_pos].copy()

    n_dropped = len(df_levels) - len(df_log)
    if n_dropped > 0:
        logger.info(f"Dropped {n_dropped} non-positive CGU observations for log specification.")

    return {'levels': df_levels, 'log': df_log}


# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 2: Estimate the levels specification
# -------------------------------------------------------------------------------------------------------------------------------

# We reuse estimate_validation_model from Task 12 (available in environment)
# No new code needed for estimation logic, just orchestration.

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 3: Estimate the log specification
# -------------------------------------------------------------------------------------------------------------------------------

def prepare_log_outcome(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
    """
    Creates the log-transformed outcome variable.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame (already filtered for positivity).
    col_name : str
        Name of the column to log-transform.

    Returns
    -------
    pd.DataFrame
        DataFrame with new 'log_{col_name}' column.
    """
    df_out = df.copy()
    df_out[f'log_{col_name}'] = np.log(df_out[col_name].astype(float))
    return df_out
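

# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): why zeros are dropped before the log transform
# -------------------------------------------------------------------------------------------------------------------------------
# The natural log is undefined at zero: np.log(0) evaluates to -inf (with a RuntimeWarning)
# and math.log(0) raises ValueError. The log specification therefore keeps only strictly
# positive counts, as prepare_cgu_samples does above. The helper below is a hypothetical,
# standard-library-only illustration of that filter-then-log order; its name is an
# assumption and nothing else in this notebook calls it.

import math
from typing import List, Optional, Sequence


def _sketch_log_positive_counts(counts: Sequence[Optional[float]]) -> List[float]:
    """Return natural logs of the strictly positive, non-missing entries of `counts`."""
    # None and NaN both fail the `c > 0` test, so they are dropped along with zeros.
    return [math.log(c) for c in counts if c is not None and c > 0]


# Example: _sketch_log_positive_counts([0, 1, None, 10]) keeps only the entries 1 and 10.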


# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_table2_regressions(
    df_analysis: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrator function for Task 13: Reproduce Table 2.

    Runs 2 specifications:
    1. CGU Levels
    2. CGU Log

    Parameters
    ----------
    df_analysis : pd.DataFrame
        Analysis DataFrame.
    study_configuration : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Dict[str, Any]
        Dictionary of results.
    """
    logger.info("Starting Task 13: Reproduce Table 2 regressions.")

    # Prepare Samples
    samples = prepare_cgu_samples(df_analysis)

    fe_col = 'state_canon'
    if fe_col not in df_analysis.columns and 'state' in df_analysis.columns:
        fe_col = 'state'

    results = {}

    # Spec 1: Levels
    logger.info("Running Spec: CGU Levels")
    res_levels = estimate_validation_model(
        samples['levels'],
        outcome_col='cgu_severe_count',
        predictor_col='corruption_index',
        fe_col=fe_col
    )
    validate_results(res_levels, {'R2': 0.311, 'Beta': 4.584}, "CGU Levels")
    results['CGU Levels'] = res_levels

    # Spec 2: Log
    logger.info("Running Spec: CGU Log")
    df_log_ready = prepare_log_outcome(samples['log'], 'cgu_severe_count')
    res_log = estimate_validation_model(
        df_log_ready,
        outcome_col='log_cgu_severe_count',
        predictor_col='corruption_index',
        fe_col=fe_col
    )
    validate_results(res_log, {'R2': 0.293, 'Beta': 0.169}, "CGU Log")
    results['CGU Log'] = res_log

    return results


In [None]:
# Task 14 — Reproduce Table 3: correlates and construct validity

# ==============================================================================
# Task 14: Reproduce Table 3: correlates and construct validity
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 1: Prepare covariates with correct scaling
# -------------------------------------------------------------------------------------------------------------------------------

def prepare_table3_covariates(df: pd.DataFrame) -> pd.DataFrame:
    """
    Prepares covariates for Table 3 analysis.

    - Filters to reference year 2000.
    - Scales distance to capital from km to thousands of km (km / 1000).
    - Derives log population if missing.

    Parameters
    ----------
    df : pd.DataFrame
        Analysis DataFrame.

    Returns
    -------
    pd.DataFrame
        DataFrame with prepared covariates.
    """
    # 1. Filter Year 2000
    # Note: Covariates are in df_validation_raw, merged into df_analysis.
    # The merge in Task 11 might have duplicated year columns if not careful.
    # We assume 'covariate_reference_year' is present.
    if 'covariate_reference_year' not in df.columns:
        # The column should be present after Task 2 validation. If it was dropped
        # (e.g. during the Task 11 merge), fall back to the unfiltered frame.
        logger.warning("covariate_reference_year missing; using all rows without filtering to year 2000.")
        df_2000 = df.copy()
    else:
        df_2000 = df[df['covariate_reference_year'] == 2000].copy()

    if df_2000.empty:
        logger.warning("No observations for year 2000. Table 3 will be empty.")
        return df_2000

    # 2. Scale Distance
    if 'distance_to_capital_km' in df_2000.columns:
        df_2000['distance_scaled'] = df_2000['distance_to_capital_km'] * 0.001
    else:
        logger.warning("distance_to_capital_km missing.")

    # 3. Log Population
    if 'population_log' not in df_2000.columns:
        if 'population_total' in df_2000.columns:
            # Handle zeros/negatives
            pop = pd.to_numeric(df_2000['population_total'], errors='coerce')
            df_2000['population_log'] = np.log(pop.where(pop > 0))
        else:
            logger.warning("population_log and population_total missing.")

    return df_2000


# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 2: Estimate bivariate regressions (columns 1–6 of Table 3)
# -------------------------------------------------------------------------------------------------------------------------------

def estimate_bivariate_models(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Estimates bivariate regressions for Table 3.

    Model: Corruption Index ~ Covariate
    SE: HC1

    Parameters
    ----------
    df : pd.DataFrame
        Prepared DataFrame.

    Returns
    -------
    Dict[str, Any]
        Dictionary of regression results.
    """
    covariates = [
        'literacy_rate',
        'gdp_per_capita',
        'urban_population_share',
        'population_log',
        'distance_scaled',
        'has_radio_fm'
    ]

    results = {}

    for cov in covariates:
        if cov not in df.columns:
            logger.warning(f"Covariate {cov} missing. Skipping.")
            continue

        # Drop missing for this specific regression
        data = df[['corruption_index', cov]].dropna()

        if data.empty:
            logger.warning(f"Empty sample for {cov}.")
            continue

        formula = f"corruption_index ~ {cov}"

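        try:
            # Cast to float so that a boolean dummy (e.g. has_radio_fm) is not expanded by
            # the formula parser into a categorical term such as `has_radio_fm[T.True]`,
            # which would make the `res.params[cov]` lookup below raise a KeyError.
            data = data.astype(float)
            model = smf.ols(formula, data=data)
            res = model.fit(cov_type='HC1')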

            results[cov] = {
                'beta': res.params[cov],
                'se': res.bse[cov],
                'p_value': res.pvalues[cov],
                'R2': res.rsquared,
                'N': int(res.nobs)
            }
        except Exception as e:
            logger.error(f"Regression failed for {cov}: {str(e)}")

    return results


# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 3: Estimate multivariate regression (column 7 of Table 3)
# -------------------------------------------------------------------------------------------------------------------------------

def estimate_multivariate_model(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Estimates the multivariate regression for Table 3.

    Model: Corruption Index ~ All Covariates
    SE: HC1

    Parameters
    ----------
    df : pd.DataFrame
        Prepared DataFrame.

    Returns
    -------
    Dict[str, Any]
        Regression results.
    """
    covariates = [
        'literacy_rate',
        'gdp_per_capita',
        'urban_population_share',
        'population_log',
        'distance_scaled',
        'has_radio_fm'
    ]

    # Check existence
    valid_covs = [c for c in covariates if c in df.columns]

    if not valid_covs:
        return {'error': 'No covariates available'}

    # Drop missing listwise
    cols = ['corruption_index'] + valid_covs
    data = df[cols].dropna()

    if data.empty:
        return {'error': 'Empty sample'}

    formula = "corruption_index ~ " + " + ".join(valid_covs)

    try:
        model = smf.ols(formula, data=data)
        res = model.fit(cov_type='HC1')

        return {
            'params': res.params.to_dict(),
            'bse': res.bse.to_dict(),
            'pvalues': res.pvalues.to_dict(),
            'R2': res.rsquared,
            'N': int(res.nobs)
        }
    except Exception as e:
        logger.error(f"Multivariate regression failed: {str(e)}")
        return {'error': str(e)}


# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_table3_regressions(
    df_analysis: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrator function for Task 14: Reproduce Table 3.

    Executes:
    1. Covariate preparation.
    2. Bivariate regressions.
    3. Multivariate regression.

    Parameters
    ----------
    df_analysis : pd.DataFrame
        Analysis DataFrame.
    study_configuration : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Dict[str, Any]
        Results dictionary.
    """
    logger.info("Starting Task 14: Reproduce Table 3 regressions.")

    # Step 1: Prepare
    df_prep = prepare_table3_covariates(df_analysis)

    # Step 2: Bivariate
    bivariate_results = estimate_bivariate_models(df_prep)

    # Step 3: Multivariate
    multivariate_results = estimate_multivariate_model(df_prep)

    # Validate Joint R2
    if 'R2' in multivariate_results:
        target_r2 = 0.172
        diff = abs(multivariate_results['R2'] - target_r2)
        if diff > 0.05:
            logger.warning(f"Table 3 Multivariate R2 deviation. Got {multivariate_results['R2']:.3f}, Expected {target_r2}.")
        else:
            logger.info(f"Table 3 Multivariate R2 match ({multivariate_results['R2']:.3f}).")

    return {
        'bivariate': bivariate_results,
        'multivariate': multivariate_results
    }


In [None]:
# Task 15 — Create orchestrator function for main pipeline and LOO robustness

# ==============================================================================
# Task 15: Create orchestrator function for main pipeline and LOO robustness
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 2: Define orchestrator input/output contract
# -------------------------------------------------------------------------------------------------------------------------------

@dataclass
class PipelineArtifacts:
    """
    Container for all artifacts produced by the main pipeline.

    Attributes
    ----------
    df_corpus_clean : pd.DataFrame
        Cleansed corpus data.
    df_validation_clean : pd.DataFrame
        Cleansed validation data.
    df_irregularities : pd.DataFrame
        Extracted irregularity segments.
    df_corpus_with_index : pd.DataFrame
        Corpus with Corruption Index and PCA features.
    df_analysis : pd.DataFrame
        Merged analysis DataFrame with agreement flags.
    pca_artifacts : Dict[str, Any]
        PCA eigenvalues, vectors, and variance.
    table1_results : Dict[str, Any]
        Regression results for Table 1.
    table2_results : Dict[str, Any]
        Regression results for Table 2.
    table3_results : Dict[str, Any]
        Regression results for Table 3.
    run_manifest : Dict[str, Any]
        Metadata, configuration, and QA metrics.
    """
    df_corpus_clean: pd.DataFrame
    df_validation_clean: pd.DataFrame
    df_irregularities: pd.DataFrame
    df_corpus_with_index: pd.DataFrame
    df_analysis: pd.DataFrame
    pca_artifacts: Dict[str, Any]
    table1_results: Dict[str, Any]
    table2_results: Dict[str, Any]
    table3_results: Dict[str, Any]
    run_manifest: Dict[str, Any]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 3: Define determinism and reproducibility guarantees
# -------------------------------------------------------------------------------------------------------------------------------

def generate_run_manifest(config: Dict[str, Any], metrics: Dict[str, Any]) -> Dict[str, Any]:
    """
    Generates a run manifest with environment metadata and configuration hash.

    Parameters
    ----------
    config : Dict[str, Any]
        Study configuration.
    metrics : Dict[str, Any]
        Collected QA metrics from pipeline steps.

    Returns
    -------
    Dict[str, Any]
        The run manifest.
    """
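    # Local import: `timezone` may not be imported at module level.
    from datetime import timezone

    manifest = {
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'python_version': sys.version,
        'libraries': {
            'pandas': pd.__version__,
            'numpy': np.__version__,
            # Add others as needed/available
        },
        'configuration': config,
        # sort_keys makes the hash independent of key order; default=str keeps the dump
        # from failing on values that json cannot serialize natively.
        'config_hash': hashlib.md5(
            json.dumps(config, sort_keys=True, default=str).encode('utf-8')
        ).hexdigest(),
        'metrics': metrics
    }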
    return manifest
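

# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): key-order-independent configuration hashing
# -------------------------------------------------------------------------------------------------------------------------------
# generate_run_manifest hashes a JSON dump of the configuration. Dumping with sort_keys=True
# means two configurations with the same content but a different key order yield the same
# digest, so the hash can serve as a reproducibility fingerprint. The helper name below is
# hypothetical; it only mirrors the key-sorted JSON + MD5 approach used above and assumes
# the configuration is JSON-serializable.

import hashlib
import json
from typing import Any, Dict


def _sketch_config_hash(config: Dict[str, Any]) -> str:
    """Return the MD5 hex digest of a key-sorted JSON dump of `config`."""
    payload = json.dumps(config, sort_keys=True).encode('utf-8')
    return hashlib.md5(payload).hexdigest()


# Example: _sketch_config_hash({'a': 1, 'b': 2}) == _sketch_config_hash({'b': 2, 'a': 1})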

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_main_pipeline(
    df_raw_corpus: pd.DataFrame,
    df_validation_raw: pd.DataFrame,
    language: str,
    raw_lexicon_list: List[str],
    study_configuration: Dict[str, Any]
) -> PipelineArtifacts:
    """
    Orchestrator function for the main pipeline (Tasks 1-14).

    Executes the full end-to-end workflow:
    1. Validation & Cleansing
    2. Extraction & NLP
    3. Classification & Index Construction
    4. Validation Regressions (Tables 1, 2, 3)

    Parameters
    ----------
    df_raw_corpus : pd.DataFrame
        Raw corpus data.
    df_validation_raw : pd.DataFrame
        Raw validation data.
    language : str
        Study language.
    raw_lexicon_list : List[str]
        Raw dictionary terms.
    study_configuration : Dict[str, Any]
        Pipeline configuration.

    Returns
    -------
    PipelineArtifacts
        Container with all produced artifacts.
    """
    logger.info("Starting Main Pipeline Execution.")
    metrics_accumulator = {}

    try:
        # --- Phase 1: Validation & Cleansing ---
        # Task 1: Validate Corpus
        validate_corpus_inputs(df_raw_corpus)

        # Task 2: Validate Validation Data
        validate_validation_inputs(df_validation_raw, df_raw_corpus)

        # Task 3: Validate Config
        validated_config = validate_config_inputs(language, raw_lexicon_list, study_configuration)

        # Task 4: Cleanse Identifiers
        df_c_clean, df_v_clean, df_c_quar, df_v_quar = cleanse_identifiers(df_raw_corpus, df_validation_raw)
        metrics_accumulator['quarantine_corpus_ids'] = len(df_c_quar)
        metrics_accumulator['quarantine_validation_ids'] = len(df_v_quar)

        # Task 5: Cleanse Text
        df_c_text, df_c_text_quar, text_metrics = cleanse_text_fields(df_c_clean)
        metrics_accumulator.update(text_metrics)

        # Task 6: Cleanse Numeric
        df_c_num, num_stats = cleanse_numeric_fields(df_c_text)
        metrics_accumulator['numeric_stats'] = num_stats

        # --- Phase 2: Extraction & NLP ---
        # Task 7: Extract Irregularities
        df_irreg = extract_irregularities(df_c_num, validated_config)
        metrics_accumulator['total_irregularities_extracted'] = len(df_irreg)

        # Task 8: Normalize
        df_irreg_norm, norm_lexicon, norm_map = normalize_text_and_lexicon(df_irreg, raw_lexicon_list, validated_config)
        metrics_accumulator['normalized_lexicon_size'] = len(norm_lexicon)

        # Task 9: Classify & Count
        df_irreg_class, df_corpus_counts = classify_severe_and_count(df_irreg_norm, norm_lexicon, df_c_num)
        # Cast to a built-in int so the manifest stays JSON-serializable
        metrics_accumulator['total_severe_irregularities'] = int(df_irreg_class['is_severe'].sum())

        # --- Phase 3: Index Construction ---
        # Task 10: PCA Index
        df_corpus_index, pca_artifacts = build_pca_index(df_corpus_counts, validated_config)
        # Cast to a built-in float so the manifest stays JSON-serializable
        metrics_accumulator['pca_explained_variance_pc1'] = float(pca_artifacts['explained_variance_ratio'][0])

        # --- Phase 4: Validation Analysis ---
        # Task 11: Prepare Samples
        df_analysis = prepare_validation_samples(df_corpus_index, df_v_clean)
        metrics_accumulator['analysis_sample_size'] = len(df_analysis)

        # Task 12: Table 1
        t1_results = run_table1_regressions(df_analysis, validated_config)

        # Task 13: Table 2
        t2_results = run_table2_regressions(df_analysis, validated_config)

        # Task 14: Table 3
        t3_results = run_table3_regressions(df_analysis, validated_config)

        # --- Finalize ---
        manifest = generate_run_manifest(validated_config, metrics_accumulator)

        artifacts = PipelineArtifacts(
            df_corpus_clean=df_c_num,
            df_validation_clean=df_v_clean,
            df_irregularities=df_irreg_class,
            df_corpus_with_index=df_corpus_index,
            df_analysis=df_analysis,
            pca_artifacts=pca_artifacts,
            table1_results=t1_results,
            table2_results=t2_results,
            table3_results=t3_results,
            run_manifest=manifest
        )

        logger.info("Main Pipeline Execution Completed Successfully.")
        return artifacts

    except Exception as e:
        logger.critical(f"Pipeline Execution Failed: {str(e)}")
        raise


In [None]:
# Task 16 — Leave-one-out robustness analysis (Figure 2)

# ==============================================================================
# Task 16: Leave-one-out robustness analysis (Figure 2)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 1: Define the LOO algorithm
# -------------------------------------------------------------------------------------------------------------------------------

def execute_loo_iteration(
    df_sample: pd.DataFrame,
    outcome_col: str,
    predictor_col: str,
    fe_col: str
) -> List[Dict[str, Any]]:
    """
    Executes Leave-One-Out estimation for a given sample.

    Iterates through each row, drops it, re-standardizes the predictor,
    and estimates the model.

    Parameters
    ----------
    df_sample : pd.DataFrame
        The full estimation sample.
    outcome_col : str
        Outcome variable name.
    predictor_col : str
        Predictor variable name (Corruption Index).
    fe_col : str
        Fixed effect variable name.

    Returns
    -------
    List[Dict[str, Any]]
        List of results for each iteration.
    """
    results = []

    # Keep only the model columns and drop incomplete rows
    data = df_sample[[outcome_col, predictor_col, fe_col]].dropna()

    # Iterate by position rather than by label: with a non-unique index,
    # data.drop(label) would remove every row sharing that label, not just one.
    for pos, drop_idx in enumerate(data.index):
        # 1. Create LOO Sample (all rows except the one at position `pos`)
        loo_data = data.iloc[np.arange(len(data)) != pos].copy()

        # 2. Estimate Model (using Task 12 function)
        # Note: estimate_validation_model handles standardization internally
        res = estimate_validation_model(
            loo_data,
            outcome_col=outcome_col,
            predictor_col=predictor_col,
            fe_col=fe_col
        )

        if 'error' not in res:
            results.append({
                'dropped_id': drop_idx,
                'beta': res['beta'],
                'R2': res['R2'],
                'N': res['N']
            })
        else:
            logger.warning(f"LOO iteration failed for dropped_id {drop_idx}: {res['error']}")

    return results
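

# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): leave-one-out with re-standardization
# -------------------------------------------------------------------------------------------------------------------------------
# A minimal, standard-library-only version of the loop above, under simplifying assumptions:
# a single predictor, no fixed effects, and the sample standard deviation (ddof=1) for
# standardization. All helper names are hypothetical. The point is that the predictor is
# re-standardized inside each iteration, so the slope is always per one standard deviation
# of the *reduced* sample. Needs at least three observations and a non-constant predictor.

import math
from typing import Any, Dict, List, Sequence


def _sketch_standardize(values: Sequence[float]) -> List[float]:
    """Center on the mean and divide by the sample standard deviation (ddof=1)."""
    n = len(values)
    mean = sum(values) / n
    sd = math.sqrt(sum((v - mean) ** 2 for v in values) / (n - 1))
    return [(v - mean) / sd for v in values]


def _sketch_ols_slope(x: Sequence[float], y: Sequence[float]) -> float:
    """Slope of the bivariate OLS fit of y on x (with intercept)."""
    mx = sum(x) / len(x)
    my = sum(y) / len(y)
    sxy = sum((xi - mx) * (yi - my) for xi, yi in zip(x, y))
    sxx = sum((xi - mx) ** 2 for xi in x)
    return sxy / sxx


def _sketch_loo_slopes(x: Sequence[float], y: Sequence[float]) -> List[Dict[str, Any]]:
    """Drop each observation in turn, re-standardize x, and re-estimate the slope."""
    results = []
    for drop_pos in range(len(x)):
        x_loo = [v for i, v in enumerate(x) if i != drop_pos]
        y_loo = [v for i, v in enumerate(y) if i != drop_pos]
        beta = _sketch_ols_slope(_sketch_standardize(x_loo), y_loo)
        results.append({'dropped_pos': drop_pos, 'beta': beta, 'N': len(x_loo)})
    return results


# Example: _sketch_loo_slopes([1, 2, 3, 4, 5], [3, 5, 7, 9, 11]) returns five entries with N=4.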


# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 2: Execute LOO for both agreement samples
# -------------------------------------------------------------------------------------------------------------------------------

def run_loo_for_samples(df_analysis: pd.DataFrame) -> Dict[str, List[Dict[str, Any]]]:
    """
    Runs LOO analysis for Strict and Near agreement samples.

    Parameters
    ----------
    df_analysis : pd.DataFrame
        Analysis DataFrame.

    Returns
    -------
    Dict[str, List[Dict[str, Any]]]
        Dictionary of LOO results for 'Strict' and 'Near'.
    """
    loo_results = {}

    fe_col = 'state_canon'
    if fe_col not in df_analysis.columns and 'state' in df_analysis.columns:
        fe_col = 'state'

    # 1. Strict Agreement
    mask_strict = df_analysis['is_strict_agreement'] == True
    df_strict = df_analysis[mask_strict].copy()

    if not df_strict.empty:
        logger.info(f"Running LOO for Strict Agreement (N={len(df_strict)})")
        loo_results['Strict'] = execute_loo_iteration(
            df_strict, 'hc_mean', 'corruption_index', fe_col
        )
    else:
        logger.warning("Strict Agreement sample empty. Skipping LOO.")
        loo_results['Strict'] = []

    # 2. Near Agreement
    mask_near = df_analysis['is_near_agreement'] == True
    df_near = df_analysis[mask_near].copy()

    if not df_near.empty:
        logger.info(f"Running LOO for Near Agreement (N={len(df_near)})")
        loo_results['Near'] = execute_loo_iteration(
            df_near, 'hc_mean', 'corruption_index', fe_col
        )
    else:
        logger.warning("Near Agreement sample empty. Skipping LOO.")
        loo_results['Near'] = []

    return loo_results


# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 3: Summarize and validate stability
# -------------------------------------------------------------------------------------------------------------------------------

def summarize_loo_stability(loo_results: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
    """
    Summarizes LOO results (min/max Beta and R2) and validates against targets.

    Parameters
    ----------
    loo_results : Dict[str, List[Dict[str, Any]]]
        LOO results from Step 2.

    Returns
    -------
    Dict[str, Any]
        Summary statistics.
    """
    summary = {}

    targets = {
        'Strict': {'beta_min': 1.20, 'beta_max': 1.73, 'r2_min': 0.67, 'r2_max': 0.83},
        'Near': {'beta_min': 1.17, 'beta_max': 1.35, 'r2_min': 0.66, 'r2_max': 0.74}
    }

    # Compute min/max per sample and compare them with the target ranges
    for sample_name, results in loo_results.items():
        if not results:
            continue

        betas = [r['beta'] for r in results]
        r2s = [r['R2'] for r in results]

        stats = {
            'beta_min': min(betas),
            'beta_max': max(betas),
            'r2_min': min(r2s),
            'r2_max': max(r2s)
        }
        summary[sample_name] = stats

        # Validate
        t = targets.get(sample_name)
        if t:
            # Check ranges (allow small tolerance)
            tol = 0.1
            if (stats['beta_min'] < t['beta_min'] - tol) or (stats['beta_max'] > t['beta_max'] + tol):
                logger.warning(f"[{sample_name}] LOO Beta range mismatch. Got [{stats['beta_min']:.2f}, {stats['beta_max']:.2f}], Expected [{t['beta_min']}, {t['beta_max']}]")
            else:
                logger.info(f"[{sample_name}] LOO Beta range match.")

            if (stats['r2_min'] < t['r2_min'] - tol) or (stats['r2_max'] > t['r2_max'] + tol):
                logger.warning(f"[{sample_name}] LOO R2 range mismatch. Got [{stats['r2_min']:.2f}, {stats['r2_max']:.2f}], Expected [{t['r2_min']}, {t['r2_max']}]")
            else:
                logger.info(f"[{sample_name}] LOO R2 range match.")

    return summary
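

# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): the range-with-tolerance check in isolation
# -------------------------------------------------------------------------------------------------------------------------------
# summarize_loo_stability accepts a computed [min, max] range when it lies inside the target
# range widened by `tol` on both sides. The helper below restates that rule as a single
# predicate; its name and signature are hypothetical and it is not called by the pipeline.


def _sketch_range_within(
    observed_min: float,
    observed_max: float,
    target_min: float,
    target_max: float,
    tol: float = 0.1
) -> bool:
    """True if [observed_min, observed_max] lies inside [target_min - tol, target_max + tol]."""
    return (observed_min >= target_min - tol) and (observed_max <= target_max + tol)


# Example with the Strict targets above: a computed beta range of [1.15, 1.80] passes,
# whereas [1.05, 1.50] fails because 1.05 lies below 1.20 - 0.1.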


# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_loo_analysis(
    df_analysis: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrator function for Task 16: Leave-One-Out Robustness.

    Executes:
    1. LOO iterations for Strict and Near samples.
    2. Summarization and validation.

    Parameters
    ----------
    df_analysis : pd.DataFrame
        Analysis DataFrame.
    study_configuration : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Dict[str, Any]
        Dictionary containing detailed results and summary stats.
    """
    logger.info("Starting Task 16: Leave-One-Out Robustness Analysis.")

    # Step 1 & 2
    raw_results = run_loo_for_samples(df_analysis)

    # Step 3
    summary = summarize_loo_stability(raw_results)

    return {
        'detailed_results': raw_results,
        'summary': summary
    }


In [None]:
# Task 17 — Create orchestrator function for supervised-learning robustness

# ==============================================================================
# Task 17: Create orchestrator function for supervised-learning robustness
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 2: Define supervised orchestrator input/output contract
# -------------------------------------------------------------------------------------------------------------------------------

@dataclass
class SupervisedArtifacts:
    """
    Container for artifacts produced by the supervised learning robustness pipeline.

    Attributes
    ----------
    classification_reports : Dict[str, Dict[str, Any]]
        Metrics (precision, recall, f1) for each classifier (LR, NB, SVM).
    ml_severe_counts : pd.DataFrame
        DataFrame with 'report_id' and 'severe_irregularities_count_ml'.
    ml_pca_index : pd.DataFrame
        DataFrame with 'report_id' and 'corruption_index_ml'.
    comparison_stats : Dict[str, float]
        Comparison metrics ('correlation', 'R2', 'N') between Dictionary-PCA and ML-PCA.
    top_features : Dict[str, Dict[str, List[str]]]
        Top predictive bigrams per classifier, keyed by 'low' and 'high' corruption class.
    training_metadata : Dict[str, Any]
        Metadata about training sample size, class balance, and vocabulary.
    """
    classification_reports: Dict[str, Dict[str, Any]]
    ml_severe_counts: pd.DataFrame
    ml_pca_index: pd.DataFrame
    comparison_stats: Dict[str, float]
    top_features: Dict[str, Dict[str, List[str]]]
    training_metadata: Dict[str, Any]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 3: Freeze randomness and evaluation protocol
# -------------------------------------------------------------------------------------------------------------------------------

def set_random_seed(seed: int = 42) -> None:
    """
    Sets the global random seed for reproducibility.

    Parameters
    ----------
    seed : int
        Random seed.
    """
    np.random.seed(seed)
    # If using other libraries with global seeds, set them here.
    # Sklearn estimators will accept random_state locally.

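A minimal sketch of what seeding NumPy's global generator guarantees, assuming only NumPy is installed. The helper `draw` is hypothetical and not part of the pipeline; it only illustrates that reseeding with the same value reproduces the same draws. Scikit-learn estimators do not depend on this call when they are given an explicit `random_state`.

```python
import numpy as np


def draw(seed: int, n: int = 3) -> np.ndarray:
    """Reseed NumPy's global generator, then draw n uniform samples."""
    np.random.seed(seed)
    return np.random.rand(n)


first = draw(42)
second = draw(42)
other = draw(7)

# Same seed gives identical draws; a different seed gives different draws.
same_seed_matches = bool(np.array_equal(first, second))
different_seed_matches = bool(np.array_equal(first, other))
print(same_seed_matches, different_seed_matches)
```

Reseeding inside a helper like this is only for demonstration; in the pipeline the seed is set once at the start of the supervised run.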

In [None]:
# Task 18 — Supervised-learning robustness: label construction and TF-IDF features

# ==============================================================================
# Task 18: Supervised-learning robustness: label construction and TF-IDF features
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 1: Create binary labels via median split (Appendix A4)
# -------------------------------------------------------------------------------------------------------------------------------

def construct_labels(df_analysis: pd.DataFrame) -> pd.DataFrame:
    """
    Constructs binary labels for supervised learning based on human coder agreement.

    Logic:
    1. Filter to rows where both FF and GT are non-null.
    2. Compute Score = (FF + GT) / 2.
    3. Compute Median(Score).
    4. Define High/Low class for each coder relative to median.
    5. Keep rows where coders agree on class.
    6. Label y=1 (High) if Score >= Median, else 0.

    Parameters
    ----------
    df_analysis : pd.DataFrame
        Analysis DataFrame with 'ff_corruption_count' and 'gt_corruption_count'.

    Returns
    -------
    pd.DataFrame
        DataFrame with columns 'report_id' and 'label' (y), one row per retained report.
    """
    # 1. Filter valid
    mask_valid = df_analysis['ff_corruption_count'].notna() & df_analysis['gt_corruption_count'].notna()
    df_valid = df_analysis[mask_valid].copy()

    if df_valid.empty:
        raise ValueError("No valid rows with both FF and GT counts for label construction.")

    # 2. Compute Score
    ff = df_valid['ff_corruption_count']
    gt = df_valid['gt_corruption_count']
    scores = (ff + gt) / 2.0

    # 3. Compute Median
    median_score = scores.median()
    logger.info(f"Label Construction: Median Score = {median_score}")

    # 4. Define Classes per Coder
    # High (1) if >= median, Low (0) if < median
    class_ff = (ff >= median_score).astype(int)
    class_gt = (gt >= median_score).astype(int)

    # 5. Agreement Filter
    mask_agree = (class_ff == class_gt)
    df_train = df_valid[mask_agree].copy()

    # 6. Final Label
    # Since they agree, we can use either class, or recompute on score (same result)
    df_train['label'] = class_ff[mask_agree]

    logger.info(f"Label Construction: {len(df_train)} training samples retained from {len(df_valid)} candidates.")
    logger.info(f"Class Balance: {df_train['label'].value_counts().to_dict()}")

    return df_train[['report_id', 'label']]


# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 2: Build TF-IDF bigram features (Equation 2)
# -------------------------------------------------------------------------------------------------------------------------------

def build_document_corpus(df_irregularities: pd.DataFrame) -> pd.Series:
    """
    Constructs report-level documents by concatenating irregularity texts.

    Parameters
    ----------
    df_irregularities : pd.DataFrame
        Irregularity segments.

    Returns
    -------
    pd.Series
        Series of document texts, indexed by report_id.
    """
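    # Sort by report and irregularity index to preserve within-report order
    df_sorted = df_irregularities.sort_values(['report_id', 'irregularity_index'])

    # Group and join with a space separator.
    # Missing texts become empty strings; astype(str) alone would turn NaN into the literal "nan".
    docs = df_sorted.groupby('report_id')['irregularity_text_raw'].apply(
        lambda x: " ".join(x.fillna("").astype(str))
    )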

    return docs


def get_custom_preprocessor(config: Dict[str, Any]) -> Callable[[str], str]:
    """
    Factory function that returns a custom text preprocessing callable configured for the TfidfVectorizer.

    This preprocessor implements the specific NLP pipeline described in the study (Appendix A4)
    to ensure fidelity in feature engineering. The pipeline includes:
    1. Unicode Normalization (NFD form).
    2. Accent Removal (stripping combining diacritical marks).
    3. Case Folding (conversion to lowercase).
    4. Punctuation Removal (replacing non-alphanumeric characters with space).
    5. Tokenization (splitting by whitespace).
    6. Stopword Removal (using Portuguese stopwords).
    7. Stemming (using Porter Stemmer).

    Parameters
    ----------
    config : Dict[str, Any]
        The study configuration dictionary containing NLP parameters.

    Returns
    -------
    Callable[[str], str]
        A function that takes a raw text string and returns a preprocessed, space-joined string of stems.

    Raises
    ------
    LookupError
        If NLTK stopwords are not available and cannot be downloaded.
    """
    # Initialize the Porter Stemmer as specified in the study configuration
    stemmer = PorterStemmer()

    # Compile regex for punctuation removal: matches any character that is NOT a word char or whitespace
    punct_re = re.compile(r'[^\w\s]')

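    # Load Portuguese stopwords from NLTK
    try:
        raw_stops = stopwords.words('portuguese')
    except LookupError:
        logger.info("Downloading NLTK stopwords...")
        nltk.download('stopwords')
        raw_stops = stopwords.words('portuguese')

    # Strip accents from the stopwords as well: tokens are compared after accent
    # removal, so accented entries such as 'não' would otherwise never match.
    stops = {
        "".join(c for c in unicodedata.normalize('NFD', w) if unicodedata.category(c) != 'Mn')
        for w in raw_stops
    }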

    def preprocess(text: str) -> str:
        """
        Preprocesses a single text string according to the study's NLP pipeline.

        Parameters
        ----------
        text : str
            The raw input text.

        Returns
        -------
        str
            The normalized, stemmed text string.
        """
        # Handle null or non-string inputs gracefully
        if pd.isna(text):
            return ""

        # 1. NFD Normalization: Decompose characters (e.g., 'ç' -> 'c' + combining cedilla U+0327)
        text_nfd = unicodedata.normalize('NFD', str(text))

        # 2. Remove Accents: Filter out non-spacing mark characters (Unicode category 'Mn')
        text_no_accents = "".join(c for c in text_nfd if unicodedata.category(c) != 'Mn')

        # 3. Lowercase: Convert to lowercase for case-insensitivity
        text_lower = text_no_accents.lower()

        # 4. Remove Punctuation: Replace punctuation with space to prevent merging words
        text_no_punct = punct_re.sub(' ', text_lower)

        # 5. Tokenize: Split by whitespace
        tokens = text_no_punct.split()

        # 6. Remove Stopwords and 7. Stem: Apply Porter Stemmer to non-stopword tokens
        stems = [stemmer.stem(t) for t in tokens if t not in stops]

        # Join stems back into a single string for TfidfVectorizer
        return " ".join(stems)

    return preprocess

def create_tfidf_vectorizer(config: Dict[str, Any]) -> TfidfVectorizer:
    """
    Creates and configures the TfidfVectorizer.

    Parameters
    ----------
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    TfidfVectorizer
        Configured vectorizer.
    """
    sup_config = config['pipeline_parameters']['supervised_learning_config']

    # Get preprocessor
    preprocessor = get_custom_preprocessor(config)

    vectorizer = TfidfVectorizer(
        preprocessor=preprocessor,
        ngram_range=tuple(sup_config['tfidf_ngram_range']), # (2,2)
        norm=sup_config['tfidf_norm'],
        use_idf=sup_config['tfidf_use_idf'],
        smooth_idf=sup_config['tfidf_smooth_idf']
    )

    return vectorizer


# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 3: Materialize the feature matrix and label vector
# -------------------------------------------------------------------------------------------------------------------------------

def align_features_and_labels(
    docs: pd.Series,
    labels: pd.DataFrame,
    vectorizer: TfidfVectorizer
) -> Dict[str, Any]:
    """
    Aligns documents and labels, fits vectorizer, and creates training matrices.

    Parameters
    ----------
    docs : pd.Series
        Documents indexed by report_id.
    labels : pd.DataFrame
        Labels with 'report_id' and 'label'.
    vectorizer : TfidfVectorizer
        Unfitted vectorizer.

    Returns
    -------
    Dict[str, Any]
        Bundle containing X_train, y_train, vectorizer, report_ids, metadata.
    """
    # Align indices
    # Intersection of report_ids in docs and labels
    common_ids = sorted(list(set(docs.index) & set(labels['report_id'])))

    if not common_ids:
        raise ValueError("No overlap between document corpus and labeled samples.")

    # Filter and sort
    docs_train = docs.loc[common_ids]
    labels_train = labels.set_index('report_id').loc[common_ids]

    # Fit Vectorizer
    logger.info("Fitting TF-IDF Vectorizer...")
    X_train = vectorizer.fit_transform(docs_train)
    y_train = labels_train['label'].values

    vocab = vectorizer.get_feature_names_out()

    metadata = {
        'n_samples': X_train.shape[0],
        'n_features': X_train.shape[1],
        'class_balance': {0: int((y_train==0).sum()), 1: int((y_train==1).sum())},
        'vocabulary_size': len(vocab)
    }

    logger.info(f"Feature Matrix Built: {metadata}")

    return {
        'X_train': X_train,
        'y_train': y_train,
        'vectorizer': vectorizer,
        'report_ids_train': common_ids,
        'metadata': metadata
    }


# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def build_supervised_data(
    df_irregularities: pd.DataFrame,
    df_analysis: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrator function for Task 18: Build Supervised Data.

    Executes:
    1. Label Construction.
    2. Document Construction.
    3. Vectorization and Alignment.

    Parameters
    ----------
    df_irregularities : pd.DataFrame
        Irregularity segments.
    df_analysis : pd.DataFrame
        Validation data.
    study_configuration : Dict[str, Any]
        Config.

    Returns
    -------
    Dict[str, Any]
        Training data bundle.
    """
    logger.info("Starting Task 18: Build Supervised Data.")

    try:
        # Step 1: Labels
        df_labels = construct_labels(df_analysis)

        # Step 2: Docs
        docs = build_document_corpus(df_irregularities)

        # Step 3: Vectorizer
        vectorizer = create_tfidf_vectorizer(study_configuration)

        # Step 4: Align & Build
        bundle = align_features_and_labels(docs, df_labels, vectorizer)

        logger.info("Task 18 completed.")
        return bundle

    except Exception as e:
        logger.critical(f"Task 18 Failed: {e}")
        # Bare raise re-raises the active exception with its original traceback
        raise

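A minimal sketch of the median-split labeling with the coder-agreement filter, written in plain Python so the arithmetic is easy to follow. The report IDs and the (FF, GT) counts below are made up for illustration and are not data from the study.

```python
from statistics import median

# Hypothetical (FF, GT) corruption counts for six reports; not real data.
counts = {
    "r1": (0, 1),
    "r2": (5, 6),
    "r3": (1, 4),
    "r4": (3, 3),
    "r5": (0, 0),
    "r6": (4, 2),
}

# Score = (FF + GT) / 2, then take the median of the scores.
scores = {rid: (ff + gt) / 2.0 for rid, (ff, gt) in counts.items()}
med = median(scores.values())

# Classify each coder's count against the median; keep only reports where both agree.
labels = {}
for rid, (ff, gt) in counts.items():
    class_ff = int(ff >= med)
    class_gt = int(gt >= med)
    if class_ff == class_gt:
        labels[rid] = class_ff

print(med)     # 2.75
print(labels)  # {'r1': 0, 'r2': 1, 'r4': 1, 'r5': 0}
```

Reports `r3` and `r6` are dropped because the two coders fall on opposite sides of the median, which is how the agreement filter shrinks the training set.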

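The normalization steps of the preprocessor can be sketched with the standard library alone. This is an illustration under stated assumptions: the stopword list is a tiny hypothetical stand-in for NLTK's Portuguese list, stemming is omitted, and `strip_accents` and `normalize` are names introduced here. It also shows why stopwords need the same accent stripping as the text before they are compared.

```python
import re
import unicodedata


def strip_accents(text: str) -> str:
    """NFD-decompose, then drop combining marks (Unicode category 'Mn')."""
    decomposed = unicodedata.normalize("NFD", text)
    return "".join(c for c in decomposed if unicodedata.category(c) != "Mn")


PUNCT_RE = re.compile(r"[^\w\s]")

# Tiny hypothetical stopword list; the notebook uses NLTK's Portuguese list.
RAW_STOPWORDS = {"não", "de", "a", "na"}
STOPWORDS = {strip_accents(w) for w in RAW_STOPWORDS}


def normalize(text: str) -> list:
    """Accent-strip, lowercase, replace punctuation with spaces, drop stopwords."""
    cleaned = PUNCT_RE.sub(" ", strip_accents(text).lower())
    return [t for t in cleaned.split() if t not in STOPWORDS]


tokens = normalize("Licitação não realizada; fraude na execução.")
# Adjacent token pairs, analogous to ngram_range=(2, 2) in the vectorizer.
bigrams = [" ".join(pair) for pair in zip(tokens, tokens[1:])]

print(tokens)   # ['licitacao', 'realizada', 'fraude', 'execucao']
print(bigrams)  # ['licitacao realizada', 'realizada fraude', 'fraude execucao']
```

Without normalizing the stopword set, the accent-stripped token `nao` would not match the accented entry `não` and would survive into the bigrams.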
In [None]:
# Task 19 — Supervised-learning robustness: train classifiers and compare indices

# ===============================================================================
# Task 19: Supervised-learning robustness: train classifiers and compare indices
# ===============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 1: Train classifiers per configuration
# -------------------------------------------------------------------------------------------------------------------------------

def train_classifiers(
    training_bundle: Dict[str, Any],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
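    Trains classifiers (LR, NB, SVM) on the full labeled dataset and computes in-sample metrics.

    Predictions are made on the training data itself, so the reported precision, recall,
    and F1 describe fit quality and are not estimates of out-of-sample performance.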

    Parameters
    ----------
    training_bundle : Dict[str, Any]
        Output from Task 18 (X_train, y_train, vectorizer, etc.).
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Dict[str, Any]
        Dictionary containing:
        - 'models': Dict of trained model objects.
        - 'reports': Dict of classification reports.
        - 'top_features': Dict of top bigrams per class (for LR).
    """
    X_train = training_bundle['X_train']
    y_train = training_bundle['y_train']
    vocab = training_bundle['vectorizer'].get_feature_names_out()

    sup_config = config['pipeline_parameters']['supervised_learning_config']
    hyperparams = sup_config.get('classifier_hyperparameters', {})

    models = {}
    reports = {}
    top_features = {}

    # 1. Logistic Regression
    if sup_config.get('use_logistic_regression', True):
        logger.info("Training Logistic Regression...")
        params = hyperparams.get('logistic_regression') or {'random_state': 42, 'max_iter': 1000}
        lr = LogisticRegression(**params)
        lr.fit(X_train, y_train)
        models['LogisticRegression'] = lr

        y_pred = lr.predict(X_train)
        reports['LogisticRegression'] = classification_report(y_train, y_pred, output_dict=True)

        # Extract top features from the coefficient vector (argsort is ascending)
        coefs = lr.coef_[0]
        top_indices = coefs.argsort()
        # Most negative 10 (Low Corruption, class 0) and most positive 10 (High Corruption, class 1),
        # each listed from strongest to weakest
        low_features = [vocab[i] for i in top_indices[:10]]
        high_features = [vocab[i] for i in top_indices[-10:][::-1]]
        top_features['LogisticRegression'] = {'low': low_features, 'high': high_features}

    # 2. Naive Bayes
    if sup_config.get('use_naive_bayes', True):
        logger.info("Training Naive Bayes...")
        params = hyperparams.get('naive_bayes') or {}
        nb = MultinomialNB(**params)
        nb.fit(X_train, y_train)
        models['NaiveBayes'] = nb

        y_pred = nb.predict(X_train)
        reports['NaiveBayes'] = classification_report(y_train, y_pred, output_dict=True)

    # 3. Linear SVM
    if sup_config.get('use_linear_svm', True):
        logger.info("Training Linear SVM...")
        params = hyperparams.get('linear_svm') or {'random_state': 42, 'dual': 'auto'}
        svm = LinearSVC(**params)
        svm.fit(X_train, y_train)
        models['LinearSVM'] = svm

        y_pred = svm.predict(X_train)
        reports['LinearSVM'] = classification_report(y_train, y_pred, output_dict=True)

    return {
        'models': models,
        'reports': reports,
        'top_features': top_features
    }


# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 2: Generate ML-derived severe counts and rebuild PCA index
# -------------------------------------------------------------------------------------------------------------------------------

def rebuild_ml_index(
    df_irregularities: pd.DataFrame,
    df_corpus_counts: pd.DataFrame,
    model: Any,
    vectorizer: TfidfVectorizer,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Predicts severity for all irregularities using the trained model, aggregates counts,
    and rebuilds the PCA index.

    Parameters
    ----------
    df_irregularities : pd.DataFrame
        All irregularity segments.
    df_corpus_counts : pd.DataFrame
        Original corpus data (for other PCA features).
    model : Any
        Trained classifier (e.g., LogisticRegression).
    vectorizer : TfidfVectorizer
        Fitted vectorizer.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Dict[str, Any]
        Dictionary containing:
        - 'ml_severe_counts': DataFrame with ML-derived counts.
        - 'ml_pca_index': DataFrame with ML-PCA scores.
        - 'pca_artifacts': PCA details.
    """
    logger.info("Predicting severity for all irregularities...")

    # 1. Transform all irregularities
    # Handle NaN texts
    texts = df_irregularities['irregularity_text_raw'].fillna("").astype(str)
    X_all = vectorizer.transform(texts)

    # 2. Predict
    y_pred = model.predict(X_all)

    # 3. Aggregate
    df_preds = df_irregularities[['report_id']].copy()
    df_preds['is_severe_ml'] = y_pred

    agg = df_preds.groupby('report_id')['is_severe_ml'].sum().reset_index()
    agg.rename(columns={'is_severe_ml': 'severe_irregularities_count_ml'}, inplace=True)

    # 4. Merge with Corpus Features
    # We need the other 4 features from df_corpus_counts
    # image_count, page_count, report_lines_count, total_irregularities_count
    # severe_irregularities_count is replaced
    features_needed = ['report_id', 'image_count', 'page_count', 'report_lines_count', 'total_irregularities_count']
    df_features = pd.merge(
        df_corpus_counts[features_needed],
        agg,
        on='report_id',
        how='left'
    )
    # Fill NaN counts with 0 (reports with no irregularities)
    df_features['severe_irregularities_count_ml'] = df_features['severe_irregularities_count_ml'].fillna(0).astype(int)

    # 5. Rebuild PCA
    # We reuse the logic from Task 10, but we need to construct the matrix manually
    # because the column name changed.

    # Construct X matrix
    # Order: image, severe(ML), page, lines, total
    X_ml = df_features[[
        'image_count',
        'severe_irregularities_count_ml',
        'page_count',
        'report_lines_count',
        'total_irregularities_count'
    ]].values.astype(float)

    # Standardize
    mu = np.mean(X_ml, axis=0)
    sigma = np.std(X_ml, axis=0, ddof=1)
    # Handle zero variance if any (unlikely for counts)
    sigma[sigma == 0] = 1.0
    Z_ml = (X_ml - mu) / sigma

    # PCA
    Sigma = (1 / (Z_ml.shape[0] - 1)) * np.dot(Z_ml.T, Z_ml)
    evals, evecs = np.linalg.eigh(Sigma)

    # Sort
    idx = np.argsort(evals)[::-1]
    evals = evals[idx]
    evecs = evecs[:, idx]

    # PC1
    pc1_loadings = evecs[:, 0]
    if np.sum(pc1_loadings) < 0:
        pc1_loadings = -pc1_loadings

    scores = np.dot(Z_ml, pc1_loadings)

    df_scores = pd.DataFrame({
        'report_id': df_features['report_id'],
        'corruption_index_ml': scores
    })

    return {
        'ml_severe_counts': agg,
        'ml_pca_index': df_scores,
        'pca_artifacts': {'eigenvalues': evals, 'loadings': pc1_loadings}
    }


# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 3: Compare dictionary-PCA vs ML-PCA indices
# -------------------------------------------------------------------------------------------------------------------------------

def compare_indices(
    df_dict_index: pd.DataFrame,
    df_ml_index: pd.DataFrame
) -> Dict[str, float]:
    """
    Compares the Dictionary-based PCA index with the ML-based PCA index.

    Computes Pearson correlation and R^2.

    Parameters
    ----------
    df_dict_index : pd.DataFrame
        DataFrame with 'report_id' and 'corruption_index'.
    df_ml_index : pd.DataFrame
        DataFrame with 'report_id' and 'corruption_index_ml'.

    Returns
    -------
    Dict[str, float]
        Dictionary with 'correlation', 'R2', 'N'.
    """
    # Merge
    merged = pd.merge(
        df_dict_index[['report_id', 'corruption_index']],
        df_ml_index[['report_id', 'corruption_index_ml']],
        on='report_id',
        how='inner'
    )

    if merged.empty:
        logger.warning("No overlapping reports for index comparison.")
        # NaN rather than 0.0: with no overlap the correlation is undefined, not zero
        return {'correlation': float('nan'), 'R2': float('nan'), 'N': 0}

    # Correlation
    corr = merged['corruption_index'].corr(merged['corruption_index_ml'])
    r2 = corr ** 2

    logger.info(f"Index Comparison: Correlation = {corr:.4f}, R2 = {r2:.4f} (N={len(merged)})")

    return {
        'correlation': corr,
        'R2': r2,
        'N': len(merged)
    }


# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def train_and_compare(
    training_bundle: Dict[str, Any],
    df_irregularities: pd.DataFrame,
    df_corpus_counts: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrator function for Task 19: Train, Predict, Compare.

    Executes:
    1. Train classifiers.
    2. Rebuild PCA with ML counts.
    3. Compare indices.

    Parameters
    ----------
    training_bundle : Dict[str, Any]
        Training data.
    df_irregularities : pd.DataFrame
        Irregularity segments.
    df_corpus_counts : pd.DataFrame
        Corpus counts (with dict index).
    study_configuration : Dict[str, Any]
        Config.

    Returns
    -------
    Dict[str, Any]
        Artifacts bundle.
    """
    logger.info("Starting Task 19: Train and Compare.")

    # Step 1: Train
    models_bundle = train_classifiers(training_bundle, study_configuration)

    # Step 2: Rebuild (using LR)
    # Check if LR exists
    if 'LogisticRegression' in models_bundle['models']:
        lr_model = models_bundle['models']['LogisticRegression']
        ml_index_bundle = rebuild_ml_index(
            df_irregularities,
            df_corpus_counts,
            lr_model,
            training_bundle['vectorizer'],
            study_configuration
        )

        # Step 3: Compare
        comparison = compare_indices(
            df_corpus_counts,
            ml_index_bundle['ml_pca_index']
        )
    else:
        logger.warning("Logistic Regression not trained. Skipping index rebuild.")
        ml_index_bundle = {}
        comparison = {}

    return {
        'models_bundle': models_bundle,
        'ml_index_bundle': ml_index_bundle,
        'comparison': comparison
    }

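A minimal sketch of the index rebuild and comparison on toy data, assuming NumPy is available. It mirrors the steps used above (standardize with `ddof=1`, eigendecompose the covariance of the standardized matrix, fix the sign of the first component, project) and then compares two indices through the squared Pearson correlation. The feature matrices are invented for illustration, and `first_component_scores` is a hypothetical helper name.

```python
import numpy as np


def first_component_scores(X: np.ndarray):
    """Standardize columns, eigendecompose cov(Z), return PC1 loadings, scores, variance share."""
    mu = X.mean(axis=0)
    sigma = X.std(axis=0, ddof=1)
    sigma[sigma == 0] = 1.0  # guard against constant columns
    Z = (X - mu) / sigma
    cov = Z.T @ Z / (Z.shape[0] - 1)
    evals, evecs = np.linalg.eigh(cov)  # eigh returns eigenvalues in ascending order
    loadings = evecs[:, np.argmax(evals)]
    if loadings.sum() < 0:  # same sign convention as the notebook
        loadings = -loadings
    return loadings, Z @ loadings, evals.max() / evals.sum()


# Hypothetical feature matrix: 5 reports x 3 count features (not real data).
X_dict = np.array(
    [[1, 10, 3], [2, 14, 5], [4, 22, 8], [6, 30, 13], [9, 41, 17]], dtype=float
)
X_ml = X_dict.copy()
X_ml[:, 0] = [1, 3, 4, 7, 8]  # swap in a different "severe count" column

loadings_dict, scores_dict, share = first_component_scores(X_dict)
_, scores_ml, _ = first_component_scores(X_ml)

corr = np.corrcoef(scores_dict, scores_ml)[0, 1]
r2 = corr ** 2
print(f"PC1 variance share: {share:.3f}, correlation: {corr:.3f}, R2: {r2:.3f}")
```

Because the columns are standardized, the eigenvalues sum to the number of features, so the sample variance of the PC1 scores equals the largest eigenvalue.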

In [None]:
# Orchestrator function for running Supervised Learning Robustness

# ===============================================================================
# Orchestrator function for running Supervised Learning Robustness
# ===============================================================================


def run_supervised_robustness(
    df_irregularities: pd.DataFrame,
    df_analysis: pd.DataFrame,
    df_corpus_counts: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> SupervisedArtifacts:
    """
    Orchestrator function for running Supervised Learning Robustness.

    Executes the full supervised learning pipeline:
    1. Label construction & Feature Engineering (Task 18).
    2. Model Training & Prediction (Task 19).
    3. PCA Rebuild & Comparison (Task 19).

    Parameters
    ----------
    df_irregularities : pd.DataFrame
        Irregularity segments (text source).
    df_analysis : pd.DataFrame
        Validation data (label source).
    df_corpus_counts : pd.DataFrame
        Corpus data with original counts (for PCA rebuild).
    study_configuration : Dict[str, Any]
        Pipeline configuration.

    Returns
    -------
    SupervisedArtifacts
        Container with all supervised results.
    """
    logger.info("Starting Supervised Learning Robustness Pipeline.")

    try:
        # Set seed
        set_random_seed(42)

        # Note: The following functions are defined in Tasks 18 and 19.
        # They are called here to define the complete workflow.

        # Step 1: Build Labels and Features (Task 18)
        # Returns training data bundle and vectorizer for prediction
        # Expected return: Dict with keys 'X_train', 'y_train', 'vectorizer', 'metadata', 'report_ids_train'
        training_bundle = build_supervised_data(
            df_irregularities,
            df_analysis,
            study_configuration
        )

        # Step 2: Train Classifiers (Task 19)
        # Returns trained models and metrics
        # Expected return: Dict with keys 'models', 'reports', 'top_features'
        models_bundle = train_classifiers(
            training_bundle,
            study_configuration
        )

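        # Step 3: Predict & Rebuild Index (Task 19)
        # Uses Logistic Regression as primary for index reconstruction
        # Expected return: Dict with keys 'ml_severe_counts', 'ml_pca_index', 'pca_artifacts'
        if 'LogisticRegression' not in models_bundle['models']:
            raise ValueError(
                "Logistic Regression was not trained (use_logistic_regression is False); "
                "it is required to rebuild the ML-based PCA index."
            )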
        ml_index_bundle = rebuild_ml_index(
            df_irregularities,
            df_corpus_counts,
            models_bundle['models']['LogisticRegression'],
            training_bundle['vectorizer'],
            study_configuration
        )

        # Step 4: Compare (Task 19)
        # Expected return: Dict with keys 'correlation', 'R2', 'N'
        comparison = compare_indices(
            df_corpus_counts, # Contains original 'corruption_index'
            ml_index_bundle['ml_pca_index']
        )

        # Assemble Artifacts
        artifacts = SupervisedArtifacts(
            classification_reports=models_bundle['reports'],
            ml_severe_counts=ml_index_bundle['ml_severe_counts'],
            ml_pca_index=ml_index_bundle['ml_pca_index'],
            comparison_stats=comparison,
            top_features=models_bundle['top_features'],
            training_metadata=training_bundle['metadata']
        )

        logger.info("Supervised Robustness Pipeline Completed.")
        return artifacts

    except NameError as e:
        # This handles the case where functions aren't defined yet in this notebook context
        logger.warning(f"Sub-functions for Tasks 18/19 not yet defined: {e}")
        raise e
    except Exception as e:
        logger.critical(f"Supervised Pipeline Failed: {str(e)}")
        raise e



In [None]:
# Top-Level Orchestrator Function

# ==============================================================================
# Top-Level Orchestrator
# ==============================================================================

def execute_full_research_pipeline(
    df_raw_corpus: pd.DataFrame,
    df_validation_raw: pd.DataFrame,
    language: str,
    raw_lexicon_list: List[str],
    study_configuration: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Top-level orchestrator that executes the entire end-to-end research pipeline.

    This function coordinates the execution of:
    1. The Main Pipeline (Data Cleansing, NLP, Index Construction, Validation Regressions).
    2. Leave-One-Out (LOO) Robustness Analysis.
    3. Supervised Learning Robustness Analysis.

    It ensures data flows correctly between these stages, utilizing artifacts produced
    in the main pipeline as inputs for the robustness checks.

    Parameters
    ----------
    df_raw_corpus : pd.DataFrame
        The raw corpus DataFrame containing audit reports and metadata.
    df_validation_raw : pd.DataFrame
        The raw validation DataFrame containing human-coded data and covariates.
    language : str
        The language of the study (e.g., "Portuguese").
    raw_lexicon_list : List[str]
        The list of raw dictionary terms for corruption identification.
    study_configuration : Dict[str, Any]
        The comprehensive configuration dictionary for the study.

    Returns
    -------
    Dict[str, Any]
        A dictionary containing all artifacts from the research pipeline:
        - 'main_pipeline': Dict of artifacts from the main analysis (tables, indices, etc.).
        - 'loo_analysis': Dict of results from the Leave-One-Out robustness check.
        - 'supervised_robustness': Dict of results from the Supervised Learning check.
    """
    logger.info("Starting End-to-End Research Pipeline Execution.")

    try:
        # ----------------------------------------------------------------------
        # Step 1: Execute Main Pipeline
        # ----------------------------------------------------------------------
        logger.info(">>> Step 1: Executing Main Pipeline...")

        # run_main_pipeline returns a PipelineArtifacts dataclass
        main_artifacts = run_main_pipeline(
            df_raw_corpus,
            df_validation_raw,
            language,
            raw_lexicon_list,
            study_configuration
        )

        logger.info("Main Pipeline execution successful.")

        # ----------------------------------------------------------------------
        # Step 2: Execute Leave-One-Out (LOO) Robustness Analysis
        # ----------------------------------------------------------------------
        logger.info(">>> Step 2: Executing Leave-One-Out Robustness Analysis...")

        # Requires the analysis DataFrame (merged corpus + validation) from the main pipeline
        loo_results = run_loo_analysis(
            main_artifacts.df_analysis,
            study_configuration
        )

        logger.info("LOO Analysis execution successful.")

        # ----------------------------------------------------------------------
        # Step 3: Execute Supervised Learning Robustness Analysis
        # ----------------------------------------------------------------------
        logger.info(">>> Step 3: Executing Supervised Learning Robustness Analysis...")

        # Requires:
        # - Irregularity segments (text source)
        # - Analysis DataFrame (label source)
        # - Corpus with counts (for PCA rebuild features)
        supervised_results = run_supervised_robustness(
            main_artifacts.df_irregularities,
            main_artifacts.df_analysis,
            main_artifacts.df_corpus_with_index,
            study_configuration
        )

        logger.info("Supervised Robustness execution successful.")

        # ----------------------------------------------------------------------
        # Step 4: Assemble Final Output
        # ----------------------------------------------------------------------
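        # Convert the dataclass artifacts to plain dicts for the final return structure.
        # Note: dataclasses.asdict recurses into fields and deep-copies leaf values
        # (including DataFrames), so the returned dict does not share data with the artifacts.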
        final_output = {
            'main_pipeline': asdict(main_artifacts),
            'loo_analysis': loo_results,
            'supervised_robustness': asdict(supervised_results)
        }

        logger.info("End-to-End Pipeline Execution Completed Successfully.")
        return final_output

    except Exception as e:
        logger.critical(f"End-to-End Pipeline Failed: {str(e)}")
        raise e

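A minimal sketch of how `dataclasses.asdict` behaves on a nested artifacts container, using only the standard library. `ToyMetadata` and `ToyArtifacts` are hypothetical stand-ins for the pipeline's dataclasses, and the values are placeholders rather than results.

```python
from dataclasses import asdict, dataclass
from typing import Dict, List


@dataclass
class ToyMetadata:
    n_samples: int
    vocabulary_size: int


@dataclass
class ToyArtifacts:
    top_features: Dict[str, List[str]]
    metadata: ToyMetadata


artifacts = ToyArtifacts(
    top_features={"high": ["bigram a"], "low": ["bigram b"]},
    metadata=ToyMetadata(n_samples=4, vocabulary_size=12),
)

as_dict = asdict(artifacts)

# Nested dataclasses become plain dicts, and containers are copied rather than shared,
# so mutating the converted dict leaves the original artifacts untouched.
as_dict["top_features"]["high"].append("bigram c")

print(as_dict["metadata"])             # {'n_samples': 4, 'vocabulary_size': 12}
print(artifacts.top_features["high"])  # ['bigram a']
```

The copy semantics are convenient for returning an independent result structure, at the cost of duplicating any large objects held in the dataclass fields.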