# `README.md`

# An Independent Implementation of "Parental environment and student achievement: Does a Matthew effect exist?"

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2510.18481v1-b31b1b.svg)](https://arxiv.org/abs/2510.18481v1)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/parental_environment_and_student_achievement)
[![Discipline](https://img.shields.io/badge/Discipline-Economics%20of%20Education-00529B)](https://github.com/chirindaopensource/parental_environment_and_student_achievement)
[![Data Sources](https://img.shields.io/badge/Data-Madrid%20Gov%20%7C%20Barro--Lee-lightgrey)](https://github.com/chirindaopensource/parental_environment_and_student_achievement)
[![Core Method](https://img.shields.io/badge/Method-IV%20%7C%20Fixed%20Effects-orange)](https://github.com/chirindaopensource/parental_environment_and_student_achievement)
[![Analysis](https://img.shields.io/badge/Analysis-Causal%20Inference%20%7C%20Matthew%20Effect-red)](https://github.com/chirindaopensource/parental_environment_and_student_achievement)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![Matplotlib](https://img.shields.io/badge/matplotlib-%2311557c.svg?style=flat&logo=matplotlib&logoColor=white)](https://matplotlib.org/)
[![linearmodels](https://img.shields.io/badge/linearmodels-003F72-blue)](https://bashtage.github.io/linearmodels/)
[![PyYAML](https://img.shields.io/badge/PyYAML-gray?logo=yaml&logoColor=white)](https://pyyaml.org/)
[![Faker](https://img.shields.io/badge/Faker-lightgrey)](https://faker.readthedocs.io/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
---

**Repository:** `https://github.com/chirindaopensource/parental_environment_and_student_achievement`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Parental environment and student achievement: Does a Matthew effect exist?"** by:

*   Gaëlle Aymeric
*   Emmanuelle Lavaine
*   Brice Magdalou

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from rigorous data validation and feature engineering to the core econometric estimations (OLS and IV with Fixed Effects) and the final generation of all tables, figures, and reports.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `execute_full_study`](#key-callable-execute_full_study)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the methodologies presented in the 2025 paper "Parental environment and student achievement: Does a Matthew effect exist?". The core of this repository is the iPython Notebook `parental_environment_and_student_achievement_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings, from initial data validation to the final generation of all analytical tables and figures.

The paper investigates the causal impact of parental environment on student achievement, distinguishing between a static "persistent effect" and a dynamic "Matthew Effect" (the changing influence of parental background over time). This codebase operationalizes the paper's framework, allowing users to:
-   Rigorously validate and manage the entire experimental configuration via a single `config.yaml` file.
-   Process raw student survey data and external instrument data, applying a sequence of cleaning, harmonization, and feature engineering steps.
-   Estimate all 33 econometric models from the paper, including pooled OLS, interaction OLS, and their 2SLS instrumental variable counterparts, all with high-dimensional (school and year) fixed effects and clustered standard errors.
-   Run a full suite of diagnostic and robustness checks to validate the credibility of the findings.
-   Automatically generate all key figures and a final narrative report summarizing the results and policy implications.

## Theoretical Background

The implemented methods are grounded in modern microeconometrics and causal inference.

**1. High-Dimensional Fixed Effects (HDFE) OLS:**
The baseline models for the persistent effect (Equation 1) and the Matthew Effect (Equation 2) are estimated via OLS with fixed effects for both school ($b_s$) and academic year ($a_t$).
$$
Y_{its} = \alpha_0 + \sum_{j=1,2} \alpha_j p_{ji} + \text{Controls} + a_t + b_s + \epsilon_{its} \quad (1)
$$
$$
Y_{igts} = \dots + \sum_{j=1,2} \theta_j p_{ji} I(u,v) + \dots \quad (2)
$$
The fixed effects are absorbed using the Frisch-Waugh-Lovell theorem to control for all time-invariant school characteristics and year-specific shocks.

**2. Two-Stage Least Squares (2SLS) with an Instrumental Variable (IV):**
To address the endogeneity of parental education (`PHE`), the study employs a 2SLS strategy. The instrument is the gender gap in tertiary education in the parent's country of origin in 1960 (`GG`).
-   **First Stage (Relevance):** The endogenous variable is regressed on the instrument and all exogenous controls.
    $$
    \text{PHE}_i = \pi_0 + \pi_1 \text{GG}_i + \text{Controls} + a_t + b_s + \nu_{its} \quad (3)
    $$
-   **Second Stage (Causal Effect):** The outcome is regressed on the *predicted* value of the endogenous variable from the first stage.
    $$
    Y_{its} = \alpha_0 + \alpha_1 \widehat{\text{PHE}}_i + \text{Controls} + a_t + b_s + \epsilon_{its} \quad (4)
    $$
This methodology is extended to the interaction models to estimate the *causal* Matthew Effect.

**3. Cluster-Robust Standard Errors:**
All models are estimated with standard errors clustered at the `school_id` level. This accounts for the fact that residuals for students within the same school may be correlated, providing more conservative and reliable statistical inference.

## Features

The provided iPython Notebook (`parental_environment_and_student_achievement_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Task Architecture:** The entire pipeline is broken down into 27 distinct, modular tasks, each with its own orchestrator function, ensuring clarity and testability.
-   **Configuration-Driven Design:** All study parameters are managed in an external `config.yaml` file, allowing for easy customization and replication.
-   **Rigorous Data Validation:** A multi-stage validation process checks the schema, content integrity, and statistical properties of all input data before any analysis begins.
-   **Systematic Feature Engineering:** A transparent, step-by-step process for constructing the main endogenous variable (`PHE`), the instrumental variable (`GG`), and the full set of dummy-coded control variables.
-   **Advanced Econometric Engine:** A generic estimation function that correctly handles high-dimensional fixed effects and clustered standard errors for both OLS and 2SLS models using the `linearmodels` library.
-   **Comprehensive Estimation Suite:** Programmatically specifies and estimates all 33 models from the paper.
-   **Automated Diagnostics and Reporting:** Concludes by automatically generating all key figures, a full suite of robustness checks, and a final narrative summary of the findings.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Data Validation & Cleaning (Tasks 1-5):** Ingests and validates the `config.yaml` and raw data, cleanses the student survey data by applying inclusion criteria, and harmonizes all categorical codings.
2.  **Feature Engineering (Tasks 6-8):** Constructs the `PHE` variable and its dummies, creates the full set of control variables, and constructs the `GG` instrumental variable.
3.  **Sample & Model Definition (Tasks 9-11):** Programmatically defines the 33 distinct analysis samples using listwise deletion and creates the formal specifications for all OLS and IV models.
4.  **Estimation (Tasks 12-20):** Executes all OLS and IV regressions (pooled, by-grade, and interaction), extracting and formatting the results.
5.  **Visualization & Diagnostics (Tasks 21-24):** Generates all CDF and coefficient plots, and performs final diagnostic checks on sample composition and data normalization.
6.  **Robustness & Reporting (Tasks 25-27):** Provides top-level orchestrators to run the full pipeline, execute a suite of robustness checks, and synthesize the final narrative report.

## Core Components (Notebook Structure)

The `parental_environment_and_student_achievement_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `execute_full_study`

The project is designed around a single, top-level user-facing interface function:

-   **`execute_full_study`:** This master orchestrator function, located in the final section of the notebook, runs the entire automated research pipeline from end-to-end. A single call to this function reproduces the entire computational portion of the project, from data validation to the final report generation.

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `matplotlib`, `linearmodels`, `pyyaml`, `faker`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/parental_environment_and_student_achievement.git
    cd parental_environment_and_student_achievement
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy matplotlib linearmodels pyyaml Faker
    ```

## Input Data Structure

The pipeline requires two `pandas.DataFrame` objects with specific schemas, which are rigorously validated by the pipeline. A synthetic data generator is included in the notebook for demonstration purposes.
1.  **`student_parent_survey_raw`**: The primary dataset containing student and parent information.
2.  **`barro_lee_raw`**: The external dataset with historical education data for constructing the instrument.

All other parameters are controlled by the `config.yaml` file.

## Usage

The `parental_environment_and_student_achievement_draft.ipynb` notebook provides a complete, step-by-step guide. The primary workflow is to execute the final cell of the notebook, which demonstrates how to use the top-level `execute_full_study` orchestrator:

```python
# Final cell of the notebook

# This block serves as the main entry point for the entire project.
if __name__ == '__main__':
    # The notebook includes a synthetic data generation stage.
    # We assume `student_parent_survey_raw` and `barro_lee_raw` are in memory.
    
    # Load the configuration from the YAML file.
    with open('config.yaml', 'r') as f:
        config = yaml.safe_load(f)
    
    # Define the top-level directory for all outputs.
    RESULTS_DIRECTORY = "study_replication_output"

    # Execute the entire replication study.
    full_results = execute_full_study(
        student_parent_survey_raw=student_parent_survey_raw,
        barro_lee_raw=barro_lee_raw,
        config=config,
        output_dir=RESULTS_DIRECTORY
    )
```

## Output Structure

The pipeline creates a `base_output_dir` and returns a comprehensive dictionary containing all analytical artifacts, structured as follows:
-   **`main_analysis`**: Contains all primary results.
    -   `results`: Nested dictionary of all regression tables (DataFrames).
    -   `diagnostics`: Diagnostic tables (e.g., sample flow).
    -   `figures`: List of paths to all generated plots (PNG files).
-   **`robustness_checks`**: Contains all sensitivity analysis reports.
-   **`final_summary_report`**: A string containing the final narrative summary.

## Project Structure

```
parental_environment_and_student_achievement/
│
├── parental_environment_and_student_achievement_draft.ipynb  # Main implementation notebook
├── config.yaml                                               # Master configuration file
├── requirements.txt                                          # Python package dependencies
│
├── study_replication_output/                                 # Example output directory
│   └── figures/
│       ├── cdf_math_grade_3.png
│       └── ...
│
├── LICENSE                                                   # MIT Project License File
└── README.md                                                 # This file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can easily modify all study parameters, including date ranges, variable definitions, and model specifications, without altering the core Python code.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Alternative Instruments:** Integrating and testing alternative instrumental variables to check the robustness of the causal claims.
-   **Heterogeneity Analysis:** Exploring how the Matthew Effect varies across different subgroups of the population (e.g., by gender, immigrant status, or school type).
-   **Machine Learning Models:** Using non-parametric or machine learning methods (e.g., causal forests) to explore potential non-linearities in the treatment effects.
-   **Structural Modeling:** Building and estimating a structural model of skill formation to provide deeper theoretical insights into the observed dynamic patterns.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{aymeric2025parental,
  title={Parental environment and student achievement: Does a Matthew effect exist?},
  author={Aymeric, Ga{\"e}lle and Lavaine, Emmanuelle and Magdalou, Brice},
  journal={arXiv preprint arXiv:2510.18481},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A Professional-Grade Implementation of "Parental environment and student achievement: Does a Matthew effect exist?".
GitHub repository: https://github.com/chirindaopensource/parental_environment_and_student_achievement
```

## Acknowledgments

-   Credit to **Gaëlle Aymeric, Emmanuelle Lavaine, and Brice Magdalou** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, Matplotlib, and the `linearmodels` library**.

--

*This README was generated based on the structure and content of the `parental_environment_and_student_achievement_draft.ipynb` notebook and follows best practices for research software documentation.*

# Paper

Title: "*Parental environment and student achievement: Does a Matthew effect exist?*"

Authors: Gaëlle Aymeric, Emmanuelle Lavaine, Brice Magdalou

E-Journal Submission Date: 21 October 2025

Link: https://arxiv.org/abs/2510.18481v1

Abstract:

This paper investigates the causal impact of the parental environment on the student's academic performance in mathematics, literature and English (as a foreign language), using a new database covering all children aged 8 to 15 of the Madrid community, from 2016 to 2019. Parental environment refers here to the parents' level of education (i.e. the skills they acquired before bringing up their children), and parental investment (the effort made by parents to bring up their children). We distinguish the persistent effect of the parental environment from the so-called Matthew effect, which describes a possible tendency for the impact of the parental environment to increase as the child grows up. Whatever the subject (mathematics, literature or English), our results are in line with most studies concerning the persistent effect: a favourable parental environment goes hand in hand with better results for the children. As regards the Matthew effect, the results differ between subjects: while the impact of the parental environment tends to diminish from the age of 8 to 15 in mathematics, it forms a bell curve in literature (first increasing, then decreasing) and increases steadily in English. This result, which is encouraging for mathematics and even literature, confirms the social dimension involved in learning a foreign language compared to more academic subjects.


# Summary

### **Summary and Analysis of Aymeric, Lavaine, and Magdalou (2025)**

#### **Core Research Question and Contribution**

The paper investigates the causal impact of the "parental environment" on a child's academic achievement over time. It seeks to disentangle two distinct phenomena:

1.  **The Persistent Effect:** The baseline observation that a more favorable parental environment (higher parental education, greater investment) is consistently associated with better student outcomes at any given point in time.
2.  **The Matthew Effect:** The central hypothesis under investigation. Named after a biblical passage ("the rich get richer"), this effect posits that the *impact* of the parental environment *grows* as a child ages. In other words, initial advantages accumulate and widen the achievement gap over time.

The paper's primary contribution is to test for the existence and nature of this Matthew effect across three core academic subjects—mathematics, literature, and English—using a novel and comprehensive dataset.

#### **Data and Descriptive Analysis**

The authors construct a unique and powerful dataset from the Madrid Community in Spain, covering all students aged 8, 11, and 15 (Grades 3, 6, and 10) from 2016 to 2019.

*   **Source:** Administrative data from the Ministry of Education and Research of the Community of Madrid.
*   **Scope:** It is a census, not a sample, covering over 320,000 students in both public and private schools.
*   **Variables:**
    *   **Outcome:** Standardized test scores in mathematics, literature, and English (foreign language), normalized to a mean of 500 and a standard deviation of 100 (similar to PISA scores).
    *   **Key Explanatory Variable:** Parental environment, operationalized through two channels:
        1.  **Parents' Highest Level of Education:** Categorized into three ISCED levels (0-2, 3-5, 6-8).
        2.  **Parental Investment:** Proxied by survey responses on the frequency of parent-child discussions about school.
    *   **Controls:** A rich set of student and household characteristics (e.g., gender, country of birth, student's time spent on homework).

**Descriptive Finding:** A preliminary look at the data via Cumulative Distribution Functions (CDFs) reveals a clear pattern of **first-order stochastic dominance**. For any given score, a child whose parents have a higher level of education has a higher probability of achieving that score or better. This strongly suggests the existence of the *persistent effect* and inequality of opportunity.

#### **Econometric Strategy I - OLS with Interaction Terms**

The authors first employ a linear regression model with fixed effects to establish baseline correlations.

*   **Model 1 (Persistent Effect):** They regress student scores on dummy variables for parental education levels, controlling for student/household characteristics, school fixed effects, and year fixed effects.
    
    `Score_its = α₀ + α₁·ParentEd_i¹ + α₂·ParentEd_i² + Controls_i + Year_t + School_s + ε_its`
    
    Here, `α₁` and `α₂` measure the average score advantage for children from medium and high-education families, respectively, compared to the low-education reference group.
    
*   **Model 2 (Matthew Effect):** To test how this effect evolves, they introduce interaction terms between the parental education dummies and grade-level dummies.
    
    `Score_igts = ... + θ₁·(ParentEd_i¹ × Grade_g) + θ₂·(ParentEd_i² × Grade_g) + ...`
    
    The coefficients `θ₁` and `θ₂` are the core of this analysis. A positive and significant `θ` would indicate that the achievement gap associated with parental education widens as students move to a higher grade, providing evidence for a Matthew effect. A negative `θ` would suggest the gap is closing.

#### **Results from the OLS Analysis**

The OLS models yield nuanced, subject-specific results regarding the Matthew effect.

*   **Persistent Effect:** Confirmed across all subjects. Higher parental education is strongly and significantly associated with higher student scores.
*   **Matthew Effect (via interaction terms):**
    *   **Mathematics:** The impact of parental education *diminishes* between Grade 6 and Grade 10. The achievement gap narrows.
    *   **Literature:** The impact follows a **bell-shaped curve**. It increases from Grade 3 to 6 and then decreases from Grade 6 to 10.
    *   **English (Foreign Language):** The impact **continuously and significantly increases** across all grades. This is the only subject that exhibits a classic, unambiguous Matthew effect.

#### **Econometric Strategy II - Instrumental Variable (IV) Analysis**

Recognizing that OLS estimates may be biased by endogeneity (e.g., unobserved variables like parental motivation or inherited ability that correlate with both parental education and child achievement), the authors implement a more rigorous instrumental variable (IV) strategy to identify a causal effect.

*   **The Instrument:** The **gender gap in tertiary education in 1960 in the parent's country of origin**.
*   **Justification for Validity:**
    1.  **Relevance:** This historical, country-level variable is plausibly correlated with a parent's own educational attainment, especially for immigrant families. The first-stage regressions confirm this with high F-statistics (well above the conventional threshold of 10), indicating a strong instrument.
    2.  **Exclusion Restriction:** The instrument is assumed to affect the child's current academic performance in Madrid *only through* its effect on their parent's education. It is unlikely that the gender education gap in, for example, Ecuador in 1960 has a direct impact on a child's test scores today, other than via the channel of parental human capital.

*   **Implementation:** They use a two-stage least squares (2SLS) approach, first predicting parental education using the instrument and then using these predicted values to estimate the causal effect on student scores.

#### **Results from the IV Analysis**

The IV estimates largely confirm and strengthen the OLS findings, lending them a causal interpretation.

*   **Persistent Effect:** The causal impact of parental education on student scores is positive, significant, and large. For instance, a one-category increase in parental education causally increases scores by 37 points in math, 51 in literature, and a striking 72 in English.
*   **Matthew Effect (via IV with interactions):** The dynamic pattern observed in the OLS models holds.
    *   **Mathematics & Literature:** The causal influence of parental education diminishes as students age (particularly after Grade 6).
    *   **English:** The causal influence is unambiguously amplified as students mature, confirming a strong Matthew effect.

#### **Analysis of Student Effort and Parental Investment**

The paper also examines the evolving impact of student effort (proxied by time spent on homework) and parental investment (frequency of school-related talks).

*   **Student Effort:** The impact of homework becomes increasingly positive as students get older. In Grade 3, more homework is correlated with *lower* scores (likely reverse causality), but by Grade 10, it is a strong positive predictor of success, especially in literature and math.
*   **Parental Investment:** The impact of parental involvement decreases over time in math and literature but, consistent with the main findings, *increases* in English.

#### **Discussion and Policy Implications**

The authors conclude by interpreting their heterogeneous findings and drawing policy implications.

*   **Interpretation:** The divergence between subjects is key.
    *   In **mathematics and literature**, students appear to gain autonomy as they age. Their own effort becomes a more dominant factor, and the influence of their family background wanes. This aligns with cognitive development theories (e.g., Piaget) where children develop intrinsic learning capabilities.
    *   In **English (foreign language)**, learning is presented as a more socially-embedded process. Parental resources, exposure, and attitudes (integrative motivation) become *more* critical over time, creating a path-dependent accumulation of advantage (aligning with theories from Vygotsky and Cunha & Heckman).

*   **Policy Implications:**
    1.  **Remediation:** The diminishing influence of family background in math and literature suggests that remediation programs for adolescents in these subjects can be effective, as student effort plays a larger role.
    2.  **Equality of Opportunity:** The strong Matthew effect in English implies that using foreign language proficiency as a key criterion for university or elite school admission can significantly exacerbate inequality, as it heavily favors students from advantaged backgrounds.

### **Overall Assessment**

This is a methodologically robust and compelling paper. Its strength lies in the use of a unique, large-scale dataset combined with a credible IV strategy to move from correlation to causation. The central finding—that the Matthew effect is not a universal phenomenon but is highly subject-dependent—is a significant contribution to the literature on educational inequality.

# Import Essential Modules

In [None]:
#!/usr/bin/env python3
# ==============================================================================
#
#  Replication Pipeline for "Parental environment and student achievement:
#  Does a Matthew effect exist?"
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "Parental environment and student
#  achievement: Does a Matthew effect exist?" by Aymeric, Lavaine, and
#  Magdalou (2025). It delivers a fully automated and reproducible system for
#  investigating the causal impact of parental environment on student
#  achievement, with a focus on identifying subject-specific "Matthew Effects."
#
#  The pipeline is designed to enable a transition from uniform educational
#  policies to dynamic, evidence-based strategies by identifying the precise
#  age groups and subjects where parental background has the strongest causal
#  influence.
#
#  Core Methodological Components:
#  • High-dimensional fixed effects (HDFE) OLS models for associational analysis.
#  • Two-Stage Least Squares (2SLS) with HDFE for causal inference, using a
#    historical gender gap in education as an instrumental variable.
#  • Estimation of both persistent (average) and dynamic (Matthew Effect)
#    impacts of parental education.
#  • School-level cluster-robust standard errors for valid statistical inference.
#  • Systematic analysis across three subjects (Mathematics, Literature, English)
#    and three grade levels (3, 6, 10).
#
#  Technical Implementation Features:
#  • A modular, multi-stage pipeline covering data validation, cleaning, feature
#    engineering, model specification, estimation, visualization, and reporting.
#  • Use of the `linearmodels` library for efficient estimation of panel data
#    models with absorbed fixed effects.
#  • Programmatic generation of all analysis samples, model specifications,
#    result tables, and figures to ensure perfect reproducibility.
#  • A comprehensive suite of diagnostic and robustness checks, including
#    sensitivity to control variable inclusion and instrument validity tests.
#
#  Paper Reference:
#  Aymeric, G., Lavaine, E., & Magdalou, B. (2025). Parental environment and
#  student achievement: Does a Matthew effect exist?. arXiv preprint
#  arXiv:2510.18481v1.
#  https://arxiv.org/abs/2510.18481v1
#
#  Author: [Your Name/Organization]
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================

# --- Core Libraries ---
# Used for data manipulation and numerical operations.
import math
import os
import warnings

# --- Third-Party Libraries ---
# Primary library for data analysis and manipulation.
import pandas as pd

# Primary library for numerical operations, especially array manipulation.
import numpy as np

# Primary library for creating static, animated, and interactive visualizations.
import matplotlib.pyplot as plt

# Primary library for advanced econometric modeling, especially panel data.
from linearmodels.panel import PanelOLS, IV2SLS
from linearmodels.panel.results import PanelEffectsResults, IVPanelEffectsResults

# --- Typing ---
# Used for providing detailed type hints for function signatures and variables,
# enhancing code clarity, and enabling static analysis.
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Set,
    Tuple,
    Union
)


# Implementation

## Draft 1

### **Functional Specification of Pipeline Components**

#### **Task 1: `validate_configuration`**

*   **Inputs:** `config: Dict[str, Any]`.
*   **Process:** Performs a multi-step validation of the `config` dictionary.
    1.  **Structural Validation (`_validate_config_structure`):** Verifies that all 15 required top-level keys exist and that their corresponding values are dictionaries. This confirms the basic hierarchical structure.
    2.  **Scope Validation (`_validate_scope_and_period`):** Extracts and asserts the exact values of critical study parameters (e.g., `start_year == 2016`, `grades_of_interest == [3, 6, 10]`). This locks the analysis to the specific scope of the replication.
    3.  **Model Parameter Validation (`_validate_variables_and_models`):** Extracts and asserts the exact values of parameters that define the econometric models (e.g., `endogenous_variable == 'PHE'`, `fixed_effects == ['school_id', 'academic_year']`).
*   **Outputs:** `bool` (`True` on success). Raises `ConfigurationError` on failure.
*   **Research Role:** This callable is the **Protocol Enforcement Layer**. It acts as a non-negotiable guardrail, ensuring that the entire pipeline is executed with a set of parameters that are a perfect match for the methodology described in the paper. It prevents any deviation, intentional or accidental, from the replication protocol.

#### **Task 2: `validate_student_survey_data`**

*   **Inputs:** `student_parent_survey_raw: pd.DataFrame`, `config: Dict[str, Any]`.
*   **Process:** Performs a three-step validation of the primary input data.
    1.  **Schema Validation (`_validate_dataframe_schema`):** Compares the column names and data types of the input DataFrame against a canonical, hard-coded schema of 26 required columns. It reports missing columns, extra columns, and any dtype mismatches.
    2.  **Content Integrity Validation (`_validate_content_integrity`):** For 16 specific categorical columns, it checks that all non-null values fall within their predefined valid domains (e.g., `gender` must be in `{1, 2}`). It also enforces the structural rule that no record can have `grade == 10` and `academic_year == 2016`.
    3.  **Normalization Validation (`_validate_score_normalization`):** For each of the 3 score columns, it groups the data by `(grade, academic_year)` and calculates the mean and standard deviation for each cell. It asserts that the mean is within `500 ± 3` and the standard deviation is within `100 ± 3`.
*   **Outputs:** `bool` (`True` on success). Raises `SchemaValidationError` or `DataIntegrityError` on failure.
*   **Research Role:** This callable is the **Data Quality Assurance Layer**. It empirically verifies the claims made in the "Data" section of the paper, ensuring that the raw data is structurally sound, internally consistent, and has been correctly pre-processed (specifically, the IRT score normalization) before the replication pipeline begins.

#### **Task 3: `validate_barro_lee_data`**

*   **Inputs:** `barro_lee_raw: pd.DataFrame`, `student_parent_survey_raw: pd.DataFrame`, `config: Dict[str, Any]`.
*   **Process:** Performs a three-step validation of the external instrument data.
    1.  **Schema Validation (`_validate_barro_lee_schema`):** Checks the column names and dtypes of the `barro_lee_raw` DataFrame against its 7 required columns.
    2.  **Content Integrity Validation (`_validate_barro_lee_content`):** Verifies that `reference_year` is always 1960, that `share_...` columns are valid proportions in [0,1], and that `country_cob` strings are valid 3-character uppercase ISO codes.
    3.  **Coverage Validation (`_validate_instrument_coverage`):** Extracts the unique set of parent country codes from the student data and the unique set of available country codes from the Barro-Lee data. It computes the set difference to identify any required countries for which no instrument data is available and checks that the `config` specifies a valid policy (`"drop"`) for these cases.
*   **Outputs:** `bool` (`True` on success). Raises `SchemaValidationError` or `DataIntegrityError` on failure.
*   **Research Role:** This callable is the **Instrument Feasibility Layer**. It validates the external data source for the instrumental variable and, critically, performs a pre-analysis to determine the extent to which the instrument can be matched to the analysis sample. This provides an early diagnostic on the potential sample size and external validity of the causal analysis.

#### **Task 4: `cleanse_student_survey_data`**

*   **Inputs:** `student_parent_survey_raw: pd.DataFrame`, `config: Dict[str, Any]`.
*   **Process:** Transforms the raw data into the final analytical population by applying a sequence of filters.
    1.  **Inclusion Criteria (`_apply_inclusion_criteria`):** Filters the DataFrame to keep only rows where `parent_response_complete == True` and `grade` is in `{3, 6, 10}`.
    2.  **Invalid Record Removal (`_remove_invalid_records`):** Applies study-specific exclusions (e.g., drops rows where `grade == 10` and `academic_year == 2016`). It then performs sequential checks to remove any rows with missing values in the critical identifiers `student_id`, `school_id`, or `academic_year`.
    3.  **Deduplication (`_resolve_duplicates`):** Identifies all records that are duplicates on the key `(student_id, academic_year, grade)`. For each duplicate set, it deterministically selects the single "best" record to keep by sorting on data completeness and original index, then drops all others.
*   **Outputs:** `Tuple[pd.DataFrame, Dict[str, Any]]` (the cleansed DataFrame, a detailed attrition log).
*   **Research Role:** This callable constructs the **Analytical Population**. It is the precise, reproducible implementation of the sample selection process that defines the universe of observations for the study. The attrition log it produces is the basis for the sample flow table in the paper's appendix.

#### **Task 5: `harmonize_and_standardize_data`**

*   **Inputs:** The cleansed DataFrame from Task 4, `config: Dict[str, Any]`.
*   **Process:** Performs in-place data cleaning and metadata extraction.
    1.  **Country Code Standardization (`_standardize_country_codes`):** For the three `..._cob` columns, it applies `.str.upper().str.strip()` and validates the 3-character alphabetic format, nullifying any invalid codes.
    2.  **Numeric Validation (`_validate_numeric_categoricals`):** For 12 specific numeric categorical columns, it enforces domain integrity, either by clipping values to a valid range (for `days_homework_raw`) or by nullifying out-of-domain values (for all others).
    3.  **Metadata Extraction (`_document_reference_categories`):** It parses the `config` dictionary to create a new dictionary that explicitly maps each categorical variable to its reference category for modeling.
*   **Outputs:** `Tuple[pd.DataFrame, Dict[str, Any], Dict[str, Any]]` (the harmonized DataFrame, a cleaning log, the reference category metadata).
*   **Research Role:** This callable is the **Data Standardization Layer**. It ensures that all categorical variables are clean and consistently formatted, preventing errors in merges and dummy variable creation. The reference category dictionary it creates is a critical piece of metadata that guarantees all subsequent models are specified with the same baseline assumptions.

#### **Task 6: `construct_parental_education_variable`**

*   **Inputs:** The harmonized DataFrame from Task 5, `config: Dict[str, Any]`.
*   **Process:** Engineers the primary independent variable, `PHE`.
    1.  **Max Education (`_compute_phe_raw`):** Implements $PHE\_raw_i = \max(\text{parent\_edu\_mother}_i, \text{parent\_edu\_father}_i)$ using vectorized operations.
    2.  **Binning (`_bin_phe_raw_to_phe`):** Maps the `PHE_raw` value (0-8) to the three-level `PHE` variable (0, 1, 2) based on the ISCED groupings defined in the `config`.
    3.  **Dummification (`_create_phe_dummies`):** Creates the two binary indicator columns, `p1` and `p2`, for use as regressors in OLS models, correctly omitting the reference category `PHE=0`.
*   **Outputs:** `Tuple[pd.DataFrame, Dict[str, Any]]` (the DataFrame augmented with the four new PHE-related columns, and a log).
*   **Research Role:** This callable operationalizes the core concept of "Parental environment" as defined by the authors. It transforms the raw parental education data into the specific endogenous variable (`PHE`) and regressors (`p1`, `p2`) whose effects are the central subject of the paper.

#### **Task 7: `construct_control_variables`**

*   **Inputs:** The DataFrame from Task 6, the reference category metadata from Task 5.
*   **Process:** This function is a large-scale feature engineering step. It systematically iterates through all remaining categorical control variables (student effort, parental investment, household resources, gender, etc.). For each, it calls a generic helper (`_create_dummies_from_categorical`) that creates a set of dummy variables, correctly omitting the reference category specified in the metadata dictionary.
*   **Outputs:** `Tuple[pd.DataFrame, List[str]]` (the fully-featured DataFrame, a list of all newly created dummy variable names).
*   **Research Role:** This callable constructs the full set of control variables (represented as $s_{ki}$ and $h_{ki}$ in the paper's equations). This is the implementation of the "ceteris paribus" condition, ensuring that the estimated effect of `PHE` is not confounded by a wide range of observable student and household characteristics.

#### **Task 8: `construct_instrumental_variable`**

*   **Inputs:** The DataFrame from Task 7, the validated `barro_lee_raw` DataFrame, `config: Dict[str, Any]`.
*   **Process:** Constructs the instrumental variable, `GG`.
    1.  **Parent Selection (`_select_instrument_country_of_birth`):** Implements the hierarchical logic to select the country of birth of the more-educated parent for each student.
    2.  **Merge (`_merge_with_instrument_data`):** Performs a `left` merge, joining the student data with the Barro-Lee data on the selected country code.
    3.  **Computation (`_compute_gender_gap_instrument`):** Implements the formula $GG_i = \text{share\_tertiary\_women\_1960}_i - \text{share\_tertiary\_men\_1960}_i$ on the merged data.
*   **Outputs:** `Tuple[pd.DataFrame, Dict[str, Any]]` (the final DataFrame now including the `GG` column, and a log).
*   **Research Role:** This callable implements the "Identification strategy" of the paper. It creates the instrumental variable that is the key to moving from correlation to causation by addressing the endogeneity of parental education.

#### **Task 9: `define_analysis_samples`**

*   **Inputs:** The final DataFrame from Task 8, the list of control variables from Task 7, `config: Dict[str, Any]`.
*   **Process:** This function is a data management workhorse. It programmatically generates all 33 distinct analysis samples required by the study. For each model specification (OLS, IV, pooled, interaction, etc.), it assembles the full list of required variables and then applies listwise deletion (`.dropna()`) to the master DataFrame to create a complete-case sample for that specific model.
*   **Outputs:** `Tuple[Dict[str, Any], Dict[str, Any]]` (a nested dictionary containing all 33 sample DataFrames, and a parallel dictionary logging their final sizes).
*   **Research Role:** This callable enforces the **complete-case analysis** assumption for every regression. By creating these distinct samples, it ensures that each model is estimated on the maximum number of valid observations available for its specific set of variables.

#### **Tasks 10 & 11: `specify_ols_models`, `specify_iv_models`**

*   **Inputs:** The list of control variables, `config: Dict[str, Any]`.
*   **Process:** These are pure metadata-generating functions. They do not process data. They translate the theoretical equations from the paper into machine-readable dictionaries that define every component of every model to be estimated (dependent variable, regressors, fixed effects, etc.).
*   **Outputs:** Nested dictionaries of model specifications.
*   **Research Role:** These callables are the **Architectural Blueprint** for the entire econometric analysis. They programmatically define every regression equation (Equations 1-5), ensuring that the models estimated in the subsequent tasks are exact, faithful replications of the paper's methodology.

#### **Task 12: `estimate_model`**

*   **Inputs:** An analysis sample DataFrame, a model specification dictionary.
*   **Process:** This is the **Estimation Engine**. It takes a sample and a specification, prepares the data for panel analysis by setting a `MultiIndex`, and then instantiates and fits the appropriate `linearmodels` estimator (`PanelOLS` or `IV2SLS`). It correctly configures the estimator to absorb high-dimensional fixed effects and to compute cluster-robust standard errors at the school level.
*   **Outputs:** A fitted model results object from `linearmodels`.
*   **Research Role:** This callable is the computational heart of the project. It executes the complex fixed-effects regressions with clustered standard errors that are central to the paper's empirical strategy. It implements the cluster-robust variance estimator: $\widehat{V}_{\text{cluster}} = (X'X)^{-1} \left( \sum_{g=1}^{G} X_g' \hat{u}_g \hat{u}_g' X_g \right) (X'X)^{-1}$.

#### **Tasks 13-20 (Orchestrators): `run_..._estimation`**

*   **Inputs:** The dictionaries of samples and specifications.
*   **Process:** Each of these orchestrators manages a specific block of estimations. They are essentially loops that iterate through subjects, grades, or grade-pairs, and for each iteration, they call `estimate_model` and then call a helper (`_extract_result_summary`) to parse the results object into a clean table.
*   **Outputs:** Dictionaries of formatted results tables (DataFrames).
*   **Research Role:** These callables execute the full empirical analysis, producing the numerical results for the key coefficients of interest: the persistent effects ($\alpha_j$, $\alpha_1$) and the Matthew effects ($\theta_j$, $\alpha_2$). They generate the content for Tables 8-17 in the paper.

#### **Tasks 21-23 (Orchestrators): `generate_..._plots`**

*   **Inputs:** The final dataset and the results dictionaries.
*   **Process:** These orchestrators manage the creation of all figures. They transform the data and results into a "tidy" format and then use `matplotlib` to generate and save the publication-quality CDF plots and coefficient plots.
*   **Outputs:** A list of file paths to the saved image files.
*   **Research Role:** These callables replicate the paper's key visual evidence (Figures 1-5, 8-10), providing intuitive visualizations of the main findings.

#### **Task 24: `perform_diagnostic_checks`**

*   **Inputs:** The cleansed DataFrame and various log files.
*   **Process:** This function performs final sanity checks. It checks for composition effects, re-validates score normalization, and synthesizes logs into a sample attrition table.
*   **Outputs:** A dictionary of diagnostic tables and warnings.
*   **Research Role:** This callable provides the empirical evidence for methodological claims, such as the absence of significant composition effects, thereby bolstering the study's internal validity.

#### **Task 25: `run_parental_environment_study_pipeline`**

*   **Inputs:** The two raw DataFrames (`student_parent_survey_raw`, `barro_lee_raw`) and the `config` dictionary.
*   **Process:** This function orchestrates the entire main analysis, from Task 1 through Task 24.
*   **Outputs:** A tuple containing the main results dictionary and a dictionary of key intermediate artifacts.
*   **Research Role:** This callable represents the complete, self-contained replication of the paper's primary analysis, packaged as a reusable component.

#### **Task 26: `run_robustness_checks`**

*   **Inputs:** The outputs from the main pipeline (Task 25).
*   **Process:** This function executes sensitivity analyses. It re-estimates models with fewer controls, systematically reviews all IV diagnostics, and formally validates the developmental patterns.
*   **Outputs:** A dictionary of robustness reports.
*   **Research Role:** This callable is the **Internal Peer Review Layer**. It stress-tests the main findings, providing crucial evidence on their stability and credibility.

#### **Task 27: `synthesize_final_report`**

*   **Inputs:** The main pipeline outputs.
*   **Process:** This function programmatically translates the final quantitative results into a human-readable, narrative summary of the findings and their policy implications.
*   **Outputs:** A single formatted string containing the final report.
*   **Research Role:** This callable replicates the "Discussion, related literature, and policy implications" section of the paper, translating the empirical evidence into a conclusive narrative.

#### **Top-Level Orchestrator: `execute_full_study`**

*   **Inputs:** The two raw DataFrames and the `config` dictionary.
*   **Process:** This is the ultimate entry point. It calls the main pipeline orchestrator (Task 25), then passes the outputs to the robustness check orchestrator (Task 26), and finally calls the report synthesis orchestrator (Task 27).
*   **Outputs:** A single, comprehensive dictionary containing all artifacts from the entire project.
*   **Research Role:** This callable represents the entire research project, from raw data to final narrative, as a single, atomic, and perfectly reproducible function. It is the embodiment of computational social science at its most rigorous.

<br><br>

### **Usage Examples**

### High-Fidelity Example: Executing the End-to-End Pipeline

This example demonstrates how to use the top-level orchestrator function, `execute_full_study`, to run the entire replication of Aymeric, Lavaine, and Magdalou (2025). It covers the three necessary inputs:
1.  **`student_parent_survey_raw`**: A synthetic but realistic DataFrame representing the primary student and parent survey data.
2.  **`barro_lee_raw`**: A synthetic DataFrame representing the historical data for the instrumental variable.
3.  **`config`**: The master configuration dictionary, loaded from the `config.yaml` file.

The example will proceed in three stages:
1.  **Setup:** Import necessary libraries and create the `config.yaml` file.
2.  **Data Generation:** Create high-fidelity synthetic versions of the two required DataFrames.
3.  **Execution:** Load the configuration, call the `execute_full_study` function, and briefly inspect the outputs.

#### **Stage 1: Setup**

First, we ensure all necessary libraries are imported. We will use `pyyaml` to load the configuration file and `faker` to help generate realistic synthetic data.

```python
# Import necessary libraries for data generation and file handling
import pandas as pd
import numpy as np
import yaml
from faker import Faker
import random

# Initialize the Faker generator for creating synthetic data
fake = Faker()
```

#### **Stage 2: Synthetic Data Generation**

Now, we will create realistic, synthetic versions of the two input DataFrames. This is a critical step for testing and demonstrating the pipeline without access to the original sensitive microdata. The data generation process is designed to mimic the structure and properties of the real data as described in the paper and the data dictionary.

##### **Input 1: `barro_lee_raw` DataFrame**

This DataFrame contains the historical data for our instrument. We will create a sample of countries with realistic 1960 tertiary education shares.

```python
# Define a list of sample countries with their ISO alpha-3 codes.
# This list includes a mix of developed and developing countries for realism.
country_list = [
    'USA', 'GBR', 'FRA', 'DEU', 'JPN', 'CAN', 'AUS', # Developed
    'ESP', 'ITA', 'PRT', 'GRC', # Southern Europe
    'BRA', 'ARG', 'COL', 'MEX', 'PER', # Latin America
    'NGA', 'ZAF', 'EGY', 'KEN', # Africa
    'CHN', 'IND', 'IDN', 'PAK', # Asia
    'CSK', 'SUN' # Legacy entities for testing
]

# Generate the synthetic Barro-Lee data.
barro_lee_data = []
for country in country_list:
    # Simulate lower female education rates, typical for 1960.
    share_men = random.uniform(0.01, 0.15)
    # Women's share is a fraction of men's, plus some noise.
    share_women = max(0, share_men * random.uniform(0.2, 0.9) - random.uniform(0, 0.02))
    
    is_legacy = country in ['CSK', 'SUN']
    notes = "Assignment rule: Successor states mapped here." if is_legacy else None
    
    barro_lee_data.append({
        'country_cob': country,
        'country_name_1960': fake.country() if not is_legacy else {'CSK': 'Czechoslovakia', 'SUN': 'USSR'}[country],
        'is_legacy_entity': is_legacy,
        'share_tertiary_women_1960': share_women,
        'share_tertiary_men_1960': share_men,
        'reference_year': 1960,
        'notes': notes
    })

# Create the DataFrame.
barro_lee_raw = pd.DataFrame(barro_lee_data)

print("Synthetic 'barro_lee_raw' DataFrame created:")
print(barro_lee_raw.head())
```

##### **Input 2: `student_parent_survey_raw` DataFrame**

This is the main dataset. We will generate a sample of 10,000 observations, ensuring the data structure and distributions are plausible.

```python
# Set parameters for data generation.
N_STUDENTS = 10000
N_SCHOOLS = 200
YEARS = [2016, 2017, 2018, 2019]
GRADES = [3, 6, 10]

# Generate the synthetic student and parent survey data.
survey_data = []
for i in range(N_STUDENTS):
    # Assign basic identifiers.
    student_id = 10000 + i
    school_id = random.randint(1, N_SCHOOLS)
    academic_year = random.choice(YEARS)
    grade = random.choice(GRADES)
    
    # Enforce the structural exclusion rule from the study design.
    if academic_year == 2016 and grade == 10:
        continue # Skip this invalid combination.

    # Simulate parental education with a realistic distribution (more higher-ed).
    parent_edu_mother = random.choices(range(9), weights=[2, 2, 5, 10, 15, 25, 20, 15, 6])[0]
    # Assortative mating: father's education is correlated with mother's.
    parent_edu_father = max(0, min(8, parent_edu_mother + random.randint(-2, 2)))
    
    # Higher parental education correlates with better outcomes.
    base_ability = (parent_edu_mother + parent_edu_father) / 16.0 # Normalize to [0, 1]
    
    # Generate scores around the mean of 500, influenced by base_ability.
    # The multiplier (e.g., 150) controls the strength of the parental effect.
    mu = 500 + 150 * (base_ability - 0.5)
    score_math = np.random.normal(mu, 100)
    score_lit = np.random.normal(mu, 100)
    score_eng = np.random.normal(mu + 20, 100) # Slightly higher mean for English

    # Simulate homework days.
    days_hw_raw = random.randint(0, 7)
    if days_hw_raw <= 1: days_hw_cat = 1
    elif days_hw_raw <= 3: days_hw_cat = 2
    elif days_hw_raw <= 5: days_hw_cat = 3
    else: days_hw_cat = 4
    
    survey_data.append({
        'student_id': student_id,
        'school_id': school_id,
        'academic_year': academic_year,
        'grade': grade,
        'parent_response_complete': random.choices([True, False], weights=[0.9, 0.1])[0],
        'gender': random.randint(1, 2),
        'student_cob': random.choices(['ESP', 'COL', 'MAR', 'ROU'], weights=[85, 5, 5, 5])[0],
        'Score_Math': score_math,
        'Score_Lit': score_lit,
        'Score_Eng': score_eng,
        'days_homework_raw': days_hw_raw,
        'days_homework_cat': days_hw_cat,
        'parent_edu_mother': parent_edu_mother,
        'parent_edu_father': parent_edu_father,
        'parent_cob_mother': random.choices(['ESP', 'COL', 'ECU', 'USA'], weights=[80, 8, 7, 5])[0],
        'parent_cob_father': random.choices(['ESP', 'COL', 'ECU', 'USA'], weights=[80, 8, 7, 5])[0],
        'employment_status_mother': random.randint(0, 3),
        'employment_status_father': random.randint(0, 3),
        'parent_talk_school': random.randint(1, 4),
        'parent_teach_homework': random.randint(1, 4),
        'parent_help_homework': random.randint(1, 4),
        'parent_check_homework': random.randint(1, 4),
        'books_at_home': random.randint(1, 5),
        'freq_use_books_home': random.randint(1, 4),
        'freq_use_computer_home': random.randint(1, 4),
        'freq_use_internet_home': random.randint(1, 4),
    })

# Create the DataFrame.
student_parent_survey_raw = pd.DataFrame(survey_data)

# Convert types to match the exact specification.
student_parent_survey_raw = student_parent_survey_raw.astype({
    'student_id': 'int64', 'school_id': 'int64', 'academic_year': 'int64',
    'grade': 'int64', 'parent_response_complete': 'bool', 'gender': 'int64',
    'days_homework_raw': 'float64', 'days_homework_cat': 'int64',
    'parent_edu_mother': 'int64', 'parent_edu_father': 'int64',
    'employment_status_mother': 'int64', 'employment_status_father': 'int64',
    'parent_talk_school': 'int64', 'parent_teach_homework': 'int64',
    'parent_help_homework': 'int64', 'parent_check_homework': 'int64',
    'books_at_home': 'int64', 'freq_use_books_home': 'int64',
    'freq_use_computer_home': 'int64', 'freq_use_internet_home': 'int64'
})


print("\nSynthetic 'student_parent_survey_raw' DataFrame created:")
print(student_parent_survey_raw.head())
print(f"\nTotal rows generated: {len(student_parent_survey_raw)}")
```

#### **Stage 3: Pipeline Execution and Output Inspection**

With the configuration file on disk and the two synthetic DataFrames in memory, we can now execute the entire pipeline with a single function call.

```python
# Load the configuration from the YAML file.
try:
    with open('config.yaml', 'r') as f:
        config = yaml.safe_load(f)
    print("\nConfiguration loaded successfully from config.yaml.")
except FileNotFoundError:
    print("\nERROR: config.yaml not found. Please run Stage 1 first.")
    config = None
except Exception as e:
    print(f"\nERROR: Failed to load or parse config.yaml: {e}")
    config = None

# Check if config was loaded before proceeding.
if config:
    # Execute the entire study pipeline using the top-level orchestrator.
    # All results, diagnostics, and figures will be generated.
    # The 'output_dir' will be created in the current directory.
    full_results = execute_full_study(
        student_parent_survey_raw=student_parent_survey_raw,
        barro_lee_raw=barro_lee_raw,
        config=config,
        output_dir="study_replication_output"
    )

    # --- Inspect the Outputs ---
    # The 'full_results' object is a deeply nested dictionary containing everything.
    # We can now inspect any part of the analysis.

    print("\n--- Example Output Inspection ---")
    
    # 1. Inspect the final summary report.
    print("\nFinal Summary Report:")
    print(full_results['final_summary_report'])

    # 2. Inspect a specific result table: Pooled IV for Mathematics.
    print("\nExample Result Table: Pooled IV for Score_Math")
    pooled_iv_math_results = full_results['main_analysis']['results']['pooled_iv']['Score_Math']
    print(pooled_iv_math_results)

    # 3. Inspect a diagnostic table: Sample inclusion flow.
    print("\nDiagnostic Table: Sample Inclusion Flow")
    sample_flow = full_results['main_analysis']['diagnostics']['sample_flow_table']
    print(sample_flow)
    
    # 4. Inspect a robustness check: OLS control sensitivity.
    print("\nRobustness Check: OLS Control Sensitivity")
    sensitivity_check = full_results['robustness_checks']['ols_control_sensitivity']
    print(sensitivity_check)
    
    # 5. List the generated figures.
    print("\nGenerated Figure Paths:")
    for fig_type, paths in full_results['main_analysis']['figures'].items():
        print(f"  - {fig_type}:")
        for path in paths:
            print(f"    - {path}")
```

This concludes the example. It demonstrates a complete, professional workflow: generating realistic test data, executing a complex, multi-stage analytical pipeline with a single command, and finally, programmatically accessing the structured outputs for review.

<br><br>

In [None]:
# Task 1: Validate configuration dictionary structure and parameter values

# ==============================================================================
# Task 1: Validate configuration dictionary structure and parameter values
# ==============================================================================

class ConfigurationError(ValueError):
    """
    Custom exception raised for errors during configuration validation.

    This exception is specifically used to signal that the main configuration
    dictionary, which governs the entire analysis pipeline, is malformed or
    contains invalid parameters. It is raised when issues such as missing keys,
    incorrect data types for values, or parameter values that violate the
    study's replication standards are detected.

    The primary purpose of this custom exception is to provide clear,
    actionable error messages that pinpoint the exact location and nature of
    the configuration problem, facilitating rapid debugging and correction.

    Inherits:
        ValueError: This class inherits from Python's built-in ValueError,
                    as configuration problems represent a specific case of
                    providing an argument (the config dictionary) with an
                    inappropriate or invalid value.
    """
    # The __init__ method is inherited from the base ValueError class.
    # It is designed to be instantiated with a descriptive string message
    # that details the specific configuration error encountered.
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # This call initializes the base ValueError class with the provided
        # error message and any other arguments.
        super().__init__(*args, **kwargs)

# ------------------------------------------------------------------------------
# Task 1, Step 1: Validate the presence and type of top-level keys.
# ------------------------------------------------------------------------------

def _validate_config_structure(config: Dict[str, Any]) -> None:
    """
    Validates the structural integrity of the configuration dictionary.

    This function checks for the presence of all required top-level keys and
    verifies that their corresponding values are dictionaries, ensuring the
    expected nested structure for the pipeline's parameters.

    Args:
        config: The configuration dictionary for the study.

    Raises:
        ConfigurationError: If a required top-level key is missing or if its
                            value is not a dictionary.
    """
    # Define the set of all mandatory top-level keys for the configuration.
    required_top_level_keys: List[str] = [
        'data_sources', 'study_period', 'population_scope',
        'variable_definitions', 'psychometric_model',
        'parental_education_coding', 'instrument_construction',
        'effort_coding', 'parental_investment_coding',
        'books_at_home_coding', 'model_specifications',
        'inference_parameters', 'iv_validation', 'sample_policies',
        'reproducibility'
    ]

    # Iterate through each required key to ensure its presence and correct type.
    for key in required_top_level_keys:
        # Check if the key exists in the configuration dictionary.
        if key not in config:
            # If a key is missing, raise a specific error.
            raise ConfigurationError(f"Missing required top-level key in configuration: '{key}'")

        # Check if the value associated with the key is a dictionary.
        if not isinstance(config[key], dict):
            # If the type is incorrect, raise a specific error.
            raise ConfigurationError(
                f"Key '{key}' in configuration must be a dictionary, but found type "
                f"'{type(config[key]).__name__}'."
            )

# ------------------------------------------------------------------------------
# Task 1, Step 2: Validate study scope and period constraints.
# ------------------------------------------------------------------------------

def _validate_scope_and_period(config: Dict[str, Any]) -> None:
    """
    Validates critical parameters related to the study's scope and time period.

    This function ensures that the configuration aligns with the specific
    constraints of the replication study, such as the academic years, grades
    of interest, and sample inclusion criteria.

    Args:
        config: The configuration dictionary for the study.

    Raises:
        ConfigurationError: If any scope or period parameter does not match
                            the required value for the study replication.
    """
    # A dictionary mapping the expected path, value, and a descriptive error message.
    validations = {
        ('study_period', 'start_year'): 2016,
        ('study_period', 'end_year'): 2019,
        ('study_period', 'grade10_assessed_in_2016'): False,
        ('population_scope', 'grades_of_interest'): [3, 6, 10],
        ('population_scope', 'require_parent_response'): True,
    }

    # Iterate through the validation rules.
    for path, expected_value in validations.items():
        try:
            # Traverse the dictionary to get the actual value.
            actual_value = config
            for key in path:
                actual_value = actual_value[key]

            # Compare the actual value with the expected value.
            if actual_value != expected_value:
                # If they don't match, raise a detailed error.
                raise ConfigurationError(
                    f"Configuration validation failed for path '{' -> '.join(path)}'. "
                    f"Expected: {expected_value} (type: {type(expected_value).__name__}), "
                    f"Found: {actual_value} (type: {type(actual_value).__name__})."
                )
        except KeyError:
            # If any key in the path is missing, raise a detailed error.
            raise ConfigurationError(f"Missing required key path in configuration: {path}")

# ------------------------------------------------------------------------------
# Task 1, Step 3: Validate variable definitions and model specifications.
# ------------------------------------------------------------------------------

def _validate_variables_and_models(config: Dict[str, Any]) -> None:
    """
    Validates parameters defining variables and econometric models.

    This function checks key definitions for dependent/endogenous variables,
    fixed effects, interaction terms, and inference parameters to ensure
    that the subsequent modeling steps are correctly specified.

    Args:
        config: The configuration dictionary for the study.

    Raises:
        ConfigurationError: If any variable or model parameter does not match
                            the required value for the study replication.
    """
    # A dictionary mapping the expected path, value, and a descriptive error message.
    # Using a list of tuples to handle potential float comparisons.
    validations: List[Tuple[Tuple[str, ...], Any]] = [
        (('variable_definitions', 'dependent_variables'), ['Score_Math', 'Score_Lit', 'Score_Eng']),
        (('variable_definitions', 'endogenous_variable'), 'PHE'),
        (('variable_definitions', 'instrumental_variable'), 'GG'),
        (('model_specifications', 'fixed_effects'), ['school_id', 'academic_year']),
        (('model_specifications', 'grade_pair_definitions'), [(3, 6), (6, 10), (3, 10)]),
        (('inference_parameters', 'standard_error_clustering_level'), 'school_id'),
        (('iv_validation', 'weak_instrument_threshold_f_stat'), 10.0),
    ]

    # Iterate through the validation rules.
    for path, expected_value in validations:
        try:
            # Traverse the dictionary to get the actual value.
            actual_value = config
            for key in path:
                actual_value = actual_value[key]

            # Check for type and value equality, with special handling for floats.
            is_equal = False
            if isinstance(expected_value, float):
                # Use math.isclose for robust floating-point comparison.
                is_equal = isinstance(actual_value, (int, float)) and math.isclose(actual_value, expected_value)
            else:
                # Use direct comparison for other types (lists, strings, etc.).
                is_equal = actual_value == expected_value

            # If the values are not equal, raise a detailed error.
            if not is_equal:
                raise ConfigurationError(
                    f"Configuration validation failed for path '{' -> '.join(path)}'. "
                    f"Expected: {expected_value} (type: {type(expected_value).__name__}), "
                    f"Found: {actual_value} (type: {type(actual_value).__name__})."
                )
        except KeyError:
            # If any key in the path is missing, raise a detailed error.
            raise ConfigurationError(f"Missing required key path in configuration: {path}")

# ------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# ------------------------------------------------------------------------------

def validate_configuration(config: Dict[str, Any]) -> bool:
    """
    Orchestrates the validation of the entire study configuration dictionary.

    This function serves as the main entry point for configuration validation.
    It sequentially calls specialized validation functions to check the
    structure, scope, period, variable definitions, and model specifications.
    A successful execution of this function guarantees that the configuration
    is valid and sufficient for running the replication pipeline.

    Args:
        config: The complete configuration dictionary for the study.

    Returns:
        True if the configuration is valid in all aspects.

    Raises:
        ConfigurationError: If any part of the configuration is invalid,
                            with a specific message indicating the failure.
        TypeError: If the provided config is not a dictionary.
    """
    # --- Input Validation ---
    # Ensure the provided config object is a dictionary.
    if not isinstance(config, dict):
        raise TypeError(f"Configuration must be a dictionary, but got {type(config).__name__}.")

    # --- Step 1: Validate top-level structure ---
    # This ensures all primary sections of the config are present.
    _validate_config_structure(config=config)

    # --- Step 2: Validate study scope and period constraints ---
    # This confirms the study parameters match the replication target.
    _validate_scope_and_period(config=config)

    # --- Step 3: Validate variable definitions and model specifications ---
    # This ensures the econometric models will be specified correctly.
    _validate_variables_and_models(config=config)

    # If all validations pass without raising an exception, return True.
    return True


In [None]:
# Task 2: Validate student_parent_survey_raw DataFrame schema and content integrity

# =================================================================================
# Task 2: Validate student_parent_survey_raw DataFrame schema and content integrity
# =================================================================================

class SchemaValidationError(ValueError):
    """
    Custom exception raised for errors related to DataFrame schema violations.

    This exception is specifically used to signal that a pandas DataFrame does
    not conform to a predefined schema. It is typically raised when there are
    discrepancies in column names, data types, or the presence/absence of
    expected columns. Its error message is designed to be comprehensive,
    detailing all identified schema issues to facilitate rapid debugging.

    Inherits:
        ValueError: The standard Python exception for inappropriate value types,
                    making it a semantically appropriate base class for schema
                    validation failures.
    """
    # The __init__ method is inherited from the base ValueError class.
    # It can be called with a string argument detailing the specific schema error.
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Initialize the base ValueError class with the provided arguments.
        super().__init__(*args, **kwargs)


class DataIntegrityError(ValueError):
    """
    Custom exception raised for errors in data content, integrity, or consistency.

    This exception is used to indicate that while the data schema may be correct,
    the values within the DataFrame violate predefined business rules, domain
    constraints, or logical consistency checks. Examples include out-of-range
    values, violation of structural rules (e.g., impossible data combinations),
    or failure to meet normalization criteria.

    Inherits:
        ValueError: Chosen as the base class because data integrity failures
                    represent cases where the data's content is invalid for
                    the intended analytical purpose.
    """
    # The __init__ method is inherited from the base ValueError class.
    # It is intended to be instantiated with a detailed report of all
    # data integrity violations found.
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Initialize the base ValueError class with the provided arguments.
        super().__init__(*args, **kwargs)

# ------------------------------------------------------------------------------
# Task 2, Step 1: Verify presence, naming, and data types of all required columns.
# ------------------------------------------------------------------------------

def _validate_dataframe_schema(
    df: pd.DataFrame,
    expected_schema: Dict[str, str]
) -> None:
    """
    Validates the schema of a DataFrame against an expected structure.

    This function performs a rigorous check of column names and data types.
    It identifies missing columns, extraneous columns, and columns with
    incorrect data types, raising a detailed error if any discrepancies are found.

    Args:
        df: The pandas DataFrame to validate.
        expected_schema: A dictionary where keys are expected column names (str)
                         and values are the expected dtype names (str).

    Raises:
        SchemaValidationError: If the DataFrame's schema does not match the
                               expected schema.
        TypeError: If df is not a pandas DataFrame.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    if not isinstance(expected_schema, dict):
        raise TypeError("Input 'expected_schema' must be a dictionary.")

    # --- Schema Validation ---
    # Get the actual column names from the DataFrame.
    actual_columns: Set[str] = set(df.columns)
    # Get the expected column names from the schema definition.
    expected_columns: Set[str] = set(expected_schema.keys())

    # Identify missing and extra columns using set operations for efficiency.
    missing_columns: Set[str] = expected_columns - actual_columns
    extra_columns: Set[str] = actual_columns - expected_columns

    # Check for data type mismatches for columns that are present.
    type_mismatches: Dict[str, Tuple[str, str]] = {}
    for col_name, expected_type in expected_schema.items():
        if col_name in actual_columns:
            # Get the string name of the actual dtype.
            actual_type: str = df[col_name].dtype.name
            # Allow float for integer columns that might contain NaNs.
            if 'int' in expected_type and 'float' in actual_type:
                continue
            if actual_type != expected_type:
                type_mismatches[col_name] = (expected_type, actual_type)

    # If any validation errors were found, compile a detailed error message.
    if missing_columns or extra_columns or type_mismatches:
        error_messages: List[str] = ["DataFrame schema validation failed:"]
        if missing_columns:
            error_messages.append(f"  - Missing columns: {sorted(list(missing_columns))}")
        if extra_columns:
            error_messages.append(f"  - Extra columns found: {sorted(list(extra_columns))}")
        if type_mismatches:
            mismatches_str = [f"'{c}' (expected: {e}, found: {a})" for c, (e, a) in type_mismatches.items()]
            error_messages.append(f"  - Dtype mismatches: {', '.join(mismatches_str)}")

        # Raise a single, comprehensive error with all findings.
        raise SchemaValidationError("\n".join(error_messages))

# ------------------------------------------------------------------------------
# Task 2, Step 2: Validate domain constraints and structural rules.
# ------------------------------------------------------------------------------

def _validate_content_integrity(df: pd.DataFrame) -> None:
    """
    Validates the content integrity of the student survey DataFrame.

    This function checks for adherence to predefined domain constraints for
    categorical variables and enforces critical structural rules, such as the
    exclusion of Grade 10 data in the 2016 academic year. It collects all
    violations before raising an error.

    Args:
        df: The schema-validated student survey DataFrame.

    Raises:
        DataIntegrityError: If any content integrity checks fail, containing a
                            comprehensive report of all violations.
        TypeError: If df is not a pandas DataFrame.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")

    # --- Content Validation ---
    # Dictionary to store any validation failures.
    violations: Dict[str, Any] = {}

    # Define domain constraints for various columns.
    domain_rules = {
        'grade': {3, 6, 10},
        'academic_year': {2016, 2017, 2018, 2019},
        'gender': {1, 2},
        'days_homework_cat': {1, 2, 3, 4},
        'parent_edu_mother': set(range(9)), # {0, 1, ..., 8}
        'parent_edu_father': set(range(9)),
        'parent_talk_school': {1, 2, 3, 4},
        'parent_teach_homework': {1, 2, 3, 4},
        'parent_help_homework': {1, 2, 3, 4},
        'parent_check_homework': {1, 2, 3, 4},
        'books_at_home': {1, 2, 3, 4, 5},
        'freq_use_books_home': {1, 2, 3, 4},
        'freq_use_computer_home': {1, 2, 3, 4},
        'freq_use_internet_home': {1, 2, 3, 4},
        'employment_status_mother': {0, 1, 2, 3},
        'employment_status_father': {0, 1, 2, 3},
    }

    # Check domain constraints for each specified column.
    for col, valid_set in domain_rules.items():
        # Isolate non-missing values for validation.
        series_to_check = df[col].dropna()
        # Find values that are not in the allowed set.
        invalid_mask = ~series_to_check.isin(valid_set)
        if invalid_mask.any():
            # If violations exist, record them.
            violating_values = series_to_check[invalid_mask].unique().tolist()
            violations[col] = {
                "rule": f"Values must be in {valid_set}",
                "count": int(invalid_mask.sum()),
                "sample_invalid_values": violating_values[:5]
            }

    # Check the structural rule: no Grade 10 data in 2016.
    structural_violation_mask = (df['grade'] == 10) & (df['academic_year'] == 2016)
    if structural_violation_mask.any():
        # If violations exist, record them.
        violations['structural_rule_grade10_2016'] = {
            "rule": "No records should have grade=10 and academic_year=2016",
            "count": int(structural_violation_mask.sum())
        }

    # If any violations were collected, raise a single, comprehensive error.
    if violations:
        error_message = "Data content integrity validation failed:"
        for key, details in violations.items():
            error_message += f"\n  - Check failed for '{key}':"
            error_message += f"\n    Rule: {details['rule']}"
            error_message += f"\n    Violation count: {details['count']}"
            if "sample_invalid_values" in details:
                error_message += f"\n    Sample invalid values: {details['sample_invalid_values']}"
        raise DataIntegrityError(error_message)

# ------------------------------------------------------------------------------
# Task 2, Step 3: Validate score normalization within (subject, grade, year) cells.
# ------------------------------------------------------------------------------

def _validate_score_normalization(
    df: pd.DataFrame,
    score_cols: List[str],
    mean_target: float,
    std_target: float,
    tolerance: float,
    min_sample_size: int
) -> None:
    """
    Validates the normalization of score columns within specified groups.

    This function checks if the mean and standard deviation of score columns
    are within a given tolerance of their target values for each
    (grade, academic_year) cell.

    Args:
        df: The schema- and content-validated student survey DataFrame.
        score_cols: A list of score column names to validate.
        mean_target: The expected mean of the scores (e.g., 500).
        std_target: The expected standard deviation of the scores (e.g., 100).
        tolerance: The acceptable deviation from the target values.
        min_sample_size: The minimum number of observations in a cell to
                         perform the validation check.

    Raises:
        DataIntegrityError: If any cell's statistics fall outside the tolerance.
        TypeError: If df is not a pandas DataFrame.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")

    # --- Normalization Validation ---
    # Group the DataFrame by grade and academic year.
    grouped = df.groupby(['grade', 'academic_year'])

    # Dictionary to store any normalization failures.
    violations: List[str] = []

    # Iterate through each score column to validate its normalization.
    for score_col in score_cols:
        # Aggregate to get mean, std, and count for each group.
        stats = grouped[score_col].agg(['mean', 'std', 'count']).reset_index()

        # Filter out groups with insufficient sample size.
        stats_to_check = stats[stats['count'] >= min_sample_size]

        # Check for mean violations.
        mean_violations = stats_to_check[~np.isclose(stats_to_check['mean'], mean_target, atol=tolerance)]
        for _, row in mean_violations.iterrows():
            violations.append(
                f"Mean violation for '{score_col}' in (grade={row['grade']}, year={row['academic_year']}): "
                f"Expected ~{mean_target}, Found {row['mean']:.2f} (Count: {row['count']})"
            )

        # Check for standard deviation violations.
        std_violations = stats_to_check[~np.isclose(stats_to_check['std'], std_target, atol=tolerance)]
        for _, row in std_violations.iterrows():
            violations.append(
                f"Std Dev violation for '{score_col}' in (grade={row['grade']}, year={row['academic_year']}): "
                f"Expected ~{std_target}, Found {row['std']:.2f} (Count: {row['count']})"
            )

    # If any violations were found, raise a single, comprehensive error.
    if violations:
        error_message = "Score normalization validation failed:\n" + "\n".join(violations)
        raise DataIntegrityError(error_message)

# ------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# ------------------------------------------------------------------------------

def validate_student_survey_data(
    student_parent_survey_raw: pd.DataFrame,
    config: Dict[str, Any]
) -> bool:
    """
    Orchestrates the validation of the student_parent_survey_raw DataFrame.

    This function executes a sequence of validation steps to ensure the input
    DataFrame has the correct schema, adheres to all content integrity rules,
    and meets the score normalization criteria specified in the study design.

    Args:
        student_parent_survey_raw: The raw student survey DataFrame.
        config: The validated study configuration dictionary.

    Returns:
        True if all validation steps pass successfully.

    Raises:
        SchemaValidationError: If the DataFrame's schema is invalid.
        DataIntegrityError: If the DataFrame's content or normalization is invalid.
        TypeError: If input types are incorrect.
    """
    # --- Input Validation ---
    if not isinstance(student_parent_survey_raw, pd.DataFrame):
        raise TypeError("Input 'student_parent_survey_raw' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Validate DataFrame Schema ---
    # Define the expected schema based on the provided data dictionary.
    # Note: Nullable integer columns are represented as float64 by pandas.
    expected_schema = {
        'student_id': 'int64', 'school_id': 'int64', 'academic_year': 'int64',
        'grade': 'int64', 'parent_response_complete': 'bool', 'gender': 'int64',
        'student_cob': 'object', 'Score_Math': 'float64', 'Score_Lit': 'float64',
        'Score_Eng': 'float64', 'days_homework_raw': 'float64', # Nullable int
        'days_homework_cat': 'int64', 'parent_edu_mother': 'int64',
        'parent_edu_father': 'int64', 'parent_cob_mother': 'object',
        'parent_cob_father': 'object', 'employment_status_mother': 'int64',
        'employment_status_father': 'int64', 'parent_talk_school': 'int64',
        'parent_teach_homework': 'int64', 'parent_help_homework': 'int64',
        'parent_check_homework': 'int64', 'books_at_home': 'int64',
        'freq_use_books_home': 'int64', 'freq_use_computer_home': 'int64',
        'freq_use_internet_home': 'int64'
    }
    _validate_dataframe_schema(df=student_parent_survey_raw, expected_schema=expected_schema)

    # --- Step 2: Validate Content Integrity ---
    # This checks domain constraints and structural rules.
    _validate_content_integrity(df=student_parent_survey_raw)

    # --- Step 3: Validate Score Normalization ---
    # Extract parameters from the validated config.
    score_cols = config['variable_definitions']['dependent_variables']
    mean_target = config['psychometric_model']['standardization_mean']
    std_target = config['psychometric_model']['standardization_std_dev']

    _validate_score_normalization(
        df=student_parent_survey_raw,
        score_cols=score_cols,
        mean_target=float(mean_target),
        std_target=float(std_target),
        tolerance=3.0,  # As specified in the task discussion
        min_sample_size=30 # A reasonable threshold for stable stats
    )

    # If all validations pass, return True.
    return True


In [None]:
# Task 3: Validate barro_lee_raw DataFrame schema and instrument construction readiness

# =====================================================================================
# Task 3: Validate barro_lee_raw DataFrame schema and instrument construction readiness
# =====================================================================================

# ------------------------------------------------------------------------------
# Task 3, Step 1: Verify presence, naming, and data types of all required columns.
# ------------------------------------------------------------------------------

def _validate_barro_lee_schema(df: pd.DataFrame) -> None:
    """
    Validates the schema of the Barro-Lee DataFrame.

    This function ensures the DataFrame containing the instrumental variable data
    has the exact column structure and data types required for the analysis.
    It checks for column presence, names, and dtypes against a canonical schema.

    Args:
        df: The pandas DataFrame containing the Barro-Lee (1960) data.

    Raises:
        SchemaValidationError: If the DataFrame's schema does not match the
                               expected structure.
        TypeError: If df is not a pandas DataFrame.
    """
    # --- Input Validation ---
    # Ensure the input is a pandas DataFrame.
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")

    # --- Schema Definition ---
    # Define the canonical schema for the barro_lee_raw DataFrame.
    # The 'notes' column is often object type and can be nullable.
    expected_schema: Dict[str, str] = {
        'country_cob': 'object',
        'country_name_1960': 'object',
        'is_legacy_entity': 'bool',
        'share_tertiary_women_1960': 'float64',
        'share_tertiary_men_1960': 'float64',
        'reference_year': 'int64',
        'notes': 'object'
    }

    # --- Schema Validation ---
    # Re-use the robust, generic schema validation function defined in Task 2.
    # This promotes modularity and code reuse.
    _validate_dataframe_schema(df=df, expected_schema=expected_schema)

# ------------------------------------------------------------------------------
# Task 3, Step 2: Validate instrument data constraints and reference year.
# ------------------------------------------------------------------------------

def _validate_barro_lee_content(df: pd.DataFrame) -> None:
    """
    Validates the content integrity of the Barro-Lee DataFrame.

    This function performs critical checks on the data values to ensure they
    are consistent and valid for constructing the instrumental variable. It
    verifies the reference year, the valid range of proportion data, and the
    format of country codes.

    Args:
        df: The schema-validated Barro-Lee DataFrame.

    Raises:
        DataIntegrityError: If any content integrity checks fail, containing a
                            comprehensive report of all violations.
        TypeError: If df is not a pandas DataFrame.
    """
    # --- Input Validation ---
    # Ensure the input is a pandas DataFrame.
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")

    # --- Content Validation ---
    # Dictionary to store any validation failures for a comprehensive report.
    violations: Dict[str, Any] = {}

    # 1. Validate that all entries in 'reference_year' are exactly 1960.
    if not df['reference_year'].eq(1960).all():
        # Find and report the count of non-compliant rows.
        non_compliant_years = df.loc[~df['reference_year'].eq(1960), 'reference_year'].unique().tolist()
        violations['reference_year'] = {
            "rule": "All values must be exactly 1960.",
            "count": df['reference_year'].ne(1960).sum(),
            "sample_invalid_values": non_compliant_years
        }

    # 2. Validate that share variables are valid proportions [0.0, 1.0].
    for col in ['share_tertiary_women_1960', 'share_tertiary_men_1960']:
        # Isolate non-missing values for the range check.
        series_to_check = df[col].dropna()
        # Create a boolean mask for values outside the valid [0, 1] range.
        invalid_mask = (series_to_check < 0.0) | (series_to_check > 1.0)
        if invalid_mask.any():
            # If violations exist, record them.
            violating_values = series_to_check[invalid_mask].unique().tolist()
            violations[col] = {
                "rule": "Values must be valid proportions in the range [0.0, 1.0].",
                "count": int(invalid_mask.sum()),
                "sample_invalid_values": violating_values[:5]
            }

    # 3. Validate the format of 'country_cob' (ISO 3166-1 alpha-3).
    # Isolate non-missing country codes for format validation.
    cob_series = df['country_cob'].dropna()
    # Check for length not equal to 3 or non-alphabetic characters.
    invalid_format_mask = (cob_series.str.len() != 3) | (~cob_series.str.isalpha()) | (cob_series.str.upper() != cob_series)
    if invalid_format_mask.any():
        # If format violations exist, record them.
        violating_values = cob_series[invalid_format_mask].unique().tolist()
        violations['country_cob'] = {
            "rule": "Values must be 3-character, uppercase alphabetic ISO codes.",
            "count": int(invalid_format_mask.sum()),
            "sample_invalid_values": violating_values[:5]
        }

    # If any violations were collected, raise a single, comprehensive error.
    if violations:
        error_message = "Barro-Lee data content integrity validation failed:"
        for key, details in violations.items():
            error_message += f"\n  - Check failed for '{key}':"
            error_message += f"\n    Rule: {details['rule']}"
            error_message += f"\n    Violation count: {details['count']}"
            if "sample_invalid_values" in details:
                error_message += f"\n    Sample invalid values: {details['sample_invalid_values']}"
        raise DataIntegrityError(error_message)

# ------------------------------------------------------------------------------
# Task 3, Step 3: Validate crosswalk coverage and legacy entity handling.
# ------------------------------------------------------------------------------

def _validate_instrument_coverage(
    barro_lee_df: pd.DataFrame,
    student_survey_df: pd.DataFrame,
    config: Dict[str, Any]
) -> None:
    """
    Validates the coverage of the instrument data against the student data.

    This function checks which parent countries of birth from the student survey
    are present in the Barro-Lee dataset. It reports any coverage gaps and
    validates that a policy for handling them is defined in the configuration.
    It also checks that legacy entities are properly documented.

    Args:
        barro_lee_df: The schema- and content-validated Barro-Lee DataFrame.
        student_survey_df: The student survey DataFrame.
        config: The validated study configuration dictionary.

    Raises:
        DataIntegrityError: If there are unmapped countries and no valid policy
                            is defined, or if legacy entities lack notes.
        TypeError: If inputs are not of the expected types.
    """
    # --- Input Validation ---
    if not isinstance(barro_lee_df, pd.DataFrame):
        raise TypeError("Input 'barro_lee_df' must be a pandas DataFrame.")
    if not isinstance(student_survey_df, pd.DataFrame):
        raise TypeError("Input 'student_survey_df' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Coverage Validation ---
    # 1. Extract unique, non-null country codes from the Barro-Lee data.
    # Ensure codes are cleaned (uppercase, stripped) for fair comparison.
    available_codes: Set[str] = set(barro_lee_df['country_cob'].str.upper().str.strip().dropna().unique())

    # 2. Extract unique, non-null parent country codes from the student survey data.
    mother_cobs = student_survey_df['parent_cob_mother'].str.upper().str.strip().dropna()
    father_cobs = student_survey_df['parent_cob_father'].str.upper().str.strip().dropna()
    required_codes: Set[str] = set(pd.concat([mother_cobs, father_cobs]).unique())

    # 3. Compute the set of unmapped country codes.
    unmapped_codes: Set[str] = required_codes - available_codes

    # 4. Validate the policy for handling unmapped codes.
    if unmapped_codes:
        # Check if the configuration specifies a valid policy ('drop').
        policy = config.get('instrument_construction', {}).get('instrument_missing_policy')
        if policy != 'drop':
            raise DataIntegrityError(
                f"Found {len(unmapped_codes)} unmapped parent countries of birth, but the "
                f"policy in config['instrument_construction']['instrument_missing_policy'] "
                f"is '{policy}', not 'drop'. Unmapped codes: {sorted(list(unmapped_codes))}"
            )
        # If the policy is valid, this is a warning, not an error. The pipeline can proceed.
        # In a real-world scenario, this would be logged as a warning.
        print(f"INFO: Found {len(unmapped_codes)} parent COBs not in Barro-Lee data. "
              f"Observations from these countries will be dropped from IV analysis as per policy.")

    # 5. Validate handling of legacy entities.
    # Check that rows marked as legacy entities have non-null notes.
    legacy_rows = barro_lee_df[barro_lee_df['is_legacy_entity']]
    legacy_without_notes = legacy_rows[legacy_rows['notes'].isnull()]
    if not legacy_without_notes.empty:
        # If any legacy entities lack documentation, raise an error.
        countries_missing_notes = legacy_without_notes['country_name_1960'].tolist()
        raise DataIntegrityError(
            f"Found {len(countries_missing_notes)} legacy entities with missing crosswalk "
            f"notes: {countries_missing_notes}"
        )

# ------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# ------------------------------------------------------------------------------

def validate_barro_lee_data(
    barro_lee_raw: pd.DataFrame,
    student_parent_survey_raw: pd.DataFrame,
    config: Dict[str, Any]
) -> bool:
    """
    Orchestrates the validation of the Barro-Lee instrument DataFrame.

    This function serves as the main entry point for validating the external
    instrument data. It executes a sequence of checks for schema, content
    integrity, and coverage against the main student survey data, ensuring it
    is ready for use in the IV analysis.

    Args:
        barro_lee_raw: The raw pandas DataFrame with Barro-Lee (1960) data.
        student_parent_survey_raw: The raw student survey DataFrame, used for
                                   coverage checks.
        config: The validated study configuration dictionary.

    Returns:
        True if all validation steps pass successfully.

    Raises:
        SchemaValidationError: If the Barro-Lee DataFrame's schema is invalid.
        DataIntegrityError: If the content, integrity, or coverage is invalid.
        TypeError: If input types are incorrect.
    """
    # --- Input Validation ---
    # This is implicitly handled by the helper functions, but good practice.
    if not isinstance(barro_lee_raw, pd.DataFrame):
        raise TypeError("Input 'barro_lee_raw' must be a pandas DataFrame.")
    if not isinstance(student_parent_survey_raw, pd.DataFrame):
        raise TypeError("Input 'student_parent_survey_raw' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Validate DataFrame Schema ---
    # Ensures the structure of the instrument data is correct.
    _validate_barro_lee_schema(df=barro_lee_raw)

    # --- Step 2: Validate Content Integrity ---
    # Ensures the values within the instrument data are valid.
    _validate_barro_lee_content(df=barro_lee_raw)

    # --- Step 3: Validate Instrument Coverage and Legacy Handling ---
    # Ensures the instrument data can be successfully merged with the main data.
    _validate_instrument_coverage(
        barro_lee_df=barro_lee_raw,
        student_survey_df=student_parent_survey_raw,
        config=config
    )

    # If all validations pass without raising an exception, return True.
    return True


In [None]:
# Task 4: Cleanse student_parent_survey_raw by enforcing inclusion criteria and removing invalid records

# ==============================================================================
# Task 4: Cleanse student_parent_survey_raw by enforcing inclusion criteria
#         and removing invalid records
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 4, Step 1: Enforce primary sample inclusion criteria.
# ------------------------------------------------------------------------------

def _apply_inclusion_criteria(
    df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Applies primary inclusion criteria to the student survey DataFrame.

    This function filters the DataFrame based on two core criteria defined in
    the configuration: the requirement for a complete parent response and the
    restriction to valid grade levels. It meticulously tracks and returns the
    number of records dropped at each stage.

    Args:
        df: The raw or partially processed student survey DataFrame.
        config: The validated study configuration dictionary.

    Returns:
        A tuple containing:
        - pd.DataFrame: The filtered DataFrame.
        - Dict[str, int]: A log detailing the sample size at each step.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Filtering and Logging ---
    # Initialize a log to track sample attrition.
    log = {'initial_rows': len(df)}

    # 1. Filter for records with a complete parent response.
    # This is a fundamental requirement for the analysis sample.
    require_parent_response = config['population_scope']['require_parent_response']
    if require_parent_response:
        df_filtered = df[df['parent_response_complete']].copy()
        log['after_parent_response_filter'] = len(df_filtered)
    else:
        df_filtered = df.copy()
        log['after_parent_response_filter'] = len(df_filtered) # No change

    # 2. Filter for records within the valid grade levels for the study.
    # This ensures the sample is restricted to the grades of interest.
    valid_grades = config['sample_policies']['valid_grades']
    df_filtered = df_filtered[df_filtered['grade'].isin(valid_grades)]
    log['after_grade_filter'] = len(df_filtered)

    return df_filtered, log

# ------------------------------------------------------------------------------
# Task 4, Step 2: Remove structurally invalid records and enforce exclusions.
# ------------------------------------------------------------------------------

def _remove_invalid_records(
    df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Removes structurally invalid records and applies study-specific exclusions.

    This function cleans the DataFrame by removing records that are impossible
    by study design (e.g., Grade 10 in 2016) and records with missing critical
    identifiers (student, school, year). This revised implementation provides a
    highly granular log of records dropped due to missing identifiers.

    Args:
        df: The DataFrame after primary inclusion criteria have been applied.
        config: The validated study configuration dictionary.

    Returns:
        A tuple containing:
        - pd.DataFrame: The further cleansed DataFrame.
        - Dict[str, int]: A detailed log of the number of rows dropped at each
                          sub-step.
    """
    # --- Input Validation ---
    # Ensure the input df is a pandas DataFrame.
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    # Ensure the input config is a dictionary.
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Filtering and Logging ---
    # Record the number of rows before any filtering in this function.
    initial_rows = len(df)
    # Initialize a dictionary to log the counts of dropped rows.
    log: Dict[str, int] = {}

    # Create a working copy of the DataFrame to avoid modifying the original input.
    df_filtered = df.copy()

    # 1. Apply study-specific exclusions from the configuration.
    # This loop makes the exclusion logic generalizable to multiple rules.
    for exclusion in config['sample_policies']['exclude_year_grade']:
        # Create a boolean mask for rows that match the exclusion criteria.
        mask = (df_filtered['academic_year'] == exclusion['academic_year']) & \
               (df_filtered['grade'] == exclusion['grade'])
        # Apply the filter by keeping only the rows that do NOT match the mask.
        df_filtered = df_filtered[~mask]

    # Log the number of rows dropped by this specific exclusion rule.
    rows_after_exclusion = len(df_filtered)
    log['dropped_by_study_exclusion'] = initial_rows - rows_after_exclusion

    # 2. Sequentially remove records with missing critical identifiers.
    # This provides a granular audit trail of data attrition.
    id_cols = ['student_id', 'school_id', 'academic_year']

    # Keep track of the row count before starting the sequential check.
    rows_before_id_check = len(df_filtered)

    # Iterate through each identifier column to check for and remove nulls.
    for id_col in id_cols:
        # Record the number of rows before filtering on the current identifier.
        rows_before_current_check = len(df_filtered)
        # Create a mask for non-null values in the current identifier column.
        non_null_mask = df_filtered[id_col].notna()
        # Apply the filter.
        df_filtered = df_filtered[non_null_mask]
        # Count the number of rows dropped in this specific sub-step.
        rows_dropped_current = rows_before_current_check - len(df_filtered)
        # If any rows were dropped, log the count with a descriptive key.
        if rows_dropped_current > 0:
            log[f'dropped_missing_{id_col}'] = rows_dropped_current

    # Calculate the total number of rows dropped due to any missing identifier.
    rows_after_id_check = len(df_filtered)
    log['total_dropped_by_missing_identifiers'] = rows_before_id_check - rows_after_id_check

    # Return the fully cleansed DataFrame and the detailed log.
    return df_filtered, log

# ------------------------------------------------------------------------------
# Task 4, Step 3: Identify and resolve duplicate records.
# ------------------------------------------------------------------------------

def _resolve_duplicates(df: pd.DataFrame) -> Tuple[pd.DataFrame, int]:
    """
    Identifies and resolves duplicate records based on a composite key.

    This function ensures that each observation, defined by the unique key
    (student_id, academic_year, grade), is unique. It uses a deterministic
    strategy to resolve duplicates, prioritizing records with more complete
    data.

    Args:
        df: The DataFrame after filtering and exclusions.

    Returns:
        A tuple containing:
        - pd.DataFrame: The deduplicated DataFrame.
        - int: The number of duplicate rows that were dropped.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")

    # --- Deduplication Logic ---
    # Define the composite key that should be unique for each observation.
    key_cols = ['student_id', 'academic_year', 'grade']

    # Find all rows that are part of any duplicate set.
    duplicates_mask = df.duplicated(subset=key_cols, keep=False)

    # If no duplicates exist, return the original DataFrame.
    if not duplicates_mask.any():
        return df, 0

    # Separate unique records from duplicate sets for processing.
    df_unique = df[~duplicates_mask]
    df_duplicates = df[duplicates_mask]

    # --- Deterministic Resolution Strategy ---
    # 1. Calculate the number of non-missing values for each duplicate row.
    df_duplicates['completeness'] = df_duplicates.notna().sum(axis=1)

    # 2. Sort duplicates to bring the "best" record to the top of each group.
    # Sort by completeness (descending), then by original index (ascending) for tie-breaking.
    df_sorted_duplicates = df_duplicates.sort_values(
        by=['student_id', 'academic_year', 'grade', 'completeness', df_duplicates.index],
        ascending=[True, True, True, False, True]
    )

    # 3. Keep only the first (i.e., the "best") record for each duplicate group.
    df_resolved = df_sorted_duplicates.drop_duplicates(subset=key_cols, keep='first')

    # Drop the temporary 'completeness' column.
    df_resolved = df_resolved.drop(columns=['completeness'])

    # 4. Combine the unique records with the resolved duplicate records.
    df_final = pd.concat([df_unique, df_resolved], ignore_index=True)

    # --- Final Assertion and Logging ---
    # Verify that the deduplication was successful.
    if df_final.duplicated(subset=key_cols).any():
        raise RuntimeError("Deduplication process failed. Duplicates still exist.")

    # Calculate the number of rows dropped.
    rows_dropped = len(df) - len(df_final)

    return df_final, rows_dropped

# ------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# ------------------------------------------------------------------------------

def cleanse_student_survey_data(
    student_parent_survey_raw: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrates the cleansing of the student survey DataFrame.

    This function applies a multi-step cleansing process to the raw data,
    enforcing inclusion criteria, removing invalid records, and resolving
    duplicates. It produces a clean DataFrame ready for feature engineering
    and a detailed log of the sample attrition at each step.

    Args:
        student_parent_survey_raw: The raw student survey DataFrame.
        config: The validated study configuration dictionary.

    Returns:
        A tuple containing:
        - pd.DataFrame: The fully cleansed DataFrame.
        - Dict[str, Any]: A comprehensive log of the cleansing process and
                          sample attrition.
    """
    # --- Input Validation ---
    if not isinstance(student_parent_survey_raw, pd.DataFrame):
        raise TypeError("Input 'student_parent_survey_raw' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Apply primary inclusion criteria ---
    # Filter based on parent response and valid grades.
    df_step1, log_step1 = _apply_inclusion_criteria(
        df=student_parent_survey_raw,
        config=config
    )

    # --- Step 2: Remove structurally invalid records ---
    # Apply study-specific exclusions and remove rows with missing IDs.
    df_step2, log_step2 = _remove_invalid_records(
        df=df_step1,
        config=config
    )

    # --- Step 3: Resolve duplicate records ---
    # Ensure each observation (student-year-grade) is unique.
    df_step3, dropped_duplicates = _resolve_duplicates(df=df_step2)

    # --- Compile Final Log ---
    # Combine logs from all steps into a single, comprehensive audit trail.
    final_log = {
        "initial_rows": log_step1['initial_rows'],
        "rows_after_parent_response_filter": log_step1['after_parent_response_filter'],
        "rows_after_grade_filter": log_step1['after_grade_filter'],
        "rows_after_exclusions_and_id_check": len(df_step2) - log_step2['dropped_by_missing_identifiers'],
        "final_rows_after_deduplication": len(df_step3),
        "summary_dropped": {
            "dropped_no_parent_response": log_step1['initial_rows'] - log_step1['after_parent_response_filter'],
            "dropped_invalid_grade": log_step1['after_parent_response_filter'] - log_step1['after_grade_filter'],
            "dropped_by_study_exclusion": log_step2['dropped_by_study_exclusion'],
            "dropped_by_missing_identifiers": log_step2['dropped_by_missing_identifiers'],
            "dropped_as_duplicates": dropped_duplicates
        }
    }

    # Return the final cleansed DataFrame and the detailed log.
    return df_step3, final_log


In [None]:
# Task 5: Harmonize and standardize categorical codings and country codes

# ==============================================================================
# Task 5: Harmonize and standardize categorical codings and country codes
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 5, Step 1: Standardize ISO country codes for student and parents.
# ------------------------------------------------------------------------------

def _standardize_country_codes(
    df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Standardizes and validates ISO 3166-1 alpha-3 country codes.

    This function processes specified country code columns to ensure they conform
    to a standard format: 3-character, uppercase, alphabetic strings. It cleans
    the data by stripping whitespace and converting to uppercase. Any codes that
    do not meet the format requirements after cleaning are set to NaN.

    Args:
        df: The DataFrame containing country code columns to be standardized.

    Returns:
        A tuple containing:
        - pd.DataFrame: The DataFrame with standardized country code columns.
        - Dict[str, int]: A log detailing the number of invalid codes nullified
                          for each column.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")

    # --- Standardization and Validation ---
    # Define the columns containing country codes that require standardization.
    cob_cols = ['student_cob', 'parent_cob_mother', 'parent_cob_father']
    # Initialize a log to track the number of nullified invalid entries.
    log = {f"nullified_{col}": 0 for col in cob_cols}

    # Create a copy to avoid SettingWithCopyWarning.
    df_processed = df.copy()

    # Iterate through each country code column to apply standardization.
    for col in cob_cols:
        # Ensure the column exists before processing.
        if col not in df_processed.columns:
            raise SchemaValidationError(f"Missing expected country code column: '{col}'")

        # Use the .str accessor for robust, NaN-safe string operations.
        # 1. Convert to uppercase and strip leading/trailing whitespace.
        cleaned_series = df_processed[col].str.upper().str.strip()

        # 2. Identify invalid formats (not 3 chars, not alphabetic) among non-null entries.
        # This mask will be True for any value that is correctly formatted.
        valid_mask = (
            (cleaned_series.str.len() == 3) &
            (cleaned_series.str.isalpha())
        )

        # The invalid mask applies to entries that are not null AND not valid.
        invalid_mask = cleaned_series.notna() & ~valid_mask

        # If any invalid entries are found, log the count and nullify them.
        if invalid_mask.any():
            log[f"nullified_{col}"] = int(invalid_mask.sum())
            # Use .loc to safely modify the DataFrame slice.
            df_processed.loc[invalid_mask, col] = np.nan
        else:
            # If no invalid entries, update the column with the cleaned version.
            df_processed[col] = cleaned_series

    return df_processed, log

# ------------------------------------------------------------------------------
# Task 5, Step 2: Validate and clip/nullify numeric categorical variables.
# ------------------------------------------------------------------------------

def _validate_numeric_categoricals(
    df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Validates and cleans numeric categorical variables based on defined domains.

    This function enforces domain integrity for various numeric columns.
    Depending on the variable, values outside the valid domain are either
    clipped to the nearest boundary or set to NaN.

    Args:
        df: The DataFrame with numeric categorical columns to validate.

    Returns:
        A tuple containing:
        - pd.DataFrame: The DataFrame with cleaned numeric categorical columns.
        - Dict[str, int]: A log detailing the number of values clipped or
                          nullified for each column.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")

    # --- Domain Rules Definition ---
    # Define rules for each variable: domain and action ('clip' or 'nullify').
    rules = {
        'days_homework_raw': {'domain': (0, 7), 'action': 'clip'},
        'days_homework_cat': {'domain': {1, 2, 3, 4}, 'action': 'nullify'},
        'parent_edu_mother': {'domain': set(range(9)), 'action': 'nullify'},
        'parent_edu_father': {'domain': set(range(9)), 'action': 'nullify'},
        'parent_talk_school': {'domain': {1, 2, 3, 4}, 'action': 'nullify'},
        'parent_teach_homework': {'domain': {1, 2, 3, 4}, 'action': 'nullify'},
        'parent_help_homework': {'domain': {1, 2, 3, 4}, 'action': 'nullify'},
        'parent_check_homework': {'domain': {1, 2, 3, 4}, 'action': 'nullify'},
        'freq_use_books_home': {'domain': {1, 2, 3, 4}, 'action': 'nullify'},
        'freq_use_computer_home': {'domain': {1, 2, 3, 4}, 'action': 'nullify'},
        'freq_use_internet_home': {'domain': {1, 2, 3, 4}, 'action': 'nullify'},
        'books_at_home': {'domain': {1, 2, 3, 4, 5}, 'action': 'nullify'},
    }

    # --- Validation and Cleaning ---
    # Initialize a log for actions taken.
    log = {}
    # Create a copy to ensure modifications are safe.
    df_processed = df.copy()

    # Iterate through the rules and apply them.
    for col, rule in rules.items():
        if col not in df_processed.columns:
            raise SchemaValidationError(f"Missing expected numeric categorical column: '{col}'")

        original_series = df_processed[col].copy()

        if rule['action'] == 'clip':
            # Apply clipping for range-bound variables.
            lower, upper = rule['domain']
            df_processed[col] = df_processed[col].clip(lower, upper)
            # Count how many values were changed.
            num_clipped = (original_series != df_processed[col]).sum()
            if num_clipped > 0:
                log[f"clipped_{col}"] = int(num_clipped)

        elif rule['action'] == 'nullify':
            # Nullify values outside the valid set for discrete variables.
            valid_set = rule['domain']
            invalid_mask = ~df_processed[col].isin(valid_set) & df_processed[col].notna()
            if invalid_mask.any():
                num_nullified = int(invalid_mask.sum())
                log[f"nullified_{col}"] = num_nullified
                df_processed.loc[invalid_mask, col] = np.nan

    return df_processed, log

# ------------------------------------------------------------------------------
# Task 5, Step 3: Document reference categories for all categorical variables.
# ------------------------------------------------------------------------------

def _document_reference_categories(config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extracts and documents reference categories for modeling.

    This function creates a metadata dictionary that serves as a single source
    of truth for the reference (omitted) category of each categorical variable
    used in the regression models. This ensures consistency in dummy variable
    creation and model interpretation.

    Args:
        config: The validated study configuration dictionary.

    Returns:
        A dictionary mapping variable names to their reference categories.

    Raises:
        ConfigurationError: If a required reference category is not defined
                            in the configuration.
    """
    # --- Input Validation ---
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Metadata Extraction ---
    # Initialize the metadata dictionary.
    reference_categories = {}

    # Define paths in the config to find reference category definitions.
    paths = {
        'days_homework_cat': ('effort_coding', 'reference_category'),
        'parent_talk_school': ('parental_investment_coding', 'reference_category'),
        'parent_teach_homework': ('parental_investment_coding', 'reference_category'),
        'parent_help_homework': ('parental_investment_coding', 'reference_category'),
        'parent_check_homework': ('parental_investment_coding', 'reference_category'),
        'books_at_home': ('books_at_home_coding', 'reference_category'),
        'freq_use_books_home': ('parental_investment_coding', 'reference_category'), # Assumes same scale
        'freq_use_computer_home': ('parental_investment_coding', 'reference_category'),
        'freq_use_internet_home': ('parental_investment_coding', 'reference_category'),
    }

    # Extract reference categories from the config using the defined paths.
    for var, path in paths.items():
        try:
            ref_cat = config[path[0]][path[1]]
            reference_categories[var] = ref_cat
        except KeyError:
            raise ConfigurationError(f"Missing reference category definition in config for path: {path}")

    # Manually add documented reference categories not in paths.
    # For gender, we consistently use Female (coded as 1) as the reference.
    reference_categories['gender'] = 1
    # For employment status, we use Not employed/Inactive (coded as 0) as the reference.
    reference_categories['employment_status_mother'] = 0
    reference_categories['employment_status_father'] = 0

    return reference_categories

# ------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# ------------------------------------------------------------------------------

def harmonize_and_standardize_data(
    cleansed_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any], Dict[str, Any]]:
    """
    Orchestrates the harmonization and standardization of categorical data.

    This function executes a sequence of cleaning and documentation steps:
    1. Standardizes country codes to a consistent format.
    2. Validates and cleans numeric categorical variables based on their domains.
    3. Extracts and documents reference categories for all model variables.

    Args:
        cleansed_df: The cleansed DataFrame from the previous task.
        config: The validated study configuration dictionary.

    Returns:
        A tuple containing:
        - pd.DataFrame: The harmonized and standardized DataFrame.
        - Dict[str, Any]: A log detailing all cleaning actions taken.
        - Dict[str, Any]: A metadata dictionary of reference categories.
    """
    # --- Input Validation ---
    if not isinstance(cleansed_df, pd.DataFrame):
        raise TypeError("Input 'cleansed_df' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Standardize ISO country codes ---
    # Ensures country codes are clean and consistently formatted for merging.
    df_step1, log_step1 = _standardize_country_codes(df=cleansed_df)

    # --- Step 2: Validate and clean numeric categoricals ---
    # Enforces domain integrity for survey response variables.
    df_step2, log_step2 = _validate_numeric_categoricals(df=df_step1)

    # --- Step 3: Document reference categories ---
    # Creates an authoritative source for model reference levels.
    reference_categories_metadata = _document_reference_categories(config=config)

    # --- Compile Final Log ---
    # Combine logs from all steps into a single, comprehensive audit trail.
    final_log = {
        "country_code_standardization": log_step1,
        "numeric_categorical_validation": log_step2
    }

    # Return the processed DataFrame, the log, and the metadata.
    return df_step2, final_log, reference_categories_metadata


In [None]:
# Task 6: Construct the parental highest education variable (PHE) and associated dummies

# ==============================================================================
# Task 6: Construct the parental highest education variable (PHE) and
#         associated dummies
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 6, Step 1: Compute raw parental highest education (PHE_raw).
# ------------------------------------------------------------------------------

def _compute_phe_raw(df: pd.DataFrame) -> Tuple[pd.DataFrame, int]:
    """
    Computes the raw parental highest education level (PHE_raw).

    This function determines the highest ISCED level of education between the
    two parents for each student. The result is stored in a new column, 'PHE_raw'.
    The logic correctly handles missing values for one or both parents.

    Equation implemented: PHE_raw_i = max(parent_edu_mother_i, parent_edu_father_i)

    Args:
        df: The harmonized student survey DataFrame.

    Returns:
        A tuple containing:
        - pd.DataFrame: The DataFrame augmented with the 'PHE_raw' column.
        - int: The number of rows where 'PHE_raw' could not be computed
               (i.e., is NaN).
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    required_cols = ['parent_edu_mother', 'parent_edu_father']
    if not all(col in df.columns for col in required_cols):
        raise SchemaValidationError(f"Missing one or more required columns: {required_cols}")

    # --- Computation ---
    # Create a copy to avoid modifying the original DataFrame in place.
    df_processed = df.copy()

    # Compute the element-wise maximum of the two parent education columns.
    # The .max(axis=1) method on a DataFrame selection is NaN-aware:
    # - max(5, 3) -> 5
    # - max(5, NaN) -> 5
    # - max(NaN, NaN) -> NaN
    # This behavior is exactly what is required for the logic.
    df_processed['PHE_raw'] = df_processed[required_cols].max(axis=1)

    # --- Logging ---
    # Count the number of observations where PHE_raw is missing.
    # This occurs if and only if both parent education levels are missing.
    num_missing_phe_raw = df_processed['PHE_raw'].isna().sum()

    return df_processed, int(num_missing_phe_raw)

# ------------------------------------------------------------------------------
# Task 6, Step 2: Bin PHE_raw into three ISCED-level groups (PHE).
# ------------------------------------------------------------------------------

def _bin_phe_raw_to_phe(
    df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.Series]:
    """
    Bins the raw parental education level into three analysis categories (PHE).

    This function maps the continuous 'PHE_raw' variable (ISCED levels 0-8)
    into the three-level categorical variable 'PHE' (0, 1, 2) as defined in
    the study's configuration.

    Args:
        df: The DataFrame containing the 'PHE_raw' column.
        config: The validated study configuration dictionary.

    Returns:
        A tuple containing:
        - pd.DataFrame: The DataFrame augmented with the 'PHE' column.
        - pd.Series: The value counts of the newly created 'PHE' column.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    if 'PHE_raw' not in df.columns:
        raise SchemaValidationError("Missing required input column: 'PHE_raw'")

    # --- Binning Logic ---
    # Create a copy for safe modification.
    df_processed = df.copy()

    # Extract the binning definitions from the configuration.
    phe_coding = config['parental_education_coding']
    level_0_bins = phe_coding['level_0_raw']
    level_1_bins = phe_coding['level_1_raw']
    level_2_bins = phe_coding['level_2_raw']

    # Define the conditions for each category based on the config.
    conditions = [
        df_processed['PHE_raw'].isin(level_0_bins),
        df_processed['PHE_raw'].isin(level_1_bins),
        df_processed['PHE_raw'].isin(level_2_bins),
    ]
    # Define the corresponding values for each condition.
    choices = [0, 1, 2]

    # Use np.select for efficient, vectorized conditional assignment.
    # The default value is np.nan, which correctly handles missing PHE_raw.
    df_processed['PHE'] = np.select(conditions, choices, default=np.nan)

    # --- Final Validation and Logging ---
    # Assert that all non-null PHE_raw values were successfully binned.
    unmapped_mask = df_processed['PHE_raw'].notna() & df_processed['PHE'].isna()
    if unmapped_mask.any():
        unmapped_values = df_processed.loc[unmapped_mask, 'PHE_raw'].unique()
        raise DataIntegrityError(
            f"PHE binning failed. The following 'PHE_raw' values were not mapped "
            f"to a category: {unmapped_values}"
        )

    # Get the distribution of the new 'PHE' variable for logging.
    phe_distribution = df_processed['PHE'].value_counts().sort_index()

    return df_processed, phe_distribution

# ------------------------------------------------------------------------------
# Task 6, Step 3: Create dummy variables for PHE.
# ------------------------------------------------------------------------------

def _create_phe_dummies(
    df: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Creates dummy variables for the categorical PHE variable.

    This function generates two binary indicator variables, 'p1' and 'p2',
    representing PHE levels 1 and 2, respectively. The base/reference
    category is PHE level 0. These dummies are essential for the OLS model
    specifications.

    Args:
        df: The DataFrame containing the 'PHE' column.
        config: The validated study configuration dictionary.

    Returns:
        The DataFrame augmented with the 'p1' and 'p2' dummy columns.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    if 'PHE' not in df.columns:
        raise SchemaValidationError("Missing required input column: 'PHE'")

    # --- Dummy Creation ---
    # Create a copy for safe modification.
    df_processed = df.copy()

    # Extract the specified names for the dummy variables from the config.
    dummy_names = config['variable_definitions']['phe_dummy_names']
    if len(dummy_names) != 2:
        raise ConfigurationError("Expected two names in 'phe_dummy_names'.")
    p1_name, p2_name = dummy_names

    # Create the dummy for PHE level 1.
    # The result of the boolean comparison is cast to an integer (0 or 1).
    df_processed[p1_name] = (df_processed['PHE'] == 1).astype(float)

    # Create the dummy for PHE level 2.
    df_processed[p2_name] = (df_processed['PHE'] == 2).astype(float)

    # Explicitly set dummies to NaN where the original PHE is NaN.
    # This ensures correct handling during listwise deletion in modeling.
    phe_is_na = df_processed['PHE'].isna()
    df_processed.loc[phe_is_na, [p1_name, p2_name]] = np.nan

    # --- Final Validation ---
    # Check that for any row, at most one dummy is active.
    if (df_processed[p1_name] + df_processed[p2_name] > 1).any():
        raise DataIntegrityError("PHE dummy creation failed: a row has multiple dummies active.")

    return df_processed

# ------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# ------------------------------------------------------------------------------

def construct_parental_education_variable(
    harmonized_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrates the construction of the Parental Highest Education (PHE) variable.

    This function executes the full feature engineering pipeline for the study's
    primary endogenous variable. It computes the raw maximum education level,
    bins it into the three analytical categories, and creates the corresponding
    dummy variables for regression analysis.

    Args:
        harmonized_df: The harmonized and standardized DataFrame from Task 5.
        config: The validated study configuration dictionary.

    Returns:
        A tuple containing:
        - pd.DataFrame: The DataFrame augmented with 'PHE_raw', 'PHE', 'p1',
                        and 'p2' columns.
        - Dict[str, Any]: A log detailing the construction process, including
                          missing value counts and distributional summaries.
    """
    # --- Input Validation ---
    if not isinstance(harmonized_df, pd.DataFrame):
        raise TypeError("Input 'harmonized_df' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Compute raw parental highest education (PHE_raw) ---
    # This determines the maximum ISCED level between both parents.
    df_step1, num_missing_phe_raw = _compute_phe_raw(df=harmonized_df)

    # --- Step 2: Bin PHE_raw into the three 'PHE' categories ---
    # This maps the raw ISCED levels to the 0, 1, 2 analytical groups.
    df_step2, phe_distribution = _bin_phe_raw_to_phe(df=df_step1, config=config)

    # --- Step 3: Create dummy variables for 'PHE' ---
    # This generates the 'p1' and 'p2' regressors for the OLS models.
    df_final = _create_phe_dummies(df=df_step2, config=config)

    # --- Compile Final Log ---
    # Create a comprehensive log of the entire feature construction process.
    final_log = {
        "missing_phe_raw_count": num_missing_phe_raw,
        "phe_category_distribution": phe_distribution.to_dict()
    }

    # Return the final DataFrame and the detailed log.
    return df_final, final_log


In [None]:
# Task 7: Construct student effort and parental investment control variables

# ==============================================================================
# Task 7: Construct student effort and parental investment control variables
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 7, Generic Helper Function for Dummy Variable Creation
# ------------------------------------------------------------------------------

def _create_dummies_from_categorical(
    df: pd.DataFrame,
    column_name: str,
    reference_category: Any
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Creates dummy variables for a single categorical column.

    This generic helper function takes a categorical column, identifies its unique
    non-null values, and creates a binary indicator (dummy) column for each
    category except for the specified reference category. The new columns are
    named systematically as '{column_name}_{category}'.

    Args:
        df: The DataFrame to be modified.
        column_name: The name of the source categorical column.
        reference_category: The category to be omitted (the base level).

    Returns:
        A tuple containing:
        - pd.DataFrame: The DataFrame augmented with the new dummy columns.
        - List[str]: A list of the names of the newly created dummy columns.
    """
    # --- Input Validation ---
    if column_name not in df.columns:
        raise SchemaValidationError(f"Source column '{column_name}' not found in DataFrame.")

    # --- Dummy Creation Logic ---
    df_processed = df.copy()
    created_dummies = []

    # Find all unique categories in the column, excluding NaNs.
    categories = df_processed[column_name].dropna().unique()

    # Iterate over each category to create a corresponding dummy variable.
    for category in categories:
        # Skip the creation of a dummy for the reference category.
        if category == reference_category:
            continue

        # Define a systematic and descriptive name for the new dummy column.
        dummy_name = f"{column_name}_{int(category)}"

        # Create the binary indicator column.
        df_processed[dummy_name] = (df_processed[column_name] == category).astype(float)

        # Ensure NaNs in the source propagate to the dummy variables.
        df_processed.loc[df_processed[column_name].isna(), dummy_name] = np.nan

        # Add the new dummy's name to the list of created columns.
        created_dummies.append(dummy_name)

    return df_processed, created_dummies

# ------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# ------------------------------------------------------------------------------

def construct_control_variables(
    df_with_phe: pd.DataFrame,
    reference_categories: Dict[str, Any]
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Orchestrates the construction of all control variables for regression models.

    This function systematically converts all categorical student and household
    characteristics into dummy variables, preparing the DataFrame for the
    modeling stage. It uses a generic helper to ensure consistent and accurate
    dummy creation based on the provided reference category metadata.

    Args:
        df_with_phe: The DataFrame from Task 6, containing the PHE variables.
        reference_categories: The metadata dictionary from Task 5, specifying
                              the reference category for each variable.

    Returns:
        A tuple containing:
        - pd.DataFrame: The fully-featured DataFrame ready for modeling.
        - List[str]: A comprehensive list of all newly created control
                     variable names (the dummy variables).
    """
    # --- Input Validation ---
    if not isinstance(df_with_phe, pd.DataFrame):
        raise TypeError("Input 'df_with_phe' must be a pandas DataFrame.")
    if not isinstance(reference_categories, dict):
        raise TypeError("Input 'reference_categories' must be a dictionary.")

    # --- Feature Engineering ---
    df_processed = df_with_phe.copy()
    all_created_dummies = []

    # --- Step 1 & 2 & 3 (Combined): Create dummies for all multi-category variables ---
    # Define the list of all categorical variables that require dummification.
    # This list is the single source of truth for this operation.
    vars_to_dummify = [
        'days_homework_cat',
        'parent_talk_school', 'parent_teach_homework', 'parent_help_homework', 'parent_check_homework',
        'freq_use_books_home', 'freq_use_computer_home', 'freq_use_internet_home',
        'books_at_home',
        'employment_status_mother', 'employment_status_father'
    ]

    # Loop through the list and apply the generic dummy creation function.
    for var in vars_to_dummify:
        if var not in reference_categories:
            raise ConfigurationError(f"Reference category for '{var}' not found in metadata.")

        # Call the helper to create dummies for the current variable.
        df_processed, new_dummies = _create_dummies_from_categorical(
            df=df_processed,
            column_name=var,
            reference_category=reference_categories[var]
        )
        # Aggregate the names of the newly created dummies.
        all_created_dummies.extend(new_dummies)

    # --- Step 3 (Continued): Create binary indicator variables ---

    # 1. Gender dummy (Male indicator).
    # Reference category is Female (coded as 1).
    gender_ref = reference_categories.get('gender')
    if gender_ref is None:
        raise ConfigurationError("Reference category for 'gender' not found.")

    # Create a dummy for the non-reference gender category (Male, coded as 2).
    df_processed['gender_male'] = (df_processed['gender'] != gender_ref).astype(float)
    df_processed.loc[df_processed['gender'].isna(), 'gender_male'] = np.nan
    all_created_dummies.append('gender_male')

    # 2. Foreign-born status indicators.
    # The reference is being born in Spain ('ESP').
    # Student foreign-born indicator.
    df_processed['student_foreign_born'] = (df_processed['student_cob'] != 'ESP').astype(float)
    df_processed.loc[df_processed['student_cob'].isna(), 'student_foreign_born'] = np.nan
    all_created_dummies.append('student_foreign_born')

    # Mother foreign-born indicator.
    df_processed['mother_foreign_born'] = (df_processed['parent_cob_mother'] != 'ESP').astype(float)
    df_processed.loc[df_processed['parent_cob_mother'].isna(), 'mother_foreign_born'] = np.nan
    all_created_dummies.append('mother_foreign_born')

    # Father foreign-born indicator.
    df_processed['father_foreign_born'] = (df_processed['parent_cob_father'] != 'ESP').astype(float)
    df_processed.loc[df_processed['parent_cob_father'].isna(), 'father_foreign_born'] = np.nan
    all_created_dummies.append('father_foreign_born')

    # Return the final DataFrame and the list of all new control variables.
    return df_processed, all_created_dummies


In [None]:
# Task 8: Construct the instrumental variable (GG) by selecting the more-educated parent's country and merging with Barro-Lee

# ==============================================================================
# Task 8: Construct the instrumental variable (GG) by selecting the
#         more-educated parent's country and merging with Barro-Lee
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 8, Step 1: Identify the more-educated parent's country of birth.
# ------------------------------------------------------------------------------

def _select_instrument_country_of_birth(
    df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, int]:
    """
    Identifies and selects the country of birth of the more-educated parent.

    This function implements the logic to determine which parent's country of
    birth to use for the instrumental variable merge. The selection is based on
    the parent with the higher ISCED education level, with a deterministic
    tie-breaking rule specified in the configuration.

    Args:
        df: The feature-engineered student survey DataFrame.
        config: The validated study configuration dictionary.

    Returns:
        A tuple containing:
        - pd.DataFrame: The DataFrame augmented with the column
                        'more_educated_parent_cob'.
        - int: The number of rows where the instrument country could not be
               determined.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    required_cols = [
        'parent_edu_mother', 'parent_edu_father',
        'parent_cob_mother', 'parent_cob_father'
    ]
    if not all(col in df.columns for col in required_cols):
        raise SchemaValidationError(f"Missing one or more required columns: {required_cols}")

    # --- Selection Logic ---
    df_processed = df.copy()

    # Define boolean conditions for parent education comparison.
    # These masks handle NaN values correctly (comparisons with NaN yield False).
    cond_mother_gt = df_processed['parent_edu_mother'] > df_processed['parent_edu_father']
    cond_father_gt = df_processed['parent_edu_father'] > df_processed['parent_edu_mother']
    cond_equal = df_processed['parent_edu_mother'] == df_processed['parent_edu_father']

    # Handle cases where only one parent's education is available.
    cond_mother_only = df_processed['parent_edu_mother'].notna() & df_processed['parent_edu_father'].isna()
    cond_father_only = df_processed['parent_edu_father'].notna() & df_processed['parent_edu_mother'].isna()

    # Get the tie-breaking rule from the configuration.
    tie_break_rule = config['parental_education_coding']['tie_break_rule']
    if tie_break_rule == "mother_if_equal":
        # Define the sequence of conditions for np.select.
        conditions = [
            cond_mother_gt,
            cond_father_gt,
            cond_equal,
            cond_mother_only,
            cond_father_only
        ]
        # Define the corresponding choices (which parent's COB to select).
        choices = [
            df_processed['parent_cob_mother'],
            df_processed['parent_cob_father'],
            df_processed['parent_cob_mother'], # Tie-break rule
            df_processed['parent_cob_mother'],
            df_processed['parent_cob_father']
        ]
    else:
        # Future-proofing for other potential tie-break rules.
        raise NotImplementedError(f"Tie-break rule '{tie_break_rule}' is not implemented.")

    # Use np.select for efficient, vectorized conditional assignment.
    # The default is np.nan, which is assigned if no condition is met.
    df_processed['more_educated_parent_cob'] = np.select(conditions, choices, default=np.nan)

    # --- Logging ---
    # Count the number of rows where the COB could not be determined.
    num_missing = df_processed['more_educated_parent_cob'].isna().sum()

    return df_processed, int(num_missing)

# ------------------------------------------------------------------------------
# Task 8, Step 2: Merge student data with Barro-Lee data.
# ------------------------------------------------------------------------------

def _merge_with_instrument_data(
    student_df: pd.DataFrame,
    barro_lee_df: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Merges the student dataset with the Barro-Lee instrument data.

    This function performs a left join, attaching the historical gender gap
    data from the Barro-Lee dataset to each student record based on the
    previously identified country of birth of the more-educated parent.

    Args:
        student_df: The student DataFrame, augmented with
                    'more_educated_parent_cob'.
        barro_lee_df: The validated Barro-Lee DataFrame.

    Returns:
        A tuple containing:
        - pd.DataFrame: The merged DataFrame.
        - Dict[str, int]: A log detailing the merge success rate.
    """
    # --- Input Validation ---
    if not isinstance(student_df, pd.DataFrame):
        raise TypeError("Input 'student_df' must be a pandas DataFrame.")
    if not isinstance(barro_lee_df, pd.DataFrame):
        raise TypeError("Input 'barro_lee_df' must be a pandas DataFrame.")

    # --- Merge Preparation ---
    # Defensive cleaning of join keys to prevent mismatches due to formatting.
    student_df_copy = student_df.copy()
    barro_lee_df_copy = barro_lee_df.copy()
    student_df_copy['merge_key'] = student_df_copy['more_educated_parent_cob'].str.upper().str.strip()
    barro_lee_df_copy['merge_key'] = barro_lee_df_copy['country_cob'].str.upper().str.strip()

    # --- Merge Operation ---
    # Perform a left merge to keep all students.
    # The 'validate' parameter ensures the merge does not create duplicate rows.
    merged_df = pd.merge(
        student_df_copy,
        barro_lee_df_copy[['merge_key', 'share_tertiary_women_1960', 'share_tertiary_men_1960']],
        on='merge_key',
        how='left',
        validate='many_to_one'
    )

    # Drop the temporary merge key.
    merged_df = merged_df.drop(columns=['merge_key'])

    # --- Logging ---
    # Calculate the number of students who successfully matched.
    num_matched = merged_df['share_tertiary_women_1960'].notna().sum()
    num_total = len(merged_df)
    log = {
        "total_students_for_merge": num_total,
        "students_matched_to_instrument": int(num_matched),
        "students_unmatched": num_total - int(num_matched)
    }

    return merged_df, log

# ------------------------------------------------------------------------------
# Task 8, Step 3: Compute the gender gap instrument (GG).
# ------------------------------------------------------------------------------

def _compute_gender_gap_instrument(
    merged_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, int]:
    """
    Computes the gender gap (GG) instrumental variable.

    This function calculates the difference between the share of women and men
    with tertiary education in 1960, based on the merged Barro-Lee data.

    Equation implemented: GG_i = share_tertiary_women_1960_i - share_tertiary_men_1960_i

    Args:
        merged_df: The DataFrame after merging with the Barro-Lee data.
        config: The validated study configuration dictionary.

    Returns:
        A tuple containing:
        - pd.DataFrame: The DataFrame augmented with the 'GG' column.
        - int: The number of rows with a valid, non-null 'GG' value.
    """
    # --- Input Validation ---
    if not isinstance(merged_df, pd.DataFrame):
        raise TypeError("Input 'merged_df' must be a pandas DataFrame.")

    # --- Computation ---
    df_processed = merged_df.copy()

    # Get the name for the instrument variable from the config.
    gg_col_name = config['variable_definitions']['instrumental_variable']

    # Calculate the gender gap. Pandas' vectorized subtraction handles NaNs correctly.
    df_processed[gg_col_name] = (
        df_processed['share_tertiary_women_1960'] - df_processed['share_tertiary_men_1960']
    )

    # --- Final Validation and Logging ---
    # Verify that all computed GG values are within the logical range [-1.0, 1.0].
    valid_gg = df_processed[gg_col_name].dropna()
    if not valid_gg.between(-1.0, 1.0).all():
        raise DataIntegrityError("Computed 'GG' values fall outside the logical range of [-1.0, 1.0].")

    # Count the number of valid, non-null GG values.
    num_valid_gg = int(valid_gg.notna().sum())

    return df_processed, num_valid_gg

# ------------------------------------------------------------------------------
# Task 8, Orchestrator Function
# ------------------------------------------------------------------------------

def construct_instrumental_variable(
    featured_df: pd.DataFrame,
    barro_lee_df: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrates the construction of the instrumental variable (GG).

    This function executes the full pipeline to create the gender gap instrument:
    1. Selects the appropriate parent's country of birth.
    2. Merges the student data with the external Barro-Lee dataset.
    3. Computes the final 'GG' instrumental variable.

    Args:
        featured_df: The fully-featured student DataFrame from Task 7.
        barro_lee_df: The validated Barro-Lee DataFrame from Task 3.
        config: The validated study configuration dictionary.

    Returns:
        A tuple containing:
        - pd.DataFrame: The DataFrame augmented with the 'GG' column.
        - Dict[str, Any]: A comprehensive log of the instrument construction
                          process, including merge rates and final sample size.
    """
    # --- Input Validation ---
    if not isinstance(featured_df, pd.DataFrame):
        raise TypeError("Input 'featured_df' must be a pandas DataFrame.")
    if not isinstance(barro_lee_df, pd.DataFrame):
        raise TypeError("Input 'barro_lee_df' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Step 1: Select the more-educated parent's country of birth ---
    df_step1, num_missing_cob = _select_instrument_country_of_birth(
        df=featured_df,
        config=config
    )

    # --- Step 2: Merge with the Barro-Lee instrument data ---
    df_step2, merge_log = _merge_with_instrument_data(
        student_df=df_step1,
        barro_lee_df=barro_lee_df
    )

    # --- Step 3: Compute the gender gap instrument (GG) ---
    df_final, num_valid_gg = _compute_gender_gap_instrument(
        merged_df=df_step2,
        config=config
    )

    # --- Compile Final Log ---
    final_log = {
        "cob_selection": {
            "rows_with_missing_instrument_cob": num_missing_cob
        },
        "merge_statistics": merge_log,
        "instrument_computation": {
            "final_rows_with_valid_gg": num_valid_gg
        }
    }

    return df_final, final_log


In [None]:
# Task 9: Define model-specific analysis samples with listwise deletion

# ==============================================================================
# Task 9: Define model-specific analysis samples with listwise deletion
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 9, Generic Helper Function for Sample Creation
# ------------------------------------------------------------------------------

def _create_analysis_sample(
    df: pd.DataFrame,
    required_cols: List[str]
) -> pd.DataFrame:
    """
    Creates a model-specific analysis sample by performing listwise deletion.

    This function takes a DataFrame and a list of columns required for a
    specific model. It removes all rows that have a missing value in any of
    the specified columns, resulting in a complete-case analysis sample.

    Args:
        df: The source DataFrame from which to create the sample.
        required_cols: A list of column names that must be non-missing for a
                       row to be included in the final sample.

    Returns:
        A new DataFrame containing only the complete cases for the specified
        columns.
    """
    # --- Input Validation ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")

    # Check if all required columns exist in the DataFrame.
    missing_in_df = set(required_cols) - set(df.columns)
    if missing_in_df:
        raise SchemaValidationError(
            f"Cannot create sample. The following required columns are missing "
            f"from the DataFrame: {sorted(list(missing_in_df))}"
        )

    # --- Listwise Deletion ---
    # Use .dropna() with the specified subset of columns.
    # .copy() is used to ensure the output is a new DataFrame, not a view.
    analysis_sample = df.dropna(subset=required_cols).copy()

    return analysis_sample

# ------------------------------------------------------------------------------
# Task 9, Orchestrator Function
# ------------------------------------------------------------------------------

def define_analysis_samples(
    final_df: pd.DataFrame,
    control_vars_list: List[str],
    config: Dict[str, Any]
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Orchestrates the creation of all model-specific analysis samples.

    This function systematically generates all datasets required for the study's
    various regression models (OLS, IV, pooled, by-grade, interaction). It
    applies listwise deletion based on the specific variables needed for each
    model, ensuring each analysis is run on the appropriate complete-case sample.

    Args:
        final_df: The fully feature-engineered DataFrame from Task 8.
        control_vars_list: The list of all created dummy/control variable names.
        config: The validated study configuration dictionary.

    Returns:
        A tuple containing:
        - Dict[str, Any]: A nested dictionary holding all the generated
                          analysis sample DataFrames.
        - Dict[str, Any]: A nested dictionary logging the final sample size (N)
                          for each generated sample.
    """
    # --- Input Validation ---
    if not isinstance(final_df, pd.DataFrame):
        raise TypeError("Input 'final_df' must be a pandas DataFrame.")
    if not isinstance(control_vars_list, list):
        raise TypeError("Input 'control_vars_list' must be a list.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Initialization ---
    # Initialize dictionaries to store the output samples and their sizes.
    samples: Dict[str, Any] = {
        'ols': {'pooled': {}, 'interaction': {}},
        'iv': {'pooled': {}, 'by_grade': {}, 'interaction': {}}
    }
    sample_logs: Dict[str, Any] = {
        'ols': {'pooled': {}, 'interaction': {}},
        'iv': {'pooled': {}, 'by_grade': {}, 'interaction': {}}
    }

    # Extract key variable names from the configuration for convenience.
    dep_vars = config['variable_definitions']['dependent_variables']
    phe_dummies = config['variable_definitions']['phe_dummy_names']
    phe_var = config['variable_definitions']['endogenous_variable']
    iv_var = config['variable_definitions']['instrumental_variable']
    grade_pairs = config['model_specifications']['grade_pair_definitions']
    grades = config['population_scope']['grades_of_interest']

    # --- Step 1: Define Pooled OLS Samples ---
    base_ols_cols = phe_dummies + control_vars_list
    for y_var in dep_vars:
        required_cols = [y_var] + base_ols_cols
        sample = _create_analysis_sample(final_df, required_cols)
        samples['ols']['pooled'][y_var] = sample
        sample_logs['ols']['pooled'][y_var] = len(sample)

    # --- Step 2: Define Grade-Interaction OLS Samples ---
    for y_var in dep_vars:
        samples['ols']['interaction'][y_var] = {}
        sample_logs['ols']['interaction'][y_var] = {}
        for u, v in grade_pairs:
            # Filter data to the relevant pair of grades.
            grade_pair_df = final_df[final_df['grade'].isin([u, v])].copy()

            # Create the interaction indicator and terms.
            indicator = f"I_{u}_{v}"
            grade_pair_df[indicator] = (grade_pair_df['grade'] == v).astype(float)
            interaction_terms = []
            for dummy in phe_dummies:
                inter_term = f"{dummy}_x_{indicator}"
                grade_pair_df[inter_term] = grade_pair_df[dummy] * grade_pair_df[indicator]
                interaction_terms.append(inter_term)

            # Define required columns and create the sample.
            required_cols = [y_var] + base_ols_cols + [indicator] + interaction_terms
            sample = _create_analysis_sample(grade_pair_df, required_cols)
            samples['ols']['interaction'][y_var][(u, v)] = sample
            sample_logs['ols']['interaction'][y_var][(u, v)] = len(sample)

    # --- Step 3: Define IV Samples ---
    base_iv_cols = [phe_var, iv_var] + control_vars_list

    # Pooled IV Samples
    for y_var in dep_vars:
        required_cols = [y_var] + base_iv_cols
        sample = _create_analysis_sample(final_df, required_cols)
        samples['iv']['pooled'][y_var] = sample
        sample_logs['iv']['pooled'][y_var] = len(sample)

    # By-Grade IV Samples
    for y_var in dep_vars:
        samples['iv']['by_grade'][y_var] = {}
        sample_logs['iv']['by_grade'][y_var] = {}
        for grade in grades:
            grade_df = final_df[final_df['grade'] == grade]
            required_cols = [y_var] + base_iv_cols
            sample = _create_analysis_sample(grade_df, required_cols)
            samples['iv']['by_grade'][y_var][grade] = sample
            sample_logs['iv']['by_grade'][y_var][grade] = len(sample)

    # Interaction IV Samples
    for y_var in dep_vars:
        samples['iv']['interaction'][y_var] = {}
        sample_logs['iv']['interaction'][y_var] = {}
        for u, v in grade_pairs:
            # Filter data to the relevant pair of grades.
            grade_pair_df = final_df[final_df['grade'].isin([u, v])].copy()

            # Create the interaction indicator and term for the endogenous variable.
            indicator = f"I_{u}_{v}"
            inter_term = f"{phe_var}_x_{indicator}"
            grade_pair_df[indicator] = (grade_pair_df['grade'] == v).astype(float)
            grade_pair_df[inter_term] = grade_pair_df[phe_var] * grade_pair_df[indicator]

            # Define required columns and create the sample.
            required_cols = [y_var] + base_iv_cols + [indicator, inter_term]
            sample = _create_analysis_sample(grade_pair_df, required_cols)
            samples['iv']['interaction'][y_var][(u, v)] = sample
            sample_logs['iv']['interaction'][y_var][(u, v)] = len(sample)

    return samples, sample_logs


In [None]:
# Task 10: Specify and document the OLS model equations and fixed-effects structure

# ==============================================================================
# Task 10: Specify and document the OLS model equations and fixed-effects
#          structure
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 10, Step 1: Specify the pooled OLS model.
# ------------------------------------------------------------------------------

def _specify_pooled_ols_model(
    dependent_var: str,
    control_vars: List[str],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Defines the specification for a pooled OLS regression model.

    This function programmatically constructs a dictionary that contains all the
    necessary components to define the pooled OLS model from Equation (1) of
    the study. This includes the dependent variable, exogenous variables (PHE
    dummies and all controls), fixed effects, and clustering variable.

    Equation (1): y_its = α₀ + Σ αⱼpⱼᵢ + Controls + aₜ + bₛ + εᵢₜₛ

    Args:
        dependent_var: The name of the dependent variable (e.g., 'Score_Math').
        control_vars: A list of all control variable names (dummies).
        config: The validated study configuration dictionary.

    Returns:
        A dictionary representing the complete model specification.
    """
    # --- Specification Construction ---
    # Extract the names of the PHE dummy variables from the config.
    phe_dummies = config['variable_definitions']['phe_dummy_names']

    # Combine PHE dummies and all other control variables into one list of regressors.
    exog_vars = phe_dummies + control_vars

    # Assemble the full model specification into a structured dictionary.
    specification = {
        "dependent": dependent_var,
        "exog": exog_vars,
        "fixed_effects": config['model_specifications']['fixed_effects'],
        "cluster_by": config['inference_parameters']['standard_error_clustering_level'],
        "model_type": "Pooled OLS (Persistent Effect)"
    }

    return specification

# ------------------------------------------------------------------------------
# Task 10, Step 2: Specify the OLS model with grade-by-PHE interactions.
# ------------------------------------------------------------------------------

def _specify_interaction_ols_model(
    dependent_var: str,
    grade_pair: Tuple[int, int],
    control_vars: List[str],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Defines the specification for an OLS regression with interaction terms.

    This function constructs the specification for the Matthew Effect test model
    from Equation (2) of the study. It includes main effects, a grade indicator,
    and interaction terms between the grade indicator and PHE dummies.

    Equation (2): yᵢgₜₛ = α₀ + Σ αⱼpⱼᵢ + ηI(u,v) + Σ θⱼ(pⱼᵢ·I(u,v)) + ...

    Args:
        dependent_var: The name of the dependent variable.
        grade_pair: A tuple (u, v) representing the reference and comparison grades.
        control_vars: A list of all control variable names.
        config: The validated study configuration dictionary.

    Returns:
        A dictionary representing the complete interaction model specification.
    """
    # --- Specification Construction ---
    # Unpack the grade pair.
    u, v = grade_pair

    # Get the base list of PHE dummies.
    phe_dummies = config['variable_definitions']['phe_dummy_names']

    # Programmatically define the names for the indicator and interaction terms.
    # This matches the naming convention used in Task 9.
    indicator = f"I_{u}_{v}"
    interaction_terms = [f"{dummy}_x_{indicator}" for dummy in phe_dummies]

    # Combine all regressors: PHE dummies, controls, indicator, and interactions.
    exog_vars = phe_dummies + control_vars + [indicator] + interaction_terms

    # Assemble the full model specification.
    specification = {
        "dependent": dependent_var,
        "exog": exog_vars,
        "fixed_effects": config['model_specifications']['fixed_effects'],
        "cluster_by": config['inference_parameters']['standard_error_clustering_level'],
        "model_type": f"Interaction OLS (Matthew Effect Test, Grades {u} vs {v})"
    }

    return specification

# ------------------------------------------------------------------------------
# Task 10, Step 3: Specify OLS models for effort and parental investment.
# ------------------------------------------------------------------------------

def _specify_effort_investment_ols_model(
    dependent_var: str,
    grade: int,
    control_vars: List[str],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Defines the specification for by-grade effort and investment models.

    This function constructs the specification for the models that analyze the
    impact of student effort and parental investment within a specific subject
    and grade. The model includes PHE dummies and the full set of controls.

    Args:
        dependent_var: The name of the dependent variable.
        grade: The specific grade for this model (3, 6, or 10).
        control_vars: A list of all control variable names.
        config: The validated study configuration dictionary.

    Returns:
        A dictionary representing the complete model specification.
    """
    # --- Specification Construction ---
    # This model uses the same regressor structure as the pooled OLS.
    phe_dummies = config['variable_definitions']['phe_dummy_names']
    exog_vars = phe_dummies + control_vars

    # Assemble the full model specification.
    specification = {
        "dependent": dependent_var,
        "exog": exog_vars,
        "fixed_effects": config['model_specifications']['fixed_effects'],
        "cluster_by": config['inference_parameters']['standard_error_clustering_level'],
        "model_type": f"Effort/Investment OLS (Grade {grade})"
    }

    return specification

# ------------------------------------------------------------------------------
# Task 10, Orchestrator Function
# ------------------------------------------------------------------------------

def specify_ols_models(
    control_vars_list: List[str],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the specification of all OLS models for the study.

    This function serves as a factory for creating the precise definitions of
    every OLS regression to be estimated. It does not run the models but
    produces a structured dictionary of specifications that will drive the
    estimation tasks, ensuring consistency and fidelity to the research design.

    Args:
        control_vars_list: The comprehensive list of all created control
                           variable names from Task 7.
        config: The validated study configuration dictionary.

    Returns:
        A nested dictionary containing the complete specifications for all
        pooled, interaction, and by-grade effort/investment OLS models.
    """
    # --- Input Validation ---
    if not isinstance(control_vars_list, list):
        raise TypeError("Input 'control_vars_list' must be a list.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Initialization ---
    # Initialize the main dictionary to store all model specifications.
    specifications: Dict[str, Any] = {
        'pooled': {},
        'interaction': {},
        'effort_investment': {}
    }

    # Extract key metadata from the configuration.
    dep_vars = config['variable_definitions']['dependent_variables']
    grade_pairs = config['model_specifications']['grade_pair_definitions']
    grades = config['population_scope']['grades_of_interest']

    # --- Step 1: Generate Pooled OLS Specifications ---
    for y_var in dep_vars:
        specifications['pooled'][y_var] = _specify_pooled_ols_model(
            dependent_var=y_var,
            control_vars=control_vars_list,
            config=config
        )

    # --- Step 2: Generate Interaction OLS Specifications ---
    for y_var in dep_vars:
        specifications['interaction'][y_var] = {}
        for pair in grade_pairs:
            specifications['interaction'][y_var][pair] = _specify_interaction_ols_model(
                dependent_var=y_var,
                grade_pair=pair,
                control_vars=control_vars_list,
                config=config
            )

    # --- Step 3: Generate Effort/Investment OLS Specifications ---
    for y_var in dep_vars:
        specifications['effort_investment'][y_var] = {}
        for grade in grades:
            # The study design does not assess Grade 10 in 2016, but the model
            # specification is still valid for other years. The sample creation
            # in Task 9 handles the data filtering.
            specifications['effort_investment'][y_var][grade] = _specify_effort_investment_ols_model(
                dependent_var=y_var,
                grade=grade,
                control_vars=control_vars_list,
                config=config
            )

    return specifications


In [None]:
# Task 11: Specify and document the IV (2SLS) model equations and diagnostics

# ==============================================================================
# Task 11: Specify and document the IV (2SLS) model equations and diagnostics
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 11, Generic Helper Function for IV Model Specification
# ------------------------------------------------------------------------------

def _specify_iv_model_components(
    dependent_var: str,
    control_vars: List[str],
    config: Dict[str, Any],
    grade_pair: Tuple[int, int] = None
) -> Dict[str, Any]:
    """
    Defines the specifications for all components of a 2SLS model.

    This function programmatically constructs a dictionary containing the precise
    definitions for the first-stage, reduced-form, and second-stage regressions
    of a complete IV analysis, including handling for interaction terms.

    Args:
        dependent_var: The name of the final outcome variable (for the 2nd stage).
        control_vars: A list of all exogenous control variable names.
        config: The validated study configuration dictionary.
        grade_pair: Optional tuple (u, v) for interaction models.

    Returns:
        A dictionary containing the specifications for 'first_stage',
        'reduced_form', and 'second_stage'.
    """
    # --- Variable Definitions from Config ---
    # Extract names for endogenous, instrumental, and fixed effect variables.
    endog_var = config['variable_definitions']['endogenous_variable']  # 'PHE'
    instrument_var = config['variable_definitions']['instrumental_variable']  # 'GG'
    fixed_effects = config['model_specifications']['fixed_effects']
    cluster_by = config['inference_parameters']['standard_error_clustering_level']

    # --- Base Specification Components ---
    # Exogenous regressors are the controls. The instrument is handled separately.
    exog_controls = control_vars

    # --- Handle Interaction Terms (if applicable) ---
    if grade_pair:
        u, v = grade_pair
        indicator = f"I_{u}_{v}"

        # For interaction models, the indicator is an additional exogenous control.
        exog_controls_with_indicator = exog_controls + [indicator]

        # The endogenous variable and its interaction.
        endog_vars_interaction = [endog_var, f"{endog_var}_x_{indicator}"]

        # The instrument and its interaction with the indicator.
        instruments_interaction = [instrument_var, f"{instrument_var}_x_{indicator}"]
    else:
        # For non-interaction models, the lists are simpler.
        exog_controls_with_indicator = exog_controls
        endog_vars_interaction = [endog_var]
        instruments_interaction = [instrument_var]

    # --- Step 1: Specify the First-Stage Regression ---
    # Regresses the endogenous variable(s) on instruments and all exogenous controls.
    # Equation (3): PHEᵢ = π₀ + π₁GGᵢ + Controls + aₜ + bₛ + νᵢₜₛ
    first_stage_spec = {
        "dependent": endog_vars_interaction, # Dependent var(s) of the first stage
        "exog": instruments_interaction + exog_controls_with_indicator,
        "fixed_effects": fixed_effects,
        "cluster_by": cluster_by,
        "model_type": "IV First Stage"
    }

    # --- Step 2: Specify the Reduced-Form Regression ---
    # Regresses the final outcome on the instrument(s) and all exogenous controls.
    # Equation: yᵢₜₛ = ρ₀ + ρ₁GGᵢ + Controls + aₜ + bₛ + ωᵢₜₛ
    reduced_form_spec = {
        "dependent": dependent_var,
        "exog": instruments_interaction + exog_controls_with_indicator,
        "fixed_effects": fixed_effects,
        "cluster_by": cluster_by,
        "model_type": "IV Reduced Form (Diagnostic)"
    }

    # --- Step 3: Specify the Second-Stage Regression ---
    # Defines the main causal model for a 2SLS estimator.
    # Equation (4): yᵢₜₛ = α₀ + α₁P̂HEᵢ + Controls + aₜ + bₛ + εᵢₜₛ
    # Equation (5): yᵢgₜₛ = α₀ + α₁P̂HEᵢ + ηI(u,v) + α₂(P̂HEᵢ·I(u,v)) + ...
    second_stage_spec = {
        "dependent": dependent_var,
        "exog": exog_controls_with_indicator, # Exogenous controls only
        "endog": endog_vars_interaction,     # Endogenous variable(s)
        "instruments": instruments_interaction, # Instrumental variable(s)
        "fixed_effects": fixed_effects,
        "cluster_by": cluster_by,
        "model_type": "IV Second Stage (Causal Effect)"
    }

    # --- Assemble Final Specification ---
    # Combine all three components into a single dictionary for the IV model.
    iv_model_specification = {
        "first_stage": first_stage_spec,
        "reduced_form": reduced_form_spec,
        "second_stage": second_stage_spec
    }

    return iv_model_specification

# ------------------------------------------------------------------------------
# Task 11, Orchestrator Function
# ------------------------------------------------------------------------------

def specify_iv_models(
    control_vars_list: List[str],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the specification of all IV (2SLS) models for the study.

    This function acts as a factory for creating the precise definitions of
    every IV regression and its associated diagnostics (first-stage and
    reduced-form). It produces a structured dictionary of specifications that
    will drive the causal estimation tasks.

    Args:
        control_vars_list: The comprehensive list of all created control
                           variable names from Task 7.
        config: The validated study configuration dictionary.

    Returns:
        A nested dictionary containing the complete specifications for all
        pooled, by-grade, and interaction IV models.
    """
    # --- Input Validation ---
    if not isinstance(control_vars_list, list):
        raise TypeError("Input 'control_vars_list' must be a list.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Initialization ---
    # Initialize the main dictionary to store all IV model specifications.
    specifications: Dict[str, Any] = {
        'pooled': {},
        'by_grade': {},
        'interaction': {}
    }

    # Extract key metadata from the configuration.
    dep_vars = config['variable_definitions']['dependent_variables']
    grade_pairs = config['model_specifications']['grade_pair_definitions']
    grades = config['population_scope']['grades_of_interest']

    # --- Generate Pooled IV Specifications ---
    for y_var in dep_vars:
        specifications['pooled'][y_var] = _specify_iv_model_components(
            dependent_var=y_var,
            control_vars=control_vars_list,
            config=config
        )

    # --- Generate By-Grade IV Specifications ---
    for y_var in dep_vars:
        specifications['by_grade'][y_var] = {}
        for grade in grades:
            specifications['by_grade'][y_var][grade] = _specify_iv_model_components(
                dependent_var=y_var,
                control_vars=control_vars_list,
                config=config
            )

    # --- Generate Interaction IV Specifications ---
    for y_var in dep_vars:
        specifications['interaction'][y_var] = {}
        for pair in grade_pairs:
            specifications['interaction'][y_var][pair] = _specify_iv_model_components(
                dependent_var=y_var,
                control_vars=control_vars_list,
                config=config,
                grade_pair=pair
            )

    return specifications


In [None]:
# Task 12: Implement cluster-robust standard error computation at the school level

# ==============================================================================
# Task 12: Implement the generic estimation engine for models with fixed
#          effects and cluster-robust standard errors
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 12, Orchestrator and Implementation Function
# ------------------------------------------------------------------------------

def estimate_model(
    analysis_df: pd.DataFrame,
    model_spec: Dict[str, Any],
    config: Dict[str, Any]
) -> Union[PanelEffectsResults, IVPanelEffectsResults]:
    """
    Estimates a specified regression model with fixed effects and clustered SEs.

    This function serves as a generic estimation engine for the entire study.
    It takes a model specification and a corresponding analysis sample, then
    estimates the appropriate model (PanelOLS or IV2SLS) using the `linearmodels`
    library. It automatically handles high-dimensional fixed effects (school and
    year) and computes standard errors clustered at the school level, precisely
    implementing the study's econometric methodology.

    Methodology:
    1.  **Panel Data Setup**: The input DataFrame is indexed by the entity
        (school_id) and time (academic_year) variables to create a panel structure.
    2.  **Fixed Effects**: The `entity_effects=True` and `time_effects=True`
        arguments instruct the estimator to absorb the school and year fixed
        effects using a within-transformation (de-meaning), which is
        computationally efficient and numerically stable (Frisch-Waugh-Lovell).
    3.  **Cluster-Robust SE**: The `.fit()` method is configured with
        `cov_type='clustered'` and `cluster_entity=True` to compute the
        cluster-robust variance-covariance matrix, which is robust to arbitrary
        heteroskedasticity and within-school correlation of errors.
        The formula is: V_cluster = (X'X)⁻¹(Σ X_g'û_gû_g'X_g)(X'X)⁻¹

    Args:
        analysis_df: The model-specific, complete-case analysis sample DataFrame.
        model_spec: A dictionary defining the model (dependent, exog, endog, etc.).
        config: The validated study configuration dictionary.

    Returns:
        The fitted model result object from the `linearmodels` library, which
        contains all regression outputs (coefficients, SEs, p-values, etc.).

    Raises:
        ValueError: If the model specification is invalid or incomplete.
        TypeError: If input types are incorrect.
    """
    # --- Input Validation ---
    if not isinstance(analysis_df, pd.DataFrame):
        raise TypeError("Input 'analysis_df' must be a pandas DataFrame.")
    if not isinstance(model_spec, dict):
        raise TypeError("Input 'model_spec' must be a dictionary.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Data Preparation for Panel Model ---
    # Create a copy to avoid modifying the original sample DataFrame.
    df = analysis_df.copy()

    # Extract fixed effect and cluster variable names from the specification.
    entity_col, time_col = model_spec['fixed_effects']
    cluster_col = model_spec['cluster_by']

    # The linearmodels library requires a MultiIndex of (entity, time).
    # Verify the cluster variable is the same as the entity variable.
    if cluster_col != entity_col:
        raise ConfigurationError(
            f"Clustering variable '{cluster_col}' must be the same as the "
            f"entity variable '{entity_col}' for this implementation."
        )

    # Set the panel index.
    df = df.set_index([entity_col, time_col])

    # --- Model Specification Parsing ---
    # Extract dependent and exogenous variables from the specification.
    dependent = df[[model_spec['dependent']]] if isinstance(model_spec['dependent'], str) else df[model_spec['dependent']]
    exog = df[model_spec['exog']]

    # Add a constant to the exogenous variables, which is required by linearmodels.
    # The fixed effects absorption will correctly handle the intercept.
    exog['const'] = 1.0

    # --- Model Estimation ---
    # Determine whether to run an IV or OLS model based on the specification keys.
    is_iv_model = 'endog' in model_spec and 'instruments' in model_spec

    if is_iv_model:
        # This is a 2SLS model.
        endog = df[model_spec['endog']]
        instruments = df[model_spec['instruments']]

        # Instantiate the IV2SLS model with fixed effects.
        model = IV2SLS(
            dependent=dependent,
            exog=exog,
            endog=endog,
            instruments=instruments,
            entity_effects=True,
            time_effects=True
        )
    else:
        # This is a standard OLS model.
        # Instantiate the PanelOLS model with fixed effects.
        model = PanelOLS(
            dependent=dependent,
            exog=exog,
            entity_effects=True,
            time_effects=True
        )

    # --- Fit the Model with Cluster-Robust Standard Errors ---
    # The configuration specifies the covariance type and clustering.
    # `cov_type='clustered'` enables robust standard errors.
    # `cluster_entity=True` specifies clustering on the entity index (school_id).
    # `debiased=True` corresponds to the HC1 small-sample adjustment.
    results = model.fit(
        cov_type='clustered',
        cluster_entity=True,
        debiased=config['inference_parameters']['robust_se_type'] == 'HC1'
    )

    return results


In [None]:
# Task 13: Estimate pooled OLS models with FE and extract persistent-effect coefficients

# ==============================================================================
# Task 13: Estimate pooled OLS models with FE and extract persistent-effect
#          coefficients
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 13, Step 2: Helper function to extract and format model results.
# ------------------------------------------------------------------------------

def _get_significance_stars(p_value: float, config: Dict[str, Any]) -> str:
    """
    Assigns significance stars to a p-value based on configured thresholds.

    Args:
        p_value: The p-value of a coefficient estimate.
        config: The validated study configuration dictionary.

    Returns:
        A string containing the appropriate significance stars ('***', '**', '*').
    """
    # Extract significance levels from the configuration.
    levels = config['inference_parameters']['significance_levels']
    # Determine the star rating based on the p-value.
    if p_value < levels['star_3']:
        return '***'
    elif p_value < levels['star_2']:
        return '**'
    elif p_value < levels['star_1']:
        return '*'
    return ''

def _extract_result_summary(
    results: PanelEffectsResults,
    target_coeffs: List[str],
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Extracts, formats, and summarizes key statistics from a fitted model.

    This function processes a `linearmodels` result object to create a clean,
    publication-ready summary table. It focuses on specified target
    coefficients and includes essential model statistics.

    Args:
        results: The fitted model object from `linearmodels`.
        target_coeffs: A list of coefficient names to include in the summary.
        config: The validated study configuration dictionary.

    Returns:
        A pandas DataFrame summarizing the regression results.
    """
    # --- Result Extraction ---
    # Initialize a list to hold dictionaries of results for each coefficient.
    summary_data = []

    # Iterate through the target coefficients to extract their statistics.
    for coeff in target_coeffs:
        # Extract core statistics directly from the results object attributes.
        estimate = results.params[coeff]
        std_err = results.std_errors[coeff]
        p_value = results.pvalues[coeff]
        t_stat = results.tstats[coeff]

        # Calculate the 95% confidence interval.
        ci_lower = estimate - 1.96 * std_err
        ci_upper = estimate + 1.96 * std_err

        # Get significance stars.
        stars = _get_significance_stars(p_value, config)

        # Append the formatted results to the list.
        summary_data.append({
            'Coefficient': coeff,
            'Estimate': estimate,
            'Std. Error': std_err,
            'T-stat': t_stat,
            'P-value': p_value,
            'CI Lower': ci_lower,
            'CI Upper': ci_upper,
            'Significance': stars
        })

    # --- DataFrame Creation and Formatting ---
    # Convert the list of dictionaries to a pandas DataFrame.
    summary_df = pd.DataFrame(summary_data).set_index('Coefficient')

    # --- Add Model-Level Statistics ---
    # Add key model statistics for context and reporting.
    summary_df['N'] = int(results.nobs)
    summary_df['Num_Clusters'] = int(results.n_clusters['cluster_entity'])
    summary_df['R2_Within'] = results.rsquared_within

    return summary_df

# ------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# ------------------------------------------------------------------------------

def run_pooled_ols_estimation(
    analysis_samples: Dict[str, pd.DataFrame],
    model_specifications: Dict[str, Dict[str, Any]],
    config: Dict[str, Any]
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the estimation of pooled OLS models for all subjects.

    This function iterates through each subject (dependent variable), retrieves
    the appropriate analysis sample and model specification, runs the
    regression using the generic estimation engine, extracts the results, and
    performs validation checks.

    Args:
        analysis_samples: A dictionary of the pooled OLS analysis samples,
                          keyed by dependent variable name.
        model_specifications: A dictionary of the pooled OLS model
                              specifications, keyed by dependent variable name.
        config: The validated study configuration dictionary.

    Returns:
        A dictionary where keys are dependent variable names and values are
        pandas DataFrames containing the formatted regression results.
    """
    # --- Input Validation ---
    if not isinstance(analysis_samples, dict):
        raise TypeError("Input 'analysis_samples' must be a dictionary.")
    if not isinstance(model_specifications, dict):
        raise TypeError("Input 'model_specifications' must be a dictionary.")

    # --- Estimation Loop ---
    # Initialize a dictionary to store the final results tables.
    all_results: Dict[str, pd.DataFrame] = {}

    # Get the list of dependent variables to iterate over.
    dependent_vars = config['variable_definitions']['dependent_variables']

    # Get the names of the key coefficients of interest.
    target_coeffs = config['variable_definitions']['phe_dummy_names']

    # Loop through each subject (e.g., 'Score_Math', 'Score_Lit', 'Score_Eng').
    for y_var in dependent_vars:
        print(f"--- Estimating Pooled OLS for: {y_var} ---")

        # --- Step 1: Fit the pooled OLS model ---
        # Retrieve the correct sample and specification for the current subject.
        sample = analysis_samples[y_var]
        spec = model_specifications[y_var]

        # Estimate the model using the generic estimation engine from Task 12.
        try:
            results = estimate_model(
                analysis_df=sample,
                model_spec=spec,
                config=config
            )
        except Exception as e:
            raise RuntimeError(f"Estimation failed for Pooled OLS on {y_var}.") from e

        # --- Step 2: Extract and format results ---
        # Process the results object into a clean summary table.
        summary_table = _extract_result_summary(
            results=results,
            target_coeffs=target_coeffs,
            config=config
        )

        # Store the final summary table.
        all_results[y_var] = summary_table
        print(f"Estimation successful for {y_var}.")

    # --- Step 3: Validate expected sign and magnitude patterns ---
    # This cross-model validation is performed after all models are estimated.
    print("\n--- Validating Pooled OLS Results Patterns ---")

    # Check that coefficients are positive for all subjects.
    for y_var, res_df in all_results.items():
        for coeff in target_coeffs:
            estimate = res_df.loc[coeff, 'Estimate']
            if estimate <= 0:
                warnings.warn(
                    f"ValidationWarning: Coefficient '{coeff}' for subject '{y_var}' "
                    f"is not positive as expected (Estimate: {estimate:.3f}).",
                    UserWarning
                )

    # Check the expected ordering of effect sizes (Eng > Lit > Math).
    try:
        p2_coeff_name = target_coeffs[1] # Assumes 'p2' is the second dummy
        eng_eff = all_results['Score_Eng'].loc[p2_coeff_name, 'Estimate']
        lit_eff = all_results['Score_Lit'].loc[p2_coeff_name, 'Estimate']
        math_eff = all_results['Score_Math'].loc[p2_coeff_name, 'Estimate']

        if not (eng_eff > lit_eff > math_eff):
            warnings.warn(
                "ValidationWarning: The expected ordering of effect sizes "
                f"(Eng > Lit > Math) for '{p2_coeff_name}' was not observed. "
                f"Found: Eng={eng_eff:.2f}, Lit={lit_eff:.2f}, Math={math_eff:.2f}",
                UserWarning
            )
        else:
            print("Result patterns validated successfully.")
    except (KeyError, IndexError):
        warnings.warn("Could not perform pattern validation due to missing results.", UserWarning)

    return all_results


In [None]:
# Task 14: Estimate OLS models with grade-by-PHE interactions and extract Matthew-effect coefficients

# ==============================================================================
# Task 14: Estimate OLS models with grade-by-PHE interactions and extract
#          Matthew-effect coefficients
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 14, Orchestrator Function
# ------------------------------------------------------------------------------

def run_interaction_ols_estimation(
    interaction_samples: Dict[str, Dict[Tuple[int, int], pd.DataFrame]],
    interaction_specifications: Dict[str, Dict[Tuple[int, int], Dict[str, Any]]],
    config: Dict[str, Any]
) -> Dict[str, Dict[Tuple[int, int], pd.DataFrame]]:
    """
    Orchestrates the estimation of OLS models with grade-by-PHE interactions.

    This function systematically estimates the 9 models required to test for the
    Matthew Effect (one for each combination of 3 subjects and 3 grade-pair
    transitions). It uses the generic estimation engine and results extractor
    and concludes by validating the results against the study's key findings.

    Args:
        interaction_samples: A nested dictionary of the interaction analysis
                             samples from Task 9.
        interaction_specifications: A nested dictionary of the interaction model
                                    specifications from Task 10.
        config: The validated study configuration dictionary.

    Returns:
        A nested dictionary where keys are subject and grade-pair, and values
        are pandas DataFrames with the formatted regression results.
    """
    # --- Input Validation ---
    if not isinstance(interaction_samples, dict):
        raise TypeError("Input 'interaction_samples' must be a dictionary.")
    if not isinstance(interaction_specifications, dict):
        raise TypeError("Input 'interaction_specifications' must be a dictionary.")

    # --- Initialization ---
    # Initialize a nested dictionary to store the final results tables.
    all_results: Dict[str, Dict[Tuple[int, int], pd.DataFrame]] = {}

    # Extract key metadata from the configuration.
    dependent_vars = config['variable_definitions']['dependent_variables']
    grade_pairs = config['model_specifications']['grade_pair_definitions']
    phe_dummies = config['variable_definitions']['phe_dummy_names']

    # --- Estimation Loop ---
    # Use nested loops to iterate through every subject and grade-pair combination.
    for y_var in dependent_vars:
        # Initialize the inner dictionary for the current subject.
        all_results[y_var] = {}

        for pair in grade_pairs:
            u, v = pair
            print(f"--- Estimating Interaction OLS for: {y_var}, Grades {u} vs {v} ---")

            # --- Step 1: Fit the grade-interaction OLS model ---
            # Retrieve the correct sample and specification for the current model.
            try:
                sample = interaction_samples[y_var][pair]
                spec = interaction_specifications[y_var][pair]
            except KeyError:
                raise ConfigurationError(
                    f"Missing sample or specification for subject '{y_var}' "
                    f"and grade pair {pair}."
                )

            # Estimate the model using the generic estimation engine.
            try:
                results = estimate_model(
                    analysis_df=sample,
                    model_spec=spec,
                    config=config
                )
            except Exception as e:
                raise RuntimeError(
                    f"Estimation failed for Interaction OLS on {y_var}, "
                    f"Grades {u} vs {v}."
                ) from e

            # --- Step 2: Extract and format results for interaction terms ---
            # Programmatically generate the names of the interaction coefficients.
            indicator = f"I_{u}_{v}"
            interaction_coeffs = [f"{dummy}_x_{indicator}" for dummy in phe_dummies]

            # The target coefficients are the base effects and the interactions.
            target_coeffs = phe_dummies + interaction_coeffs

            # Process the results object into a clean summary table.
            summary_table = _extract_result_summary(
                results=results,
                target_coeffs=target_coeffs,
                config=config
            )

            # Store the final summary table in the nested dictionary.
            all_results[y_var][pair] = summary_table
            print(f"Estimation successful for {y_var}, Grades {u} vs {v}.")

    # --- Step 3: Validate expected Matthew-effect patterns by subject ---
    print("\n--- Validating Interaction OLS Results Patterns (Matthew Effect) ---")

    # Define the significance threshold for validation checks.
    alpha = config['inference_parameters']['significance_levels']['star_1'] # e.g., 0.10

    # Get the name of the interaction term for the highest education group (PHE=2).
    p2_dummy_name = phe_dummies[1] # Assumes 'p2' is the second dummy.

    try:
        # Check Mathematics: Expect negative effect for 6 vs 10.
        math_inter_6_10 = all_results['Score_Math'][(6, 10)]
        math_coeff_name = f"{p2_dummy_name}_x_I_6_10"
        math_est = math_inter_6_10.loc[math_coeff_name, 'Estimate']
        math_pval = math_inter_6_10.loc[math_coeff_name, 'P-value']
        if not (math_est < 0 and math_pval < alpha):
            warnings.warn(
                "ValidationWarning: Mathematics (6 vs 10) did not show the expected "
                f"significant negative interaction. Found Est={math_est:.3f}, P-val={math_pval:.3f}",
                UserWarning
            )

        # Check Literature: Expect positive for 3 vs 6, negative for 6 vs 10.
        lit_inter_3_6 = all_results['Score_Lit'][(3, 6)]
        lit_coeff_name_3_6 = f"{p2_dummy_name}_x_I_3_6"
        lit_est_3_6 = lit_inter_3_6.loc[lit_coeff_name_3_6, 'Estimate']
        if not (lit_est_3_6 > 0): # Paper finds it positive, may not be significant
            warnings.warn(
                "ValidationWarning: Literature (3 vs 6) did not show the expected "
                f"positive interaction. Found Est={lit_est_3_6:.3f}",
                UserWarning
            )

        lit_inter_6_10 = all_results['Score_Lit'][(6, 10)]
        lit_coeff_name_6_10 = f"{p2_dummy_name}_x_I_6_10"
        lit_est_6_10 = lit_inter_6_10.loc[lit_coeff_name_6_10, 'Estimate']
        lit_pval_6_10 = lit_inter_6_10.loc[lit_coeff_name_6_10, 'P-value']
        if not (lit_est_6_10 < 0 and lit_pval_6_10 < alpha):
            warnings.warn(
                "ValidationWarning: Literature (6 vs 10) did not show the expected "
                f"significant negative interaction. Found Est={lit_est_6_10:.3f}, P-val={lit_pval_6_10:.3f}",
                UserWarning
            )

        # Check English: Expect positive effects for both transitions.
        eng_inter_3_6 = all_results['Score_Eng'][(3, 6)]
        eng_coeff_name_3_6 = f"{p2_dummy_name}_x_I_3_6"
        eng_est_3_6 = eng_inter_3_6.loc[eng_coeff_name_3_6, 'Estimate']
        eng_pval_3_6 = eng_inter_3_6.loc[eng_coeff_name_3_6, 'P-value']
        if not (eng_est_3_6 > 0 and eng_pval_3_6 < alpha):
            warnings.warn(
                "ValidationWarning: English (3 vs 6) did not show the expected "
                f"significant positive interaction. Found Est={eng_est_3_6:.3f}, P-val={eng_pval_3_6:.3f}",
                UserWarning
            )

        eng_inter_6_10 = all_results['Score_Eng'][(6, 10)]
        eng_coeff_name_6_10 = f"{p2_dummy_name}_x_I_6_10"
        eng_est_6_10 = eng_inter_6_10.loc[eng_coeff_name_6_10, 'Estimate']
        eng_pval_6_10 = eng_inter_6_10.loc[eng_coeff_name_6_10, 'P-value']
        if not (eng_est_6_10 > 0 and eng_pval_6_10 < alpha):
            warnings.warn(
                "ValidationWarning: English (6 vs 10) did not show the expected "
                f"significant positive interaction. Found Est={eng_est_6_10:.3f}, P-val={eng_pval_6_10:.3f}",
                UserWarning
            )

        print("Result patterns for interaction models validated.")
    except KeyError as e:
        warnings.warn(f"Could not perform pattern validation due to missing coefficient: {e}", UserWarning)

    return all_results


In [None]:
# Task 15: Estimate OLS effort and parental-investment models by subject × grade

# ==============================================================================
# Task 15: Estimate OLS effort and parental-investment models by subject × grade
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 15, Step 3: Helper function to validate result patterns.
# ------------------------------------------------------------------------------

def _validate_effort_investment_patterns(
    results: Dict[str, Dict[int, pd.DataFrame]],
    config: Dict[str, Any]
) -> None:
    """
    Validates the results of effort/investment models against expected patterns.

    This function checks for key developmental trends reported in the study, such
    as the changing effect of homework and parental investment across grades and
    subjects. It issues warnings for any significant deviations.

    Args:
        results: A nested dictionary of the regression summary tables.
        config: The validated study configuration dictionary.
    """
    # Define the significance threshold for validation checks.
    alpha = config['inference_parameters']['significance_levels']['star_1']

    # --- Validate Homework Effort Patterns ---
    try:
        # Grade 3: Expect negative or weak effects.
        g3_math_eff = results['Score_Math'][3].loc['days_homework_cat_4', 'Estimate']
        if g3_math_eff > 0:
            warnings.warn(
                "ValidationWarning: Homework effort in Grade 3 (Math) was expected to be "
                f"negative or weak, but found a positive estimate: {g3_math_eff:.3f}",
                UserWarning
            )

        # Grade 10: Expect monotonically positive and significant effects.
        g10_math_eff_4 = results['Score_Math'][10].loc['days_homework_cat_4', 'Estimate']
        g10_math_pval_4 = results['Score_Math'][10].loc['days_homework_cat_4', 'P-value']
        if not (g10_math_eff_4 > 0 and g10_math_pval_4 < alpha):
            warnings.warn(
                "ValidationWarning: Homework effort in Grade 10 (Math, cat 4) was expected "
                f"to be positive and significant. Found Est={g10_math_eff_4:.3f}, P-val={g10_math_pval_4:.3f}",
                UserWarning
            )

    except KeyError as e:
        warnings.warn(f"Could not perform homework pattern validation due to missing coefficient: {e}", UserWarning)

    # --- Validate Parental Investment (Talk Frequency) Patterns ---
    try:
        # English: Expect effect to increase with grade.
        eng_g3_talk = results['Score_Eng'][3].loc['parent_talk_school_4', 'Estimate']
        eng_g10_talk = results['Score_Eng'][10].loc['parent_talk_school_4', 'Estimate']
        if not eng_g10_talk > eng_g3_talk:
            warnings.warn(
                "ValidationWarning: Parental talk effect for English was expected to increase "
                f"from G3 to G10. Found G3={eng_g3_talk:.3f}, G10={eng_g10_talk:.3f}",
                UserWarning
            )

        # Literature: Expect effect to decrease with grade.
        lit_g3_talk = results['Score_Lit'][3].loc['parent_talk_school_4', 'Estimate']
        lit_g10_talk = results['Score_Lit'][10].loc['parent_talk_school_4', 'Estimate']
        if not lit_g10_talk < lit_g3_talk:
            warnings.warn(
                "ValidationWarning: Parental talk effect for Literature was expected to decrease "
                f"from G3 to G10. Found G3={lit_g3_talk:.3f}, G10={lit_g10_talk:.3f}",
                UserWarning
            )
    except KeyError as e:
        warnings.warn(f"Could not perform parental talk pattern validation due to missing coefficient: {e}", UserWarning)

# ------------------------------------------------------------------------------
# Task 15, Orchestrator Function
# ------------------------------------------------------------------------------

def run_effort_investment_ols_estimation(
    full_df: pd.DataFrame,
    model_specifications: Dict[str, Dict[int, Dict[str, Any]]],
    control_vars_list: List[str],
    config: Dict[str, Any]
) -> Dict[str, Dict[int, pd.DataFrame]]:
    """
    Orchestrates the estimation of by-grade effort and investment OLS models.

    This function systematically estimates 9 separate OLS models, one for each
    combination of subject and grade. It creates the specific analysis sample
    for each cell on-the-fly, runs the regression, extracts key results for
    effort and investment variables, and validates the findings against the
    developmental patterns reported in the study.

    Args:
        full_df: The complete, feature-engineered DataFrame from Task 8.
        model_specifications: A nested dictionary of the by-grade model
                              specifications from Task 10.
        control_vars_list: The comprehensive list of all control variable names.
        config: The validated study configuration dictionary.

    Returns:
        A nested dictionary where keys are subject and grade, and values are
        pandas DataFrames with the formatted regression results.
    """
    # --- Input Validation ---
    if not isinstance(full_df, pd.DataFrame):
        raise TypeError("Input 'full_df' must be a pandas DataFrame.")
    if not isinstance(model_specifications, dict):
        raise TypeError("Input 'model_specifications' must be a dictionary.")

    # --- Initialization ---
    all_results: Dict[str, Dict[int, pd.DataFrame]] = {}
    dependent_vars = config['variable_definitions']['dependent_variables']
    grades = config['population_scope']['grades_of_interest']

    # Define the key effort and investment coefficients to extract.
    target_coeffs = [
        'days_homework_cat_2', 'days_homework_cat_3', 'days_homework_cat_4',
        'parent_talk_school_2', 'parent_talk_school_3', 'parent_talk_school_4'
    ]
    # Also include the main PHE dummies for context.
    target_coeffs.extend(config['variable_definitions']['phe_dummy_names'])

    # --- Estimation Loop ---
    for y_var in dependent_vars:
        all_results[y_var] = {}
        for grade in grades:
            print(f"--- Estimating Effort/Investment OLS for: {y_var}, Grade {grade} ---")

            # --- Step 1: Create sample and fit the model ---
            # Retrieve the correct model specification.
            try:
                spec = model_specifications[y_var][grade]
            except KeyError:
                raise ConfigurationError(f"Missing spec for subject '{y_var}', grade {grade}.")

            # Create the analysis sample for this specific cell on-the-fly.
            # 1. Filter by grade.
            grade_df = full_df[full_df['grade'] == grade]
            # 2. Perform listwise deletion using all required variables.
            required_cols = [spec['dependent']] + spec['exog']
            sample = _create_analysis_sample(grade_df, required_cols)

            if sample.empty:
                warnings.warn(f"Sample for {y_var}, Grade {grade} is empty after listwise deletion. Skipping.", UserWarning)
                continue

            # Estimate the model.
            try:
                results = estimate_model(
                    analysis_df=sample,
                    model_spec=spec,
                    config=config
                )
            except Exception as e:
                raise RuntimeError(
                    f"Estimation failed for Effort/Investment OLS on {y_var}, Grade {grade}."
                ) from e

            # --- Step 2: Extract and store results ---
            # Extract results for the coefficients of interest.
            summary_table = _extract_result_summary(
                results=results,
                target_coeffs=[c for c in target_coeffs if c in results.params.index],
                config=config
            )
            all_results[y_var][grade] = summary_table
            print(f"Estimation successful for {y_var}, Grade {grade}.")

    # --- Step 3: Validate expected patterns across grades ---
    print("\n--- Validating Effort/Investment Results Patterns ---")
    _validate_effort_investment_patterns(results=all_results, config=config)
    print("Pattern validation for effort/investment models complete.")

    return all_results


In [None]:
# Task 16: Estimate IV first-stage regressions and validate instrument relevance

# ==============================================================================
# Task 16: Estimate IV first-stage regressions and validate instrument relevance
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 16, Step 2: Helper to extract and validate first-stage results.
# ------------------------------------------------------------------------------

def _extract_and_validate_first_stage(
    results: PanelEffectsResults,
    instrument_var: str,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Extracts key statistics from a first-stage regression and validates them.

    This function processes a fitted first-stage model to extract the
    instrument's coefficient and the weak instrument F-statistic. It then
    validates these against the study's theoretical expectations (sign) and
    methodological requirements (strength).

    Args:
        results: The fitted first-stage model object from `linearmodels`.
        instrument_var: The name of the instrumental variable (e.g., 'GG').
        config: The validated study configuration dictionary.

    Returns:
        A dictionary containing the extracted statistics and validation flags.
    """
    # --- Statistic Extraction ---
    # Extract the coefficient for the instrumental variable.
    pi1_estimate = results.params[instrument_var]
    # Extract the standard error, t-statistic, and p-value.
    std_err = results.std_errors[instrument_var]
    t_stat = results.tstats[instrument_var]
    p_value = results.pvalues[instrument_var]

    # The F-statistic for a single excluded instrument is the square of its t-statistic.
    # This is the correct statistic to use for the weak instrument test in this context.
    f_statistic = t_stat ** 2

    # --- Validation ---
    # 1. Validate instrument strength against the Staiger-Stock rule-of-thumb.
    f_stat_threshold = config['iv_validation']['weak_instrument_threshold_f_stat']
    is_strong = f_statistic >= f_stat_threshold
    if not is_strong:
        warnings.warn(
            f"WeakInstrumentWarning: Instrument '{instrument_var}' may be weak. "
            f"F-statistic ({f_statistic:.2f}) is below the threshold of {f_stat_threshold}.",
            UserWarning
        )

    # 2. Validate the sign of the instrument's coefficient against theory.
    expected_sign = config['instrument_construction']['expected_sign_pi1']
    sign_is_correct = (pi1_estimate < 0) if expected_sign == 'negative' else (pi1_estimate > 0)
    if not sign_is_correct:
        warnings.warn(
            f"UnexpectedSignWarning: Instrument '{instrument_var}' has an unexpected sign. "
            f"Expected '{expected_sign}', but found estimate of {pi1_estimate:.3f}.",
            UserWarning
        )

    # --- Assemble Summary ---
    # Compile all extracted and validated information into a dictionary.
    summary = {
        'pi1_estimate': pi1_estimate,
        'std_error': std_err,
        'p_value': p_value,
        'f_statistic': f_statistic,
        'is_strong': is_strong,
        'sign_is_correct': sign_is_correct
    }
    return summary

# ------------------------------------------------------------------------------
# Task 16, Orchestrator Function
# ------------------------------------------------------------------------------

def run_iv_first_stage_estimation(
    iv_samples: Dict[str, Any],
    iv_specifications: Dict[str, Any],
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the estimation and validation of all first-stage IV models.

    This function systematically runs the first-stage regression for all pooled
    and by-grade IV specifications. It extracts key diagnostic statistics,
    validates instrument relevance (strength and sign), and compiles the
    results into a single, comprehensive summary table.

    Args:
        iv_samples: The nested dictionary of all IV analysis samples from Task 9.
        iv_specifications: The nested dictionary of all IV model specifications
                           from Task 11.
        config: The validated study configuration dictionary.

    Returns:
        A pandas DataFrame summarizing the results of all first-stage regressions.
    """
    # --- Input Validation ---
    if not isinstance(iv_samples, dict) or not isinstance(iv_specifications, dict):
        raise TypeError("Inputs 'iv_samples' and 'iv_specifications' must be dictionaries.")

    # --- Initialization ---
    all_results_data = []
    dependent_vars = config['variable_definitions']['dependent_variables']
    grades = config['population_scope']['grades_of_interest']
    instrument_var = config['variable_definitions']['instrumental_variable']

    # --- Estimation Loop ---
    # Define the model types to iterate over.
    model_types_to_run = {'pooled': dependent_vars, 'by_grade': grades}

    for model_type, items in model_types_to_run.items():
        if model_type == 'pooled':
            # Loop through subjects for pooled models.
            for y_var in items:
                print(f"--- Estimating IV First Stage for: Pooled, {y_var} ---")
                sample = iv_samples[model_type][y_var]
                spec = iv_specifications[model_type][y_var]['first_stage']

                # --- Step 1: Fit the first-stage regression ---
                results = estimate_model(sample, spec, config)

                # --- Step 2: Extract and validate results ---
                summary = _extract_and_validate_first_stage(results, instrument_var, config)
                summary['subject'] = y_var
                summary['grade'] = 'Pooled'
                all_results_data.append(summary)

        elif model_type == 'by_grade':
            # Use nested loops for by-grade models.
            for y_var in dependent_vars:
                for grade in items:
                    print(f"--- Estimating IV First Stage for: {y_var}, Grade {grade} ---")
                    sample = iv_samples[model_type][y_var][grade]
                    spec = iv_specifications[model_type][y_var][grade]['first_stage']

                    if sample.empty:
                        warnings.warn(f"Sample for {y_var}, Grade {grade} is empty. Skipping first stage.", UserWarning)
                        continue

                    # --- Step 1 & 2 ---
                    results = estimate_model(sample, spec, config)
                    summary = _extract_and_validate_first_stage(results, instrument_var, config)
                    summary['subject'] = y_var
                    summary['grade'] = grade
                    all_results_data.append(summary)

    # --- Step 3: Store and report results ---
    # Convert the list of dictionaries into a final, well-structured DataFrame.
    results_df = pd.DataFrame(all_results_data)

    # Set a MultiIndex for clear organization and easy access.
    results_df = results_df.set_index(['subject', 'grade'])

    # Reorder columns for a clean presentation.
    column_order = [
        'pi1_estimate', 'std_error', 'p_value', 'f_statistic',
        'is_strong', 'sign_is_correct'
    ]
    results_df = results_df[column_order]

    print("\n--- IV First-Stage Estimation and Validation Complete ---")
    return results_df


In [None]:
# Task 17: Estimate IV reduced-form regressions (diagnostic)

# ==============================================================================
# Task 17: Estimate IV reduced-form regressions (diagnostic)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 17, Step 2: Helper to extract and validate reduced-form results.
# ------------------------------------------------------------------------------

def _extract_and_validate_reduced_form(
    results: PanelEffectsResults,
    instrument_var: str,
    first_stage_pi1: float,
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Extracts key statistics from a reduced-form regression and validates them.

    This function processes a fitted reduced-form model to extract the
    instrument's direct effect on the outcome. It validates the sign of this
    effect for consistency with the overall IV framework and computes the
    implied Wald estimator as a diagnostic.

    Args:
        results: The fitted reduced-form model object from `linearmodels`.
        instrument_var: The name of the instrumental variable (e.g., 'GG').
        first_stage_pi1: The estimated first-stage coefficient (π̂₁) for the
                         corresponding model.
        config: The validated study configuration dictionary.

    Returns:
        A dictionary containing the extracted statistics and diagnostics.
    """
    # --- Statistic Extraction ---
    # Extract the reduced-form coefficient (ρ̂₁), its standard error, and p-value.
    rho1_estimate = results.params[instrument_var]
    std_err = results.std_errors[instrument_var]
    p_value = results.pvalues[instrument_var]

    # --- Validation and Diagnostics ---
    # 1. Validate the sign of the reduced-form coefficient.
    # Given π̂₁ < 0 and an expected causal effect α̂₁ > 0, we expect ρ̂₁ < 0.
    sign_is_consistent = rho1_estimate < 0
    if not sign_is_consistent:
        warnings.warn(
            f"InconsistentSignWarning: Reduced-form coefficient for '{instrument_var}' "
            f"has an unexpected sign. Expected negative, but found {rho1_estimate:.3f}.",
            UserWarning
        )

    # 2. Compute the implied Wald estimator as a diagnostic.
    # α̂₁_wald = ρ̂₁ / π̂₁
    # Avoid division by zero, though π̂₁ should be non-zero if the instrument is relevant.
    if first_stage_pi1 != 0:
        wald_estimate = rho1_estimate / first_stage_pi1
    else:
        wald_estimate = float('nan')

    # --- Assemble Summary ---
    # Compile all information into a summary dictionary.
    summary = {
        'rho1_estimate': rho1_estimate,
        'std_error': std_err,
        'p_value': p_value,
        'sign_is_consistent': sign_is_consistent,
        'implied_wald_estimate': wald_estimate
    }
    return summary

# ------------------------------------------------------------------------------
# Task 17, Orchestrator Function
# ------------------------------------------------------------------------------

def run_iv_reduced_form_estimation(
    iv_samples: Dict[str, Any],
    iv_specifications: Dict[str, Any],
    first_stage_results: pd.DataFrame,
    config: Dict[str, Any]
) -> Optional[pd.DataFrame]:
    """
    Orchestrates the estimation of all reduced-form IV diagnostic models.

    This function systematically runs the reduced-form regression for all pooled
    and by-grade IV specifications. It serves as a diagnostic check on the
    instrument's effect on the final outcome. The function is conditional on the
    'report_reduced_form' flag in the configuration.

    Args:
        iv_samples: The nested dictionary of all IV analysis samples.
        iv_specifications: The nested dictionary of all IV model specifications.
        first_stage_results: The DataFrame of results from the first-stage
                             estimations (Task 16).
        config: The validated study configuration dictionary.

    Returns:
        A pandas DataFrame summarizing the reduced-form results, or None if
        this diagnostic step is disabled in the configuration.
    """
    # --- Check Configuration ---
    # This entire diagnostic step is optional, controlled by the config.
    if not config['inference_parameters'].get('report_reduced_form', False):
        print("--- Skipping IV Reduced-Form Estimation (disabled in config) ---")
        return None

    # --- Input Validation ---
    if not isinstance(iv_samples, dict) or not isinstance(iv_specifications, dict):
        raise TypeError("Inputs 'iv_samples' and 'iv_specifications' must be dictionaries.")
    if not isinstance(first_stage_results, pd.DataFrame):
        raise TypeError("Input 'first_stage_results' must be a pandas DataFrame.")

    # --- Initialization ---
    all_results_data = []
    dependent_vars = config['variable_definitions']['dependent_variables']
    grades = config['population_scope']['grades_of_interest']
    instrument_var = config['variable_definitions']['instrumental_variable']

    # --- Estimation Loop ---
    # Iterate through the same models as the first-stage estimation.
    for y_var in dependent_vars:
        # Pooled Model
        print(f"--- Estimating IV Reduced-Form for: Pooled, {y_var} ---")
        sample = iv_samples['pooled'][y_var]
        spec = iv_specifications['pooled'][y_var]['reduced_form']
        pi1 = first_stage_results.loc[(y_var, 'Pooled'), 'pi1_estimate']

        results = estimate_model(sample, spec, config)
        summary = _extract_and_validate_reduced_form(results, instrument_var, pi1, config)
        summary['subject'] = y_var
        summary['grade'] = 'Pooled'
        all_results_data.append(summary)

        # By-Grade Models
        for grade in grades:
            print(f"--- Estimating IV Reduced-Form for: {y_var}, Grade {grade} ---")
            try:
                sample = iv_samples['by_grade'][y_var][grade]
                spec = iv_specifications['by_grade'][y_var][grade]['reduced_form']
                pi1 = first_stage_results.loc[(y_var, grade), 'pi1_estimate']
            except KeyError:
                warnings.warn(f"Missing sample, spec, or first-stage result for {y_var}, Grade {grade}. Skipping.", UserWarning)
                continue

            if sample.empty:
                warnings.warn(f"Sample for {y_var}, Grade {grade} is empty. Skipping reduced form.", UserWarning)
                continue

            results = estimate_model(sample, spec, config)
            summary = _extract_and_validate_reduced_form(results, instrument_var, pi1, config)
            summary['subject'] = y_var
            summary['grade'] = grade
            all_results_data.append(summary)

    # --- Step 3: Store and report results ---
    # Convert the list of dictionaries into a final DataFrame.
    results_df = pd.DataFrame(all_results_data).set_index(['subject', 'grade'])

    # Define a clean column order for the final report.
    column_order = [
        'rho1_estimate', 'std_error', 'p_value',
        'sign_is_consistent', 'implied_wald_estimate'
    ]
    results_df = results_df[column_order]

    print("\n--- IV Reduced-Form Estimation and Validation Complete ---")
    return results_df


In [None]:
# Task 18: Estimate IV second-stage regressions and extract causal persistent-effect coefficients

# ==============================================================================
# Task 18: Estimate IV second-stage regressions and extract causal
#          persistent-effect coefficients
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 18, Orchestrator Function
# ------------------------------------------------------------------------------

def run_pooled_iv_estimation(
    iv_samples: Dict[str, Any],
    iv_specifications: Dict[str, Any],
    ols_results: Dict[str, pd.DataFrame],
    first_stage_results: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the estimation of pooled IV second-stage models.

    This function estimates the main causal effect of parental education on
    student achievement for each subject using a 2SLS approach. It leverages
    the generic estimation engine, extracts and formats the results, and
    performs validation checks against theoretical expectations and OLS estimates.

    Args:
        iv_samples: The nested dictionary of IV analysis samples.
        iv_specifications: The nested dictionary of IV model specifications.
        ols_results: The dictionary of pooled OLS results from Task 13 for comparison.
        first_stage_results: The DataFrame of first-stage results from Task 16
                             for diagnostic checks.
        config: The validated study configuration dictionary.

    Returns:
        A dictionary where keys are subject names and values are pandas
        DataFrames containing the formatted 2SLS regression results.
    """
    # --- Input Validation ---
    if not isinstance(iv_samples, dict) or not isinstance(iv_specifications, dict):
        raise TypeError("Inputs 'iv_samples' and 'iv_specifications' must be dictionaries.")
    if not isinstance(ols_results, dict):
        raise TypeError("Input 'ols_results' must be a dictionary.")

    # --- Initialization ---
    all_results: Dict[str, pd.DataFrame] = {}
    dependent_vars = config['variable_definitions']['dependent_variables']
    endog_var = config['variable_definitions']['endogenous_variable']

    # --- Estimation Loop ---
    for y_var in dependent_vars:
        print(f"--- Estimating Pooled IV Second Stage for: {y_var} ---")

        # --- Step 1: Fit the pooled IV second-stage model ---
        # Retrieve the correct sample and the 'second_stage' specification.
        try:
            sample = iv_samples['pooled'][y_var]
            spec = iv_specifications['pooled'][y_var]['second_stage']
        except KeyError:
            raise ConfigurationError(f"Missing sample or spec for Pooled IV on {y_var}.")

        # Check if the instrument for this model was strong.
        is_strong = first_stage_results.loc[(y_var, 'Pooled'), 'is_strong']
        if not is_strong:
            warnings.warn(
                f"Estimating 2SLS for {y_var} with a weak instrument. "
                "Results may be unreliable.", UserWarning
            )

        # Estimate the 2SLS model using the generic estimation engine.
        try:
            results = estimate_model(
                analysis_df=sample,
                model_spec=spec,
                config=config
            )
        except Exception as e:
            raise RuntimeError(f"Estimation failed for Pooled IV on {y_var}.") from e

        # --- Step 2: Extract and format results ---
        # Process the results object into a clean summary table.
        # The target coefficient is the endogenous variable itself.
        summary_table = _extract_result_summary(
            results=results,
            target_coeffs=[endog_var],
            config=config
        )

        all_results[y_var] = summary_table
        print(f"Estimation successful for {y_var}.")

    # --- Step 3: Validate expected causal effects ---
    print("\n--- Validating Pooled IV Results Patterns ---")

    # Check for positive sign and compare magnitudes.
    for y_var, res_df in all_results.items():
        # 1. Check for positive sign.
        estimate = res_df.loc[endog_var, 'Estimate']
        if estimate <= 0:
            warnings.warn(
                f"ValidationWarning: Causal estimate for '{y_var}' is not positive "
                f"as expected (Estimate: {estimate:.3f}).", UserWarning
            )

        # 2. Compare IV estimate to the (scaled) OLS estimate.
        try:
            p2_coeff_name = config['variable_definitions']['phe_dummy_names'][1]
            ols_est_p2 = ols_results[y_var].loc[p2_coeff_name, 'Estimate']
            # The IV estimate is per-level; the OLS p2 estimate is for two levels.
            # We compare the IV estimate to half the OLS p2 estimate.
            scaled_ols_est = ols_est_p2 / 2.0
            if estimate < scaled_ols_est:
                warnings.warn(
                    f"ValidationWarning: IV estimate for '{y_var}' ({estimate:.2f}) is smaller "
                    f"than the scaled OLS estimate ({scaled_ols_est:.2f}). This may suggest "
                    "a different direction of endogeneity bias than expected.", UserWarning
                )
        except (KeyError, IndexError):
            warnings.warn(f"Could not perform OLS comparison for {y_var}.", UserWarning)

    # 3. Check the expected ordering of effect sizes (Eng > Lit > Math).
    try:
        eng_eff = all_results['Score_Eng'].loc[endog_var, 'Estimate']
        lit_eff = all_results['Score_Lit'].loc[endog_var, 'Estimate']
        math_eff = all_results['Score_Math'].loc[endog_var, 'Estimate']

        if not (eng_eff > lit_eff > math_eff):
            warnings.warn(
                "ValidationWarning: The expected ordering of IV causal effect sizes "
                f"(Eng > Lit > Math) was not observed. "
                f"Found: Eng={eng_eff:.2f}, Lit={lit_eff:.2f}, Math={math_eff:.2f}",
                UserWarning
            )
        else:
            print("Result patterns for pooled IV models validated successfully.")
    except KeyError:
        warnings.warn("Could not perform pattern validation due to missing results.", UserWarning)

    return all_results


In [None]:
# Task 19: Estimate IV second-stage by-grade regressions

# ==============================================================================
# Task 19: Estimate IV second-stage by-grade regressions
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 19, Step 3: Helper function to validate result patterns.
# ------------------------------------------------------------------------------

def _validate_by_grade_iv_patterns(
    results: Dict[str, Dict[int, pd.DataFrame]],
    endog_var: str,
    config: Dict[str, Any]
) -> None:
    """
    Validates the results of by-grade IV models against expected age-profiles.

    This function checks for the key dynamic patterns in the causal effect of
    parental education across grades, as reported in the study. It issues
    warnings for significant deviations from these expected trends.

    Args:
        results: A nested dictionary of the regression summary tables.
        endog_var: The name of the endogenous variable ('PHE').
        config: The validated study configuration dictionary.
    """
    # Define the significance threshold for validation checks.
    alpha = config['inference_parameters']['significance_levels']['star_1']

    try:
        # --- Validate Mathematics & Literature: Declining Influence ---
        for subject in ['Score_Math', 'Score_Lit']:
            # Check that the effect in Grade 10 is smaller and/or insignificant.
            est_g6 = results[subject][6].loc[endog_var, 'Estimate']
            est_g10 = results[subject][10].loc[endog_var, 'Estimate']
            pval_g10 = results[subject][10].loc[endog_var, 'P-value']

            if not (est_g10 < est_g6):
                warnings.warn(
                    f"ValidationWarning: Causal effect for '{subject}' was expected to "
                    f"decrease from G6 to G10. Found G6={est_g6:.2f}, G10={est_g10:.2f}",
                    UserWarning
                )
            if pval_g10 < alpha:
                warnings.warn(
                    f"ValidationWarning: Causal effect for '{subject}' in G10 was "
                    f"expected to be insignificant. Found P-val={pval_g10:.3f}",
                    UserWarning
                )

        # --- Validate English: Persistent Influence ---
        # Check that the effect remains positive and significant in all grades.
        for grade in [3, 6, 10]:
            est_eng = results['Score_Eng'][grade].loc[endog_var, 'Estimate']
            pval_eng = results['Score_Eng'][grade].loc[endog_var, 'P-value']
            if not (est_eng > 0 and pval_eng < alpha):
                warnings.warn(
                    f"ValidationWarning: Causal effect for 'Score_Eng' in G{grade} was "
                    f"expected to be positive and significant. Found Est={est_eng:.2f}, "
                    f"P-val={pval_eng:.3f}",
                    UserWarning
                )
    except KeyError as e:
        warnings.warn(f"Could not perform by-grade pattern validation due to missing result: {e}", UserWarning)

# ------------------------------------------------------------------------------
# Task 19, Orchestrator Function
# ------------------------------------------------------------------------------

def run_by_grade_iv_estimation(
    iv_samples: Dict[str, Any],
    iv_specifications: Dict[str, Any],
    first_stage_results: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, Dict[int, pd.DataFrame]]:
    """
    Orchestrates the estimation of by-grade IV second-stage models.

    This function systematically estimates the 9 models required to analyze the
    causal effect of parental education at each grade level (3, 6, 10) for each
    subject. It validates the results against the expected age-profile patterns.

    Args:
        iv_samples: The nested dictionary of all IV analysis samples.
        iv_specifications: The nested dictionary of all IV model specifications.
        first_stage_results: The DataFrame of results from the first-stage
                             estimations (Task 16).
        config: The validated study configuration dictionary.

    Returns:
        A nested dictionary where keys are subject and grade, and values are
        pandas DataFrames with the formatted 2SLS regression results.
    """
    # --- Input Validation ---
    if not isinstance(iv_samples, dict) or not isinstance(iv_specifications, dict):
        raise TypeError("Inputs 'iv_samples' and 'iv_specifications' must be dictionaries.")
    if not isinstance(first_stage_results, pd.DataFrame):
        raise TypeError("Input 'first_stage_results' must be a pandas DataFrame.")

    # --- Initialization ---
    all_results: Dict[str, Dict[int, pd.DataFrame]] = {}
    dependent_vars = config['variable_definitions']['dependent_variables']
    grades = config['population_scope']['grades_of_interest']
    endog_var = config['variable_definitions']['endogenous_variable']

    # --- Estimation Loop ---
    for y_var in dependent_vars:
        all_results[y_var] = {}
        for grade in grades:
            print(f"--- Estimating By-Grade IV Second Stage for: {y_var}, Grade {grade} ---")

            # --- Step 1: Fit the by-grade IV second-stage model ---
            try:
                sample = iv_samples['by_grade'][y_var][grade]
                spec = iv_specifications['by_grade'][y_var][grade]['second_stage']
                is_strong = first_stage_results.loc[(y_var, grade), 'is_strong']
            except KeyError:
                warnings.warn(f"Missing sample, spec, or first-stage result for {y_var}, Grade {grade}. Skipping.", UserWarning)
                continue

            if sample.empty:
                warnings.warn(f"Sample for {y_var}, Grade {grade} is empty. Skipping.", UserWarning)
                continue

            if not is_strong:
                warnings.warn(
                    f"Estimating 2SLS for {y_var}, Grade {grade} with a weak instrument. "
                    "Results may be unreliable.", UserWarning
                )

            try:
                results = estimate_model(sample, spec, config)
            except Exception as e:
                raise RuntimeError(f"Estimation failed for By-Grade IV on {y_var}, Grade {grade}.") from e

            # --- Step 2: Extract and store results ---
            summary_table = _extract_result_summary(
                results=results,
                target_coeffs=[endog_var],
                config=config
            )
            all_results[y_var][grade] = summary_table
            print(f"Estimation successful for {y_var}, Grade {grade}.")

    # --- Step 3: Validate expected age-profile patterns ---
    print("\n--- Validating By-Grade IV Results Patterns ---")
    _validate_by_grade_iv_patterns(results=all_results, endog_var=endog_var, config=config)
    print("Pattern validation for by-grade IV models complete.")

    return all_results


In [None]:
# Task 20: Estimate IV second-stage with grade-by-PHE interactions and extract causal Matthew-effect coefficients

# ==============================================================================
# Task 20: Estimate IV second-stage with grade-by-PHE interactions and
#          extract causal Matthew-effect coefficients
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 20, Step 3: Helper function to validate result patterns.
# ------------------------------------------------------------------------------

def _validate_interaction_iv_patterns(
    results: Dict[str, Dict[Tuple[int, int], pd.DataFrame]],
    endog_var: str,
    config: Dict[str, Any]
) -> None:
    """
    Validates causal Matthew-effect patterns from IV interaction models.

    This function checks the sign and significance of the key interaction
    coefficients (α̂₂) against the study's main causal findings for each
    subject, issuing warnings for any deviations.

    Args:
        results: A nested dictionary of the regression summary tables.
        endog_var: The name of the endogenous variable ('PHE').
        config: The validated study configuration dictionary.
    """
    # Define the significance threshold for validation checks.
    alpha = config['inference_parameters']['significance_levels']['star_1']

    try:
        # --- Validate Mathematics: Declining Causal Influence ---
        math_res_6_10 = results['Score_Math'][(6, 10)]
        math_coeff_name = f"{endog_var}_x_I_6_10"
        math_est = math_res_6_10.loc[math_coeff_name, 'Estimate']
        math_pval = math_res_6_10.loc[math_coeff_name, 'P-value']
        if not (math_est < 0 and math_pval < alpha):
            warnings.warn(
                "ValidationWarning: Causal interaction for Mathematics (6 vs 10) did not show "
                f"the expected significant negative effect. Found Est={math_est:.3f}, P-val={math_pval:.3f}",
                UserWarning
            )

        # --- Validate Literature: Bell-Shaped Causal Influence ---
        lit_res_3_6 = results['Score_Lit'][(3, 6)]
        lit_coeff_name_3_6 = f"{endog_var}_x_I_3_6"
        lit_est_3_6 = lit_res_3_6.loc[lit_coeff_name_3_6, 'Estimate']
        if not (lit_est_3_6 > 0):
            warnings.warn(
                "ValidationWarning: Causal interaction for Literature (3 vs 6) did not show "
                f"the expected positive effect. Found Est={lit_est_3_6:.3f}",
                UserWarning
            )

        lit_res_6_10 = results['Score_Lit'][(6, 10)]
        lit_coeff_name_6_10 = f"{endog_var}_x_I_6_10"
        lit_est_6_10 = lit_res_6_10.loc[lit_coeff_name_6_10, 'Estimate']
        lit_pval_6_10 = lit_res_6_10.loc[lit_coeff_name_6_10, 'P-value']
        if not (lit_est_6_10 < 0 and lit_pval_6_10 < alpha):
            warnings.warn(
                "ValidationWarning: Causal interaction for Literature (6 vs 10) did not show "
                f"the expected significant negative effect. Found Est={lit_est_6_10:.3f}, P-val={lit_pval_6_10:.3f}",
                UserWarning
            )

        # --- Validate English: Increasing Causal Influence ---
        eng_res_3_6 = results['Score_Eng'][(3, 6)]
        eng_coeff_name_3_6 = f"{endog_var}_x_I_3_6"
        eng_est_3_6 = eng_res_3_6.loc[eng_coeff_name_3_6, 'Estimate']
        eng_pval_3_6 = eng_res_3_6.loc[eng_coeff_name_3_6, 'P-value']
        if not (eng_est_3_6 > 0 and eng_pval_3_6 < alpha):
            warnings.warn(
                "ValidationWarning: Causal interaction for English (3 vs 6) did not show "
                f"the expected significant positive effect. Found Est={eng_est_3_6:.3f}, P-val={eng_pval_3_6:.3f}",
                UserWarning
            )
    except KeyError as e:
        warnings.warn(f"Could not perform IV interaction pattern validation due to missing coefficient: {e}", UserWarning)

# ------------------------------------------------------------------------------
# Task 20, Orchestrator Function
# ------------------------------------------------------------------------------

def run_interaction_iv_estimation(
    iv_samples: Dict[str, Any],
    iv_specifications: Dict[str, Any],
    config: Dict[str, Any]
) -> Dict[str, Dict[Tuple[int, int], pd.DataFrame]]:
    """
    Orchestrates the estimation of IV models with grade-by-PHE interactions.

    This function estimates the 9 complex 2SLS models required to derive the
    causal Matthew Effect. It correctly specifies models with two endogenous
    variables (PHE and its interaction) and two instruments, extracts the key
    causal coefficients, and validates them against the study's main findings.

    Args:
        iv_samples: The nested dictionary of IV interaction analysis samples.
        iv_specifications: The nested dictionary of IV interaction model specs.
        config: The validated study configuration dictionary.

    Returns:
        A nested dictionary with the formatted 2SLS interaction model results.
    """
    # --- Input Validation ---
    if not isinstance(iv_samples, dict) or not isinstance(iv_specifications, dict):
        raise TypeError("Inputs 'iv_samples' and 'iv_specifications' must be dictionaries.")

    # --- Initialization ---
    all_results: Dict[str, Dict[Tuple[int, int], pd.DataFrame]] = {}
    dependent_vars = config['variable_definitions']['dependent_variables']
    grade_pairs = config['model_specifications']['grade_pair_definitions']
    endog_var = config['variable_definitions']['endogenous_variable']

    # --- Estimation Loop ---
    for y_var in dependent_vars:
        all_results[y_var] = {}
        for pair in grade_pairs:
            u, v = pair
            print(f"--- Estimating Interaction IV for: {y_var}, Grades {u} vs {v} ---")

            # --- Step 1: Fit the IV interaction model ---
            try:
                sample = iv_samples['interaction'][y_var][pair]
                spec = iv_specifications['interaction'][y_var][pair]['second_stage']
            except KeyError:
                raise ConfigurationError(f"Missing sample or spec for IV Interaction on {y_var}, {pair}.")

            if sample.empty:
                warnings.warn(f"Sample for {y_var}, {pair} is empty. Skipping.", UserWarning)
                continue

            try:
                results = estimate_model(sample, spec, config)
            except Exception as e:
                raise RuntimeError(f"Estimation failed for IV Interaction on {y_var}, {pair}.") from e

            # --- Step 2: Extract and store results ---
            # Programmatically define the names of the key endogenous coefficients.
            indicator = f"I_{u}_{v}"
            interaction_coeff_name = f"{endog_var}_x_{indicator}"
            target_coeffs = [endog_var, interaction_coeff_name]

            summary_table = _extract_result_summary(
                results=results,
                target_coeffs=target_coeffs,
                config=config
            )
            all_results[y_var][pair] = summary_table
            print(f"Estimation successful for {y_var}, Grades {u} vs {v}.")

    # --- Step 3: Validate expected causal Matthew-effect patterns ---
    print("\n--- Validating Causal Matthew-Effect Patterns ---")
    _validate_interaction_iv_patterns(results=all_results, endog_var=endog_var, config=config)
    print("Pattern validation for IV interaction models complete.")

    return all_results


In [None]:
# Task 21: Generate empirical CDF plots by PHE group for each subject and selected grades

# ==============================================================================
# Task 21: Generate empirical CDF plots by PHE group for each subject and
#          selected grades
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 21, Step 1: Helper function to compute an empirical CDF.
# ------------------------------------------------------------------------------

def _compute_ecdf(data: np.ndarray, num_points: int = None) -> Tuple[np.ndarray, np.ndarray]:
    """
    Computes the x and y coordinates for an empirical CDF.

    Args:
        data: A 1D numpy array of numerical data.
        num_points: If specified, the number of points to down-sample the ECDF
                    to for efficient plotting. If None, all points are returned.

    Returns:
        A tuple of two numpy arrays: (sorted_data, cumulative_probabilities).
    """
    # --- ECDF Computation ---
    # Get the number of data points.
    n = len(data)
    if n == 0:
        return np.array([]), np.array([])

    # Sort the data in ascending order.
    x = np.sort(data)

    # Create the cumulative probability vector.
    y = np.arange(1, n + 1) / n

    # --- Down-sampling for Efficiency (Optional) ---
    # If a specific number of points is requested for plotting.
    if num_points is not None and n > num_points:
        # Create an array of evenly spaced indices to sample from the full ECDF.
        indices = np.linspace(0, n - 1, num=num_points, dtype=int)
        # Select the data points at these indices.
        x = x[indices]
        y = y[indices]

    return x, y

# ------------------------------------------------------------------------------
# Task 21, Step 2 & 3: Helper function to plot and save a single CDF figure.
# ------------------------------------------------------------------------------

def _plot_and_save_single_cdf(
    df_subset: pd.DataFrame,
    score_col: str,
    grade: int,
    output_dir: str,
    config: Dict[str, Any]
) -> str:
    """
    Generates and saves a single CDF plot for a given subject and grade.

    This function plots the empirical CDF of scores for three parental
    education (PHE) groups and saves the figure to a specified directory.

    Args:
        df_subset: DataFrame filtered for the specific grade.
        score_col: The name of the score column to plot.
        grade: The grade being plotted.
        output_dir: The directory in which to save the plot.
        config: The validated study configuration dictionary.

    Returns:
        The file path of the saved plot.
    """
    # --- Plotting Setup ---
    # Create a new figure and axes for the plot.
    fig, ax = plt.subplots(figsize=(8, 6), dpi=100)

    # Define plot aesthetics for each PHE group.
    phe_labels = {
        0.0: 'ISCED 0-2 (Lowest)',
        1.0: 'ISCED 3-5 (Middle)',
        2.0: 'ISCED 6-8 (Highest)'
    }
    colors = {0.0: 'red', 1.0: 'black', 2.0: 'blue'}

    # --- Data Processing and Plotting Loop ---
    # Iterate through the PHE groups to plot each CDF curve.
    for phe_level, label in phe_labels.items():
        # Filter the data for the current PHE group.
        scores = df_subset.loc[df_subset['PHE'] == phe_level, score_col].dropna().values

        # If there is no data for this group, skip to the next one.
        if len(scores) == 0:
            continue

        # --- Step 1 (call): Compute the ECDF coordinates ---
        x, y = _compute_ecdf(scores, num_points=1000)

        # --- Step 2 (plot): Plot the CDF curve ---
        ax.plot(x, y, label=label, color=colors[phe_level], linewidth=1.5)

    # --- Final Plot Formatting ---
    # Set titles and labels for clarity and professional appearance.
    subject_name = score_col.replace('Score_', '')
    ax.set_title(f'CDF of {subject_name} Scores, Grade {grade}, by Parental Education', fontsize=14)
    ax.set_xlabel('Test Score', fontsize=12)
    ax.set_ylabel('Cumulative Probability', fontsize=12)
    ax.legend(title='Parental Education Group')
    ax.grid(True, which='both', linestyle='--', linewidth=0.5)
    ax.set_xlim(200, 800) # Set reasonable score limits
    ax.set_ylim(0, 1)

    # --- Step 3: Save the figure ---
    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    # Generate a descriptive filename.
    filename = f"cdf_{subject_name.lower()}_grade_{grade}.png"
    filepath = os.path.join(output_dir, filename)

    # Save the figure with high resolution.
    fig.savefig(filepath, dpi=300, bbox_inches='tight')
    plt.close(fig) # Close the figure to free up memory.

    return filepath

# ------------------------------------------------------------------------------
# Task 21, Orchestrator Function
# ------------------------------------------------------------------------------

def generate_cdf_plots(
    full_df: pd.DataFrame,
    config: Dict[str, Any],
    output_dir: str
) -> List[str]:
    """
    Orchestrates the generation of all required CDF plots.

    This function iterates through the specified subjects and grades, calling
    a plotting helper to generate and save a CDF plot for each combination.
    It provides a visual check for first-order stochastic dominance.

    Args:
        full_df: The complete, feature-engineered DataFrame.
        config: The validated study configuration dictionary.
        output_dir: The path to the directory where plots will be saved.

    Returns:
        A list of file paths for all the generated plots.
    """
    # --- Input Validation ---
    if not isinstance(full_df, pd.DataFrame):
        raise TypeError("Input 'full_df' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")
    if not isinstance(output_dir, str):
        raise TypeError("Input 'output_dir' must be a string.")

    # --- Initialization ---
    # Define the subjects and grades for which to generate plots.
    subjects_to_plot = config['variable_definitions']['dependent_variables']
    grades_to_plot = [3, 10]

    # List to store the paths of the saved plots.
    saved_plot_paths = []

    # --- Plotting Loop ---
    # Use nested loops to generate a plot for each required combination.
    for subject in subjects_to_plot:
        for grade in grades_to_plot:
            print(f"--- Generating CDF plot for: {subject}, Grade {grade} ---")

            # Filter the DataFrame to the specific grade of interest.
            df_subset = full_df[full_df['grade'] == grade]

            # Call the helper function to create and save the plot.
            filepath = _plot_and_save_single_cdf(
                df_subset=df_subset,
                score_col=subject,
                grade=grade,
                output_dir=output_dir,
                config=config
            )

            # Add the path of the new plot to the list.
            saved_plot_paths.append(filepath)
            print(f"Plot saved to: {filepath}")

    return saved_plot_paths


In [None]:
# Task 22: Generate coefficient plots for global PHE effects and grade-interaction margins

# ==============================================================================
# Task 22: Generate coefficient plots for global PHE effects and
#          grade-interaction margins
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 22, Step 1: Plot global (pooled) PHE effects.
# ------------------------------------------------------------------------------

def _plot_global_phe_effects(
    ols_results: Dict[str, pd.DataFrame],
    iv_results: Dict[str, pd.DataFrame],
    config: Dict[str, Any],
    output_dir: str
) -> str:
    """
    Generates and saves a plot of the global (pooled) PHE effects.

    This function visualizes the main persistent effect of parental education
    from both OLS and IV models for each subject, replicating Figure 2.

    Args:
        ols_results: The dictionary of pooled OLS results from Task 13.
        iv_results: The dictionary of pooled IV results from Task 18.
        config: The validated study configuration dictionary.
        output_dir: The directory in which to save the plot.

    Returns:
        The file path of the saved plot.
    """
    # --- Plotting Setup ---
    # Get subject names and PHE dummy names from config.
    subjects = config['variable_definitions']['dependent_variables']
    p1_name, p2_name = config['variable_definitions']['phe_dummy_names']
    endog_var = config['variable_definitions']['endogenous_variable']

    # Create a 1x3 subplot grid, one for each subject.
    fig, axes = plt.subplots(1, 3, figsize=(18, 6), sharey=True)
    fig.suptitle('Global Impact of Parental Highest Education on Student Scores (Pooled Models)', fontsize=16)

    # --- Plotting Loop ---
    for i, subject in enumerate(subjects):
        ax = axes[i]

        # --- Plot OLS Results ---
        # Extract OLS estimates and confidence intervals.
        ols_res = ols_results[subject]
        ols_p1_est = ols_res.loc[p1_name, 'Estimate']
        ols_p2_est = ols_res.loc[p2_name, 'Estimate']
        ols_p1_err = ols_p1_est - ols_res.loc[p1_name, 'CI Lower']
        ols_p2_err = ols_p2_est - ols_res.loc[p2_name, 'CI Lower']

        # Plot OLS points with error bars.
        ax.errorbar([1, 2], [ols_p1_est, ols_p2_est], yerr=[ols_p1_err, ols_p2_err],
                    fmt='-o', capsize=5, label='OLS Estimate', color='black')

        # --- Plot IV Results ---
        # Extract IV estimate and scale it for comparison.
        iv_res = iv_results[subject]
        iv_est = iv_res.loc[endog_var, 'Estimate']
        iv_err = iv_est - iv_res.loc[endog_var, 'CI Lower']

        # Plot the IV estimate (per level) and the total effect (scaled by 2).
        ax.errorbar([1], [iv_est], yerr=[iv_err], fmt='D', capsize=5,
                    label='IV Estimate (per level)', color='blue', markersize=8)
        ax.errorbar([2], [iv_est * 2], yerr=[iv_err * 2], fmt='D', capsize=5,
                    label='IV Estimate (total effect)', color='blue', alpha=0.5, markersize=8)

        # --- Formatting ---
        ax.set_title(subject.replace('Score_', ''), fontsize=14)
        ax.set_xticks([0, 1, 2])
        ax.set_xticklabels(['ISCED 0-2\n(Reference)', 'ISCED 3-5', 'ISCED 6-8'])
        ax.set_xlabel('Parental Education Level')
        ax.grid(True, which='both', linestyle='--', linewidth=0.5)
        ax.axhline(0, color='black', linewidth=0.8)

    # Set common Y-label and legend.
    axes[0].set_ylabel('Score Increase (points)')
    handles, labels = axes[0].get_legend_handles_labels()
    fig.legend(handles, labels, loc='lower center', ncol=4, bbox_to_anchor=(0.5, -0.05))

    # --- Save Figure ---
    filepath = os.path.join(output_dir, "coefplot_global_effects.png")
    fig.savefig(filepath, dpi=300, bbox_inches='tight')
    plt.close(fig)

    return filepath

# ------------------------------------------------------------------------------
# Task 22, Step 2: Plot grade-interaction marginal effects.
# ------------------------------------------------------------------------------

def _plot_interaction_effects(
    ols_results: Dict[str, Dict[Tuple[int, int], pd.DataFrame]],
    iv_results: Dict[str, Dict[Tuple[int, int], pd.DataFrame]],
    config: Dict[str, Any],
    output_dir: str
) -> str:
    """
    Generates and saves a plot of the grade-interaction marginal effects.

    This function visualizes the Matthew Effect from both OLS and IV models,
    replicating Figures 3 and 8.

    Args:
        ols_results: The dictionary of interaction OLS results.
        iv_results: The dictionary of interaction IV results.
        config: The validated study configuration dictionary.
        output_dir: The directory in which to save the plot.

    Returns:
        The file path of the saved plot.
    """
    # --- Plotting Setup ---
    subjects = config['variable_definitions']['dependent_variables']
    grade_pairs = config['model_specifications']['grade_pair_definitions']
    endog_var = config['variable_definitions']['endogenous_variable']
    p2_name = config['variable_definitions']['phe_dummy_names'][1]

    fig, axes = plt.subplots(len(subjects), len(grade_pairs), figsize=(20, 15), sharex=True, sharey=True)
    fig.suptitle('Marginal Impact of Parental Education Across Grades (Matthew Effect)', fontsize=18)

    # --- Plotting Loop ---
    for i, subject in enumerate(subjects):
        for j, pair in enumerate(grade_pairs):
            ax = axes[i, j]
            u, v = pair

            # --- Plot OLS Interaction ---
            ols_res = ols_results[subject][pair]
            ols_coeff_name = f"{p2_name}_x_I_{u}_{v}"
            ols_est = ols_res.loc[ols_coeff_name, 'Estimate']
            ols_err = ols_est - ols_res.loc[ols_coeff_name, 'CI Lower']
            ax.errorbar([0], [ols_est], yerr=[ols_err], fmt='o', capsize=5, label='OLS', color='black')

            # --- Plot IV Interaction ---
            iv_res = iv_results[subject][pair]
            iv_coeff_name = f"{endog_var}_x_I_{u}_{v}"
            iv_est = iv_res.loc[iv_coeff_name, 'Estimate']
            iv_err = iv_est - iv_res.loc[iv_coeff_name, 'CI Lower']
            ax.errorbar([1], [iv_est], yerr=[iv_err], fmt='D', capsize=5, label='IV', color='blue', markersize=8)

            # --- Formatting ---
            ax.axhline(0, color='red', linestyle='--', linewidth=1)
            ax.set_title(f'{subject.replace("Score_", "")}\nGrades {u} vs {v}', fontsize=12)
            ax.grid(True, linestyle='--', linewidth=0.5)

    # Set common labels and layout adjustments.
    for ax in axes[-1, :]:
        ax.set_xticks([0, 1])
        ax.set_xticklabels(['OLS', 'IV'])
    for ax in axes[:, 0]:
        ax.set_ylabel('Marginal Change in PHE Effect (points)')

    plt.tight_layout(rect=[0, 0.03, 1, 0.95])

    # --- Save Figure ---
    filepath = os.path.join(output_dir, "coefplot_interaction_effects.png")
    fig.savefig(filepath, dpi=300, bbox_inches='tight')
    plt.close(fig)

    return filepath

# ------------------------------------------------------------------------------
# Task 22, Orchestrator Function
# ------------------------------------------------------------------------------

def generate_coefficient_plots(
    ols_results_pooled: Dict[str, pd.DataFrame],
    iv_results_pooled: Dict[str, pd.DataFrame],
    ols_results_interaction: Dict[str, Dict[Tuple[int, int], pd.DataFrame]],
    iv_results_interaction: Dict[str, Dict[Tuple[int, int], pd.DataFrame]],
    config: Dict[str, Any],
    output_dir: str
) -> List[str]:
    """
    Orchestrates the generation of all primary coefficient plots.

    This function creates and saves two key visualizations:
    1. A plot of the global (pooled) effects of parental education.
    2. A grid of plots showing the grade-interaction (Matthew Effect) margins.

    Args:
        ols_results_pooled: Results from the pooled OLS models.
        iv_results_pooled: Results from the pooled IV models.
        ols_results_interaction: Results from the interaction OLS models.
        iv_results_interaction: Results from the interaction IV models.
        config: The validated study configuration dictionary.
        output_dir: The path to the directory where plots will be saved.

    Returns:
        A list of file paths for all the generated plots.
    """
    # --- Input Validation ---
    if not all(isinstance(arg, dict) for arg in [ols_results_pooled, iv_results_pooled, ols_results_interaction, iv_results_interaction]):
        raise TypeError("All results inputs must be dictionaries.")
    if not isinstance(output_dir, str):
        raise TypeError("Input 'output_dir' must be a string.")

    # --- Plot Generation ---
    saved_paths = []

    # --- Step 1: Generate and save the global effects plot ---
    print("--- Generating global effects coefficient plot ---")
    global_plot_path = _plot_global_phe_effects(
        ols_results=ols_results_pooled,
        iv_results=iv_results_pooled,
        config=config,
        output_dir=output_dir
    )
    saved_paths.append(global_plot_path)
    print(f"Global effects plot saved to: {global_plot_path}")

    # --- Step 2: Generate and save the interaction effects plot ---
    print("--- Generating interaction effects (Matthew Effect) coefficient plot ---")
    interaction_plot_path = _plot_interaction_effects(
        ols_results=ols_results_interaction,
        iv_results=iv_results_interaction,
        config=config,
        output_dir=output_dir
    )
    saved_paths.append(interaction_plot_path)
    print(f"Interaction effects plot saved to: {interaction_plot_path}")

    return saved_paths


In [None]:
# Task 23: Generate effort and parental-investment effect plots by grade and subject

# ==============================================================================
# Task 23: Generate effort and parental-investment effect plots by grade
#          and subject
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 23, Step 1: Helper to transform results into a tidy DataFrame.
# ------------------------------------------------------------------------------

def _create_tidy_results_df(
    results: Dict[str, Dict[int, pd.DataFrame]],
    target_vars: List[str]
) -> pd.DataFrame:
    """
    Transforms nested regression results into a single tidy DataFrame for plotting.

    Args:
        results: A nested dictionary of regression summary tables.
        target_vars: A list of base variable names to extract (e.g.,
                     'days_homework_cat').

    Returns:
        A tidy pandas DataFrame with one row per coefficient per model.
    """
    # --- Data Transformation ---
    tidy_data = []
    # Iterate through the nested results dictionary.
    for subject, grade_results in results.items():
        for grade, res_df in grade_results.items():
            # Add the reference category (estimate=0) for each target variable.
            for var in target_vars:
                tidy_data.append({
                    'subject': subject.replace('Score_', ''), 'grade': grade,
                    'variable': var, 'category': 1, 'estimate': 0.0,
                    'ci_lower': 0.0, 'ci_upper': 0.0
                })

            # Process the actual coefficients from the results table.
            for idx, row in res_df.iterrows():
                # Parse the variable name and category from the coefficient index.
                for var in target_vars:
                    if idx.startswith(var):
                        try:
                            category = int(idx.rsplit('_', 1)[1])
                            tidy_data.append({
                                'subject': subject.replace('Score_', ''), 'grade': grade,
                                'variable': var, 'category': category,
                                'estimate': row['Estimate'],
                                'ci_lower': row['CI Lower'],
                                'ci_upper': row['CI Upper']
                            })
                        except (ValueError, IndexError):
                            continue # Not a parsable coefficient

    return pd.DataFrame(tidy_data)

# ------------------------------------------------------------------------------
# Task 23, Step 2 & 3: Generic plotting function.
# ------------------------------------------------------------------------------

def _plot_by_grade_effects(
    tidy_df: pd.DataFrame,
    variable_name: str,
    x_labels: Dict[int, str],
    title: str,
    output_dir: str,
    filename: str
) -> str:
    """
    Generic function to plot coefficient estimates by grade across subjects.

    Args:
        tidy_df: The tidy DataFrame of results.
        variable_name: The specific variable to plot (e.g., 'days_homework_cat').
        x_labels: A dictionary mapping category numbers to readable labels.
        title: The main title for the figure.
        output_dir: The directory in which to save the plot.
        filename: The name for the saved plot file.

    Returns:
        The file path of the saved plot.
    """
    # --- Plotting Setup ---
    # Filter the data to the specific variable of interest.
    plot_data = tidy_df[tidy_df['variable'] == variable_name].sort_values(by=['subject', 'grade', 'category'])
    subjects = plot_data['subject'].unique()

    # Create a 1xN subplot grid.
    fig, axes = plt.subplots(1, len(subjects), figsize=(20, 6), sharey=True)
    fig.suptitle(title, fontsize=16)

    # Define aesthetics for each grade's line.
    grade_styles = {
        3: {'color': 'black', 'linestyle': ':', 'marker': 'o'},
        6: {'color': 'gray', 'linestyle': '--', 'marker': 's'},
        10: {'color': 'blue', 'linestyle': '-', 'marker': '^'}
    }

    # --- Plotting Loop ---
    for i, subject in enumerate(subjects):
        ax = axes[i]
        subject_data = plot_data[plot_data['subject'] == subject]

        # Plot a separate line for each grade.
        for grade in sorted(subject_data['grade'].unique()):
            grade_data = subject_data[subject_data['grade'] == grade]
            style = grade_styles[grade]

            # Plot the point estimates.
            ax.plot(grade_data['category'], grade_data['estimate'],
                    label=f'Grade {grade}', **style)

            # Plot the 95% confidence interval as a shaded region.
            ax.fill_between(grade_data['category'], grade_data['ci_lower'], grade_data['ci_upper'],
                            color=style['color'], alpha=0.1)

        # --- Formatting ---
        ax.set_title(subject, fontsize=14)
        ax.axhline(0, color='red', linestyle='--', linewidth=1)
        ax.set_xticks(list(x_labels.keys()))
        ax.set_xticklabels(list(x_labels.values()), rotation=30, ha='right')
        ax.grid(True, linestyle='--', linewidth=0.5)
        ax.legend()

    axes[0].set_ylabel('Effect on Score (points)')

    # --- Save Figure ---
    filepath = os.path.join(output_dir, filename)
    fig.savefig(filepath, dpi=300, bbox_inches='tight')
    plt.close(fig)

    return filepath

# ------------------------------------------------------------------------------
# Task 23, Orchestrator Function
# ------------------------------------------------------------------------------

def generate_effort_investment_plots(
    effort_investment_results: Dict[str, Dict[int, pd.DataFrame]],
    config: Dict[str, Any],
    output_dir: str
) -> List[str]:
    """
    Orchestrates the generation of effort and parental investment plots.

    This function creates and saves two key visualizations:
    1. A plot showing the effect of student homework effort across grades.
    2. A plot showing the effect of parental investment across grades.

    Args:
        effort_investment_results: The nested dictionary of results from Task 15.
        config: The validated study configuration dictionary.
        output_dir: The path to the directory where plots will be saved.

    Returns:
        A list of file paths for all the generated plots.
    """
    # --- Input Validation ---
    if not isinstance(effort_investment_results, dict):
        raise TypeError("Input 'effort_investment_results' must be a dictionary.")
    if not isinstance(output_dir, str):
        raise TypeError("Input 'output_dir' must be a string.")

    # --- Step 1: Create a single tidy DataFrame from all results ---
    target_vars = ['days_homework_cat', 'parent_talk_school']
    tidy_results = _create_tidy_results_df(effort_investment_results, target_vars)

    # --- Plot Generation ---
    saved_paths = []
    os.makedirs(output_dir, exist_ok=True)

    # --- Step 2: Plot effort (homework) profiles ---
    print("--- Generating student effort (homework) effect plot ---")
    homework_labels = {
        1: '1 day\nor less', 2: '2-3 days', 3: '4-5 days', 4: '5+ days'
    }
    effort_plot_path = _plot_by_grade_effects(
        tidy_df=tidy_results,
        variable_name='days_homework_cat',
        x_labels=homework_labels,
        title='Impact of Student Effort (Homework Days) on Achievement',
        output_dir=output_dir,
        filename='coefplot_effort_effects.png'
    )
    saved_paths.append(effort_plot_path)
    print(f"Effort plot saved to: {effort_plot_path}")

    # --- Step 3: Plot parental investment (talk frequency) profiles ---
    print("--- Generating parental investment (talk frequency) effect plot ---")
    talk_labels = {
        1: 'Never', 2: '1-2/month', 3: '1-2/week', 4: 'Daily'
    }
    investment_plot_path = _plot_by_grade_effects(
        tidy_df=tidy_results,
        variable_name='parent_talk_school',
        x_labels=talk_labels,
        title='Impact of Parental Investment (Talk About School) on Achievement',
        output_dir=output_dir,
        filename='coefplot_investment_effects.png'
    )
    saved_paths.append(investment_plot_path)
    print(f"Investment plot saved to: {investment_plot_path}")

    return saved_paths


In [None]:
# Task 24: Perform composition and balance checks across grades and years

# ==============================================================================
# Task 24: Perform composition and balance checks across grades and years
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 24, Step 1: Tabulate PHE shares by grade and academic year.
# ------------------------------------------------------------------------------

def check_phe_composition(
    cleansed_df: pd.DataFrame,
    deviation_threshold: float = 0.05
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Checks the stability of PHE group composition across grades and years.

    This function calculates the proportion of students in each PHE category for
    each grade-year cell and compares it to the overall sample proportions. It
    flags any cell where the deviation exceeds a specified threshold, helping to
    diagnose potential selection or composition effects.

    Args:
        cleansed_df: The fully cleansed DataFrame from Task 4.
        deviation_threshold: The maximum acceptable proportional deviation
                             (e.g., 0.05 for 5 percentage points).

    Returns:
        A tuple containing:
        - pd.DataFrame: A table showing PHE shares for each grade-year cell.
        - List[str]: A list of formatted strings describing any significant
                     deviations found.
    """
    # --- Overall Proportions ---
    # Calculate the overall share of each PHE category in the entire dataset.
    overall_shares = cleansed_df['PHE'].value_counts(normalize=True).sort_index()

    # --- Cell-wise Proportions ---
    # Group by grade and year and calculate PHE shares within each cell.
    cell_shares = cleansed_df.groupby(['grade', 'academic_year'])['PHE'] \
                             .value_counts(normalize=True).unstack(level='PHE')

    # --- Deviation Check ---
    # Initialize a list to store warnings for significant deviations.
    deviations = []
    # Iterate through each cell to compare its shares to the overall shares.
    for idx, row in cell_shares.iterrows():
        for phe_level, overall_share in overall_shares.items():
            cell_share = row.get(phe_level, 0)
            # Check if the absolute difference exceeds the threshold.
            if abs(cell_share - overall_share) > deviation_threshold:
                deviations.append(
                    f"Composition deviation in {idx}: PHE level {phe_level} share is "
                    f"{cell_share:.3f} (vs. overall {overall_share:.3f})"
                )

    return cell_shares, deviations

# ------------------------------------------------------------------------------
# Task 24, Step 2: Re-validate score normalization.
# ------------------------------------------------------------------------------

def revalidate_score_normalization(
    cleansed_df: pd.DataFrame,
    config: Dict[str, Any]
) -> List[str]:
    """
    Re-validates score normalization on the final cleansed DataFrame.

    This function serves as a final sanity check to ensure that no prior
    cleaning steps have inadvertently distorted the score distributions. It
    verifies that the mean and standard deviation of scores in each
    grade-year cell remain within the specified tolerance.

    Args:
        cleansed_df: The fully cleansed DataFrame from Task 4.
        config: The validated study configuration dictionary.

    Returns:
        A list of formatted strings describing any normalization failures.
    """
    # This logic is identical to _validate_score_normalization from Task 2.
    # It is intentionally repeated here as a final diagnostic on the clean data.

    # Extract parameters from the configuration.
    score_cols = config['variable_definitions']['dependent_variables']
    mean_target = config['psychometric_model']['standardization_mean']
    std_target = config['psychometric_model']['standardization_std_dev']
    tolerance = 3.0
    min_sample_size = 30

    # Group by grade and year.
    grouped = cleansed_df.groupby(['grade', 'academic_year'])

    # Initialize a list to store violation messages.
    violations = []

    # Iterate through each score column to validate its normalization.
    for score_col in score_cols:
        stats = grouped[score_col].agg(['mean', 'std', 'count']).reset_index()
        stats_to_check = stats[stats['count'] >= min_sample_size]

        # Check for mean violations.
        mean_violations = stats_to_check[~np.isclose(stats_to_check['mean'], mean_target, atol=tolerance)]
        for _, row in mean_violations.iterrows():
            violations.append(
                f"Mean violation for '{score_col}' in (grade={row['grade']}, year={row['academic_year']}): "
                f"Found {row['mean']:.2f}"
            )

        # Check for standard deviation violations.
        std_violations = stats_to_check[~np.isclose(stats_to_check['std'], std_target, atol=tolerance)]
        for _, row in std_violations.iterrows():
            violations.append(
                f"Std Dev violation for '{score_col}' in (grade={row['grade']}, year={row['academic_year']}): "
                f"Found {row['std']:.2f}"
            )

    return violations

# ------------------------------------------------------------------------------
# Task 24, Step 3: Document and report sample inclusion flow.
# ------------------------------------------------------------------------------

def report_sample_inclusion_flow(
    cleansing_log: Dict[str, Any],
    sample_size_log: Dict[str, Any]
) -> pd.DataFrame:
    """
    Creates a publication-ready table documenting the sample inclusion flow.

    This function synthesizes logs from the data cleaning and sample creation
    tasks to produce a clear and transparent report on sample attrition at
    each stage of the pipeline.

    Args:
        cleansing_log: The log dictionary from the `cleanse_student_survey_data`
                       function (Task 4).
        sample_size_log: The log dictionary of final sample sizes from the
                         `define_analysis_samples` function (Task 9).

    Returns:
        A pandas DataFrame summarizing the sample flow.
    """
    # --- Data Extraction from Logs ---
    # Create a list of dictionaries to build the DataFrame.
    flow_data = [
        {'Stage': '1. Initial Raw Data', 'N': cleansing_log['initial_rows']},
        {'Stage': '2. Kept: Parent Response Complete', 'N': cleansing_log['rows_after_parent_response_filter']},
        {'Stage': '3. Kept: Valid Grade (3, 6, 10)', 'N': cleansing_log['rows_after_grade_filter']},
        {'Stage': '4. Kept: After Exclusions & ID Checks', 'N': cleansing_log['rows_after_exclusions_and_id_check']},
        {'Stage': '5. Final Clean Sample (after deduplication)', 'N': cleansing_log['final_rows_after_deduplication']},
    ]

    # Add final model-specific sample sizes.
    for model_type, subjects in sample_size_log.items():
        for subject, sizes in subjects['pooled'].items():
            flow_data.append({
                'Stage': f"6. Final Sample: {model_type.upper()} Pooled ({subject.replace('Score_','')})",
                'N': sizes
            })

    # --- DataFrame Creation ---
    # Convert the list to a DataFrame and set the stage as the index.
    flow_df = pd.DataFrame(flow_data).set_index('Stage')

    return flow_df

# ------------------------------------------------------------------------------
# Task 24, Orchestrator Function
# ------------------------------------------------------------------------------

def perform_diagnostic_checks(
    cleansed_df: pd.DataFrame,
    cleansing_log: Dict[str, Any],
    sample_size_log: Dict[str, Any],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the execution of all diagnostic and balance checks.

    This function runs a series of checks on the final clean data to verify
    sample composition stability and score normalization. It also compiles a
    comprehensive sample attrition table.

    Args:
        cleansed_df: The fully cleansed DataFrame from Task 4.
        cleansing_log: The log from the data cleansing process (Task 4).
        sample_size_log: The log of final sample sizes (Task 9).
        config: The validated study configuration dictionary.

    Returns:
        A dictionary containing all diagnostic outputs: the composition table,
        a list of normalization failures, and the sample flow table.
    """
    # --- Input Validation ---
    if not isinstance(cleansed_df, pd.DataFrame):
        raise TypeError("Input 'cleansed_df' must be a pandas DataFrame.")
    if not all(isinstance(arg, dict) for arg in [cleansing_log, sample_size_log, config]):
        raise TypeError("Log and config inputs must be dictionaries.")

    # --- Step 1: Check PHE Composition ---
    print("--- Checking PHE composition stability ---")
    composition_table, composition_warnings = check_phe_composition(cleansed_df)
    if composition_warnings:
        print("Composition warnings found:")
        for warning in composition_warnings:
            print(f"  - {warning}")

    # --- Step 2: Re-validate Score Normalization ---
    print("--- Re-validating score normalization ---")
    normalization_failures = revalidate_score_normalization(cleansed_df, config)
    if normalization_failures:
        print("Score normalization failures found:")
        for failure in normalization_failures:
            print(f"  - {failure}")

    # --- Step 3: Report Sample Inclusion Flow ---
    print("--- Generating sample inclusion flow report ---")
    sample_flow_table = report_sample_inclusion_flow(cleansing_log, sample_size_log)

    # --- Assemble Final Output ---
    # Compile all diagnostic results into a single dictionary.
    diagnostics = {
        "composition_table": composition_table,
        "composition_warnings": composition_warnings,
        "normalization_failures": normalization_failures,
        "sample_flow_table": sample_flow_table
    }

    print("\n--- Diagnostic Checks Complete ---")
    return diagnostics


In [None]:
# Task 25: Design and implement an orchestrator function for the end-to-end pipeline

# ==============================================================================
# Task 25: Design and implement an orchestrator function for the end-to-end
#          pipeline
# ==============================================================================

def run_parental_environment_study_pipeline(
    student_parent_survey_raw: pd.DataFrame,
    barro_lee_raw: pd.DataFrame,
    config: Dict[str, Any],
    output_dir: str = "results"
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Orchestrates the full end-to-end replication of the study's main analysis.

    This master function executes the entire research pipeline, from raw data
    ingestion and validation through feature engineering, all primary model
    estimations (OLS and IV), and the generation of all main results,
    diagnostics, and figures. This amended version is designed to be a component
    of a larger workflow, returning not only the final results but also key
    intermediate data artifacts required for subsequent downstream tasks, such
    as robustness checks, without requiring redundant computation.

    Args:
        student_parent_survey_raw: The raw student survey DataFrame, unprocessed.
        barro_lee_raw: The raw Barro-Lee instrument DataFrame, unprocessed.
        config: The study's master configuration dictionary, which governs all
                steps of the pipeline.
        output_dir: The root directory where output artifacts, such as figures,
                    will be saved.

    Returns:
        A tuple containing two dictionaries:
        1.  A deeply nested dictionary holding all primary outputs of the
            analysis, including every result table, diagnostic report, and
            a list of paths to the generated figures.
        2.  A dictionary of key intermediate artifacts required for downstream
            tasks. This includes the final feature-engineered DataFrame
            ('final_df'), the complete list of generated control variable names
            ('control_vars_list'), and the log from the initial data cleansing
            ('cleansing_log').

    Raises:
        ConfigurationError: If the config dictionary is invalid.
        SchemaValidationError: If input data does not match the expected schema.
        DataIntegrityError: If input data violates content rules.
        RuntimeError: If any estimation step fails.
    """
    # --- Stage 0: Reproducibility and Initialization ---
    # Set the random seed from the configuration to ensure reproducibility for any
    # stochastic processes (though this specific pipeline is deterministic).
    np.random.seed(config['reproducibility']['random_seed'])

    # Initialize the main output dictionary to structure all final results.
    pipeline_output: Dict[str, Any] = {
        "results": {}, "diagnostics": {}, "figures": {},
        "metadata": {"version": config['reproducibility']['version']}
    }

    # --- Stage 1: Validation ---
    # This stage acts as a gatekeeper, ensuring all inputs are valid before proceeding.
    print("--- STAGE 1: VALIDATION ---")
    # Task 1: Validate the configuration dictionary itself. Halts if invalid.
    validate_configuration(config)
    # Task 2: Validate the schema and content of the primary student survey data.
    validate_student_survey_data(student_parent_survey_raw, config)
    # Task 3: Validate the schema and content of the external instrument data.
    validate_barro_lee_data(barro_lee_raw, student_parent_survey_raw, config)
    print("Validation successful.\n")

    # --- Stage 2: Data Preparation and Feature Engineering ---
    # This stage transforms the raw data into the final, analysis-ready dataset.
    print("--- STAGE 2: DATA PREPARATION AND FEATURE ENGINEERING ---")
    # Task 4: Apply inclusion criteria and remove invalid/duplicate records.
    cleansed_df, cleansing_log = cleanse_student_survey_data(student_parent_survey_raw, config)
    # Task 5: Standardize formats (e.g., country codes) and document reference categories.
    harmonized_df, _, ref_cats = harmonize_and_standardize_data(cleansed_df, config)
    # Task 6: Construct the main endogenous variable (PHE) and its dummies.
    df_with_phe, _ = construct_parental_education_variable(harmonized_df, config)
    # Task 7: Construct all dummy variables for the control set.
    df_with_controls, control_list = construct_control_variables(df_with_phe, ref_cats)
    # Task 8: Construct the instrumental variable (GG) by merging with Barro-Lee data.
    final_df, _ = construct_instrumental_variable(df_with_controls, barro_lee_raw, config)
    print("Data preparation and feature engineering complete.\n")

    # --- Stage 3: Model Specification and Sample Definition ---
    # This stage prepares the blueprints for all models and the specific datasets they run on.
    print("--- STAGE 3: MODEL SPECIFICATION AND SAMPLE DEFINITION ---")
    # Task 9: Define all model-specific analysis samples using listwise deletion.
    analysis_samples, sample_size_log = define_analysis_samples(final_df, control_list, config)
    # Task 10: Programmatically generate specifications for all OLS models.
    ols_specs = specify_ols_models(control_list, config)
    # Task 11: Programmatically generate specifications for all IV models.
    iv_specs = specify_iv_models(control_list, config)
    print("Model specifications and samples are ready.\n")

    # --- Stage 4: Econometric Estimation ---
    # This stage executes all primary regressions of the study.
    print("--- STAGE 4: ECONOMETRIC ESTIMATION ---")
    # Task 13-20: Run all primary OLS and IV estimations.
    pooled_ols_res = run_pooled_ols_estimation(analysis_samples['ols']['pooled'], ols_specs['pooled'], config)
    inter_ols_res = run_interaction_ols_estimation(analysis_samples['ols']['interaction'], ols_specs['interaction'], config)
    effort_res = run_effort_investment_ols_estimation(final_df, ols_specs['effort_investment'], control_list, config)
    first_stage_res = run_iv_first_stage_estimation(analysis_samples['iv'], iv_specs, config)
    reduced_form_res = run_iv_reduced_form_estimation(analysis_samples['iv'], iv_specs, first_stage_res, config)
    pooled_iv_res = run_pooled_iv_estimation(analysis_samples['iv']['pooled'], iv_specs['pooled'], pooled_ols_res, first_stage_res, config)
    by_grade_iv_res = run_by_grade_iv_estimation(analysis_samples['iv']['by_grade'], iv_specs['by_grade'], first_stage_res, config)
    inter_iv_res = run_interaction_iv_estimation(analysis_samples['iv']['interaction'], iv_specs['interaction'], config)
    print("All models estimated successfully.\n")

    # Store all estimation results in the main output dictionary.
    pipeline_output["results"] = {
        "pooled_ols": pooled_ols_res, "interaction_ols": inter_ols_res,
        "effort_investment_ols": effort_res, "iv_first_stage": first_stage_res,
        "iv_reduced_form": reduced_form_res, "pooled_iv": pooled_iv_res,
        "by_grade_iv": by_grade_iv_res, "interaction_iv": inter_iv_res
    }

    # --- Stage 5: Visualization and Final Diagnostics ---
    # This stage generates all figures and summary diagnostic tables.
    print("--- STAGE 5: VISUALIZATION AND DIAGNOSTICS ---")
    # Define the output directory for figures.
    figure_dir = os.path.join(output_dir, "figures")
    # Task 21-23: Generate all plots.
    cdf_paths = generate_cdf_plots(final_df, config, figure_dir)
    coef_plot_paths = generate_coefficient_plots(pooled_ols_res, pooled_iv_res, inter_ols_res, inter_iv_res, config, figure_dir)
    effort_plot_paths = generate_effort_investment_plots(effort_res, config, figure_dir)

    # Store the paths to the generated figures.
    pipeline_output["figures"] = {
        "cdf_plots": cdf_paths, "coefficient_plots": coef_plot_paths,
        "effort_investment_plots": effort_plot_paths
    }

    # Task 24: Perform final diagnostic checks on the data.
    diagnostics = perform_diagnostic_checks(cleansed_df, cleansing_log, sample_size_log, config)
    pipeline_output["diagnostics"] = diagnostics
    print("Visualization and diagnostics complete.\n")

    # --- Assemble Intermediate Artifacts for Return ---
    # Package key objects needed by subsequent pipeline stages (e.g., robustness checks).
    intermediate_artifacts = {
        "final_df": final_df,
        "control_vars_list": control_list,
        "cleansing_log": cleansing_log,
        "sample_size_log": sample_size_log
    }

    # Return both the final outputs and the intermediate artifacts.
    return pipeline_output, intermediate_artifacts


In [None]:
# Task 26: Execute robustness analyses and sensitivity checks

# ==============================================================================
# Task 26: Execute robustness analyses and sensitivity checks
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 26, Step 1: Re-estimate pooled OLS with and without household controls.
# ------------------------------------------------------------------------------

def check_control_sensitivity_ols(
    full_df: pd.DataFrame,
    full_ols_results: Dict[str, pd.DataFrame],
    control_vars_list: List[str],
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Checks the sensitivity of pooled OLS estimates to the inclusion of controls.

    This function re-estimates the pooled OLS models using a minimal set of
    controls (excluding household characteristics) and compares the key PHE
    coefficients to the full model estimates to assess robustness. It performs
    the re-estimation from scratch to ensure a complete and accurate comparison.

    Args:
        full_df: The complete, feature-engineered DataFrame.
        full_ols_results: The results from the main pooled OLS estimation (Task 13).
        control_vars_list: The full list of all control variable names.
        config: The validated study configuration dictionary.

    Returns:
        A DataFrame comparing the estimates, standard errors, and sample sizes
        from the full and minimal models.
    """
    # --- Input Validation ---
    if not isinstance(full_df, pd.DataFrame):
        raise TypeError("Input 'full_df' must be a pandas DataFrame.")
    if not isinstance(full_ols_results, dict):
        raise TypeError("Input 'full_ols_results' must be a dictionary.")

    print("--- Running OLS control sensitivity check ---")

    # --- 1. Define Minimal Control Set ---
    # Programmatically determine the minimal control set by excluding household controls.
    household_controls_base = config['variable_definitions']['household_controls']
    minimal_control_vars = [
        var for var in control_vars_list
        if not any(var.startswith(hc_base) for hc_base in household_controls_base)
    ]

    # --- 2. Initialize Comparison Data Storage ---
    comparison_data = []

    # --- 3. Re-estimation Loop for each Subject ---
    for subject in config['variable_definitions']['dependent_variables']:
        print(f"  - Re-estimating for {subject} with minimal controls...")

        # --- 3a. Specify Minimal Model ---
        # Generate the model specification for the minimal control set.
        minimal_spec = specify_ols_models(minimal_control_vars, config)['pooled'][subject]

        # --- 3b. Create Minimal Sample ---
        # The sample must be recreated as listwise deletion depends on the variable set.
        required_cols_minimal = [minimal_spec['dependent']] + minimal_spec['exog']
        minimal_sample = _create_analysis_sample(full_df, required_cols_minimal)

        # --- 3c. Estimate Minimal Model ---
        minimal_results_obj = estimate_model(minimal_sample, minimal_spec, config)

        # --- 3d. Extract Minimal Model Results ---
        minimal_summary = _extract_result_summary(
            minimal_results_obj,
            config['variable_definitions']['phe_dummy_names'],
            config
        )

        # --- 3e. Compare Full vs. Minimal Results ---
        for coeff in config['variable_definitions']['phe_dummy_names']:
            # Extract statistics from the full model results.
            full_est = full_ols_results[subject].loc[coeff, 'Estimate']
            full_se = full_ols_results[subject].loc[coeff, 'Std. Error']
            full_n = full_ols_results[subject].loc[coeff, 'N']

            # Extract statistics from the minimal model results.
            minimal_est = minimal_summary.loc[coeff, 'Estimate']
            minimal_se = minimal_summary.loc[coeff, 'Std. Error']
            minimal_n = minimal_summary.loc[coeff, 'N']

            # Calculate the percentage change in the coefficient estimate.
            pct_change = ((minimal_est - full_est) / full_est) * 100 if full_est != 0 else float('inf')

            # Append all comparison data to the list.
            comparison_data.append({
                'Subject': subject.replace('Score_', ''),
                'Coefficient': coeff,
                'Estimate_Full': full_est,
                'StdErr_Full': full_se,
                'N_Full': int(full_n),
                'Estimate_Minimal': minimal_est,
                'StdErr_Minimal': minimal_se,
                'N_Minimal': int(minimal_n),
                'Pct_Change': pct_change,
            })

    # --- 4. Create Final Comparison DataFrame ---
    # Convert the list of dictionaries to a well-structured DataFrame.
    comparison_df = pd.DataFrame(comparison_data).set_index(['Subject', 'Coefficient'])

    return comparison_df

# ------------------------------------------------------------------------------
# Task 26, Step 2: Validate IV diagnostics robustness across specifications.
# ------------------------------------------------------------------------------

def review_iv_diagnostics(first_stage_results: pd.DataFrame) -> pd.DataFrame:
    """
    Reviews and reports on the validity of the instrument across all models.

    This function systematically checks the first-stage results for all IV
    models, filtering and returning a DataFrame that lists any specifications
    that fail the weak instrument test (F-stat < 10) or have an unexpected
    sign on the instrument coefficient.

    Args:
        first_stage_results: The DataFrame of results from Task 16.

    Returns:
        A DataFrame listing all IV specifications that failed one or more
        diagnostic checks. An empty DataFrame is returned if all checks pass.
    """
    print("--- Reviewing all IV first-stage diagnostics ---")

    # --- Input Validation ---
    if not isinstance(first_stage_results, pd.DataFrame):
        raise TypeError("Input 'first_stage_results' must be a pandas DataFrame.")
    required_cols = ['is_strong', 'sign_is_correct']
    if not all(col in first_stage_results.columns for col in required_cols):
        raise SchemaValidationError(f"Missing required diagnostic columns: {required_cols}")

    # --- Filtering for Failures ---
    # Create a boolean mask to identify any models that fail either check.
    failed_diagnostics_mask = (
        (first_stage_results['is_strong'] == False) |
        (first_stage_results['sign_is_correct'] == False)
    )

    # Apply the mask to get the DataFrame of failed models.
    failed_diagnostics_df = first_stage_results[failed_diagnostics_mask]

    # --- Reporting ---
    if failed_diagnostics_df.empty:
        print("All IV first-stage diagnostics passed successfully.")
    else:
        # Issue a single, comprehensive warning listing the failed models.
        warnings.warn(
            f"Found {len(failed_diagnostics_df)} IV specification(s) with potential "
            f"instrument validity issues. See the returned DataFrame for details.",
            UserWarning
        )

    return failed_diagnostics_df

# ------------------------------------------------------------------------------
# Task 26, Step 3: Validate effort and investment pattern consistency.
# ------------------------------------------------------------------------------

def validate_developmental_patterns(
    effort_investment_results: Dict[str, Dict[int, pd.DataFrame]],
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Formally validates results against expected developmental patterns.

    This function programmatically tests a series of hypotheses about the signs
    and trends of effort and investment coefficients across grades and subjects,
    producing a detailed, auditable report of the outcomes.

    Args:
        effort_investment_results: The nested dictionary of results from Task 15.
        config: The validated study configuration dictionary.

    Returns:
        A DataFrame detailing each pattern check and its outcome.
    """
    print("--- Validating developmental patterns in effort/investment models ---")

    # --- Initialization ---
    report_data = []
    alpha = config['inference_parameters']['significance_levels']['star_1']

    # --- Define Hypotheses to Test ---
    # Each dictionary defines a specific pattern to check.
    hypotheses = [
        {'check': 'Homework G3 Sign (Math)', 'subject': 'Score_Math', 'grade': 3, 'coeff': 'days_homework_cat_4', 'test': 'est < 0'},
        {'check': 'Homework G10 Sign (Math)', 'subject': 'Score_Math', 'grade': 10, 'coeff': 'days_homework_cat_4', 'test': 'est > 0 and pval < alpha'},
        {'check': 'Parental Talk Trend (English)', 'subject': 'Score_Eng', 'coeff': 'parent_talk_school_4', 'test': 'est_g10 > est_g3'},
        {'check': 'Parental Talk Trend (Literature)', 'subject': 'Score_Lit', 'coeff': 'parent_talk_school_4', 'test': 'est_g10 < est_g3'},
    ]

    # --- Validation Loop ---
    for hypo in hypotheses:
        passed = False
        details = "Check not performed due to missing data."
        try:
            if 'trend' in hypo['check'].lower():
                # This is a cross-grade comparison.
                est_g3 = effort_investment_results[hypo['subject']][3].loc[hypo['coeff'], 'Estimate']
                est_g10 = effort_investment_results[hypo['subject']][10].loc[hypo['coeff'], 'Estimate']
                passed = eval(hypo['test'])
                details = f"G3 Est: {est_g3:.3f}, G10 Est: {est_g10:.3f}"
            else:
                # This is a single-cell check.
                res_df = effort_investment_results[hypo['subject']][hypo['grade']]
                est = res_df.loc[hypo['coeff'], 'Estimate']
                pval = res_df.loc[hypo['coeff'], 'P-value']
                passed = eval(hypo['test'])
                details = f"Est: {est:.3f}, P-val: {pval:.3f}"
        except KeyError:
            # Gracefully handle cases where a coefficient might be missing.
            pass

        report_data.append({
            'Pattern Check': hypo['check'],
            'Expectation': hypo['test'],
            'Passed': passed,
            'Details': details
        })

    return pd.DataFrame(report_data).set_index('Pattern Check')

# ------------------------------------------------------------------------------
# Task 26, Orchestrator Function
# ------------------------------------------------------------------------------

def run_robustness_checks(
    pipeline_outputs: Dict[str, Any],
    full_df: pd.DataFrame,
    control_vars_list: List[str],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrates the execution of all robustness and sensitivity checks.

    This function runs a series of post-estimation checks to validate the
    stability and credibility of the main findings. It assesses sensitivity
    to control variable inclusion, reviews all IV diagnostics, and formally
    validates key developmental patterns in the data.

    Args:
        pipeline_outputs: The main dictionary of results from the pipeline.
        full_df: The complete, feature-engineered DataFrame.
        control_vars_list: The full list of all control variable names.
        config: The validated study configuration dictionary.

    Returns:
        A dictionary containing the outputs of all robustness checks, including
        DataFrames for sensitivity analysis and diagnostic failures, and a
        report on developmental pattern validation.
    """
    print("\n--- STAGE 6: ROBUSTNESS AND SENSITIVITY CHECKS ---")

    # --- Step 1: OLS Control Sensitivity ---
    sensitivity_report = check_control_sensitivity_ols(
        full_df=full_df,
        full_ols_results=pipeline_outputs['results']['pooled_ols'],
        control_vars_list=control_vars_list,
        config=config
    )

    # --- Step 2: IV Diagnostics Review ---
    iv_diagnostic_failures = review_iv_diagnostics(
        first_stage_results=pipeline_outputs['results']['iv_first_stage']
    )

    # --- Step 3: Developmental Pattern Validation ---
    pattern_report = validate_developmental_patterns(
        effort_investment_results=pipeline_outputs['results']['effort_investment_ols'],
        config=config
    )

    # --- Assemble Final Output ---
    robustness_results = {
        "ols_control_sensitivity": sensitivity_report,
        "iv_diagnostic_failures": iv_diagnostic_failures,
        "developmental_pattern_validation": pattern_report
    }

    print("\n--- Robustness Checks Complete ---")
    return robustness_results


In [None]:
# Task 27: Synthesize and interpret the final results in alignment with the study's conclusions

# ==============================================================================
# Task 27: Synthesize and interpret the final results in alignment with the
#          study's conclusions
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 27, Step 1: Helper to summarize persistent effects.
# ------------------------------------------------------------------------------

def _summarize_persistent_effects(
    pooled_iv_results: Dict[str, pd.DataFrame],
    config: Dict[str, Any]
) -> str:
    """
    Generates a narrative summary of the causal persistent effects.

    Args:
        pooled_iv_results: The dictionary of pooled IV results from Task 18.
        config: The validated study configuration dictionary.

    Returns:
        A formatted string summarizing the main causal findings.
    """
    # --- Initialization ---
    summary_lines = ["Summary of Causal Persistent Effects (Pooled IV Models):"]
    endog_var = config['variable_definitions']['endogenous_variable']
    alpha = config['inference_parameters']['significance_levels']['star_3'] # p < 0.01

    # --- Narrative Generation Loop ---
    try:
        for subject_raw, res_df in pooled_iv_results.items():
            subject = subject_raw.replace('Score_', '')
            estimate = res_df.loc[endog_var, 'Estimate']
            p_value = res_df.loc[endog_var, 'P-value']

            # Determine significance level for the narrative.
            if p_value < alpha:
                significance_text = f"(p < {alpha})"
            else:
                significance_text = f"(p = {p_value:.3f})"

            # Assemble the sentence.
            line = (
                f"- For {subject}, a one-level increase in parental education (PHE) has a "
                f"statistically significant causal effect, increasing student scores by "
                f"{estimate:.2f} points {significance_text}."
            )
            summary_lines.append(line)

        # --- Validate Ordering ---
        eng_eff = pooled_iv_results['Score_Eng'].loc[endog_var, 'Estimate']
        lit_eff = pooled_iv_results['Score_Lit'].loc[endog_var, 'Estimate']
        math_eff = pooled_iv_results['Score_Math'].loc[endog_var, 'Estimate']

        if eng_eff > lit_eff > math_eff:
            ordering_text = "As expected, the magnitude of this effect is largest in English, followed by Literature, and then Mathematics."
        else:
            ordering_text = "The expected ordering of effect sizes (English > Literature > Math) was not observed."
        summary_lines.append(f"- {ordering_text}")

    except KeyError as e:
        return f"Could not generate persistent effect summary due to missing result: {e}"

    return "\n".join(summary_lines)

# ------------------------------------------------------------------------------
# Task 27, Step 2: Helper to summarize Matthew-effect patterns.
# ------------------------------------------------------------------------------

def _summarize_matthew_effects(
    interaction_iv_results: Dict[str, Dict[tuple, pd.DataFrame]],
    config: Dict[str, Any]
) -> str:
    """
    Generates a narrative summary of the causal Matthew-effect patterns.

    Args:
        interaction_iv_results: The dictionary of interaction IV results.
        config: The validated study configuration dictionary.

    Returns:
        A formatted string summarizing the dynamic findings for each subject.
    """
    # --- Initialization ---
    summary_lines = ["\nSummary of Causal Matthew Effects (Interaction IV Models):"]
    endog_var = config['variable_definitions']['endogenous_variable']
    alpha = config['inference_parameters']['significance_levels']['star_1'] # p < 0.10

    # --- Narrative Generation Loop ---
    try:
        for subject_raw in config['variable_definitions']['dependent_variables']:
            subject = subject_raw.replace('Score_', '')

            # Extract the key interaction coefficients (α̂₂) for the two transitions.
            res_3_6 = interaction_iv_results[subject_raw][(3, 6)]
            res_6_10 = interaction_iv_results[subject_raw][(6, 10)]

            coeff_name_3_6 = f"{endog_var}_x_I_3_6"
            coeff_name_6_10 = f"{endog_var}_x_I_6_10"

            est_3_6 = res_3_6.loc[coeff_name_3_6, 'Estimate']
            pval_3_6 = res_3_6.loc[coeff_name_3_6, 'P-value']
            est_6_10 = res_6_10.loc[coeff_name_6_10, 'Estimate']
            pval_6_10 = res_6_10.loc[coeff_name_6_10, 'P-value']

            # --- Pattern Recognition Logic ---
            line = f"- For {subject}: "
            if est_6_10 < 0 and pval_6_10 < alpha and est_3_6 < 0:
                line += "The influence of parental education shows a pattern of decline, becoming weaker as students age (emancipation effect)."
            elif est_3_6 > 0 and est_6_10 < 0:
                line += "The influence follows a bell-shaped curve, increasing in middle childhood and then declining in adolescence."
            elif est_3_6 > 0 and pval_3_6 < alpha and est_6_10 > 0 and pval_6_10 < alpha:
                line += "The influence is continuously and significantly increasing, indicating a strong Matthew Effect and growing social determinism."
            else:
                line += "The pattern is mixed or not statistically significant."

            summary_lines.append(line)

    except KeyError as e:
        return f"Could not generate Matthew Effect summary due to missing result: {e}"

    return "\n".join(summary_lines)

# ------------------------------------------------------------------------------
# Task 27, Step 3: Helper to confirm policy implications.
# ------------------------------------------------------------------------------

def _confirm_policy_implications(
    matthew_effect_summary: str
) -> str:
    """
    Generates a narrative of policy implications based on the findings.

    Args:
        matthew_effect_summary: The string summary from the previous step.

    Returns:
        A formatted string outlining the policy implications.
    """
    # --- Policy Narrative Generation ---
    implication_lines = ["\nPolicy Implications based on Findings:"]

    # This logic maps the findings from the summary text to policy statements.
    if "English: The influence is continuously and significantly increasing" in matthew_effect_summary:
        implication_lines.append(
            "- The strong Matthew Effect in English suggests that interventions must be early and sustained. "
            "Heavy reliance on foreign language skills in high-stakes admissions may exacerbate inequality of opportunity."
        )

    if "Mathematics: The influence of parental education shows a pattern of decline" in matthew_effect_summary or \
       "Literature: The influence follows a bell-shaped curve" in matthew_effect_summary:
        implication_lines.append(
            "- The declining parental influence in Mathematics and Literature in later grades (adolescence) "
            "indicates a window of opportunity. Remediation programs targeting student effort and cognitive skills "
            "can be effective in these subjects for older students."
        )

    implication_lines.append(
        "- Overall, the results justify a transition from a uniform educational policy to a dynamic, "
        "subject- and age-specific strategy to maximize equality of opportunity."
    )

    return "\n".join(implication_lines)

# ------------------------------------------------------------------------------
# Task 27, Orchestrator Function
# ------------------------------------------------------------------------------

def synthesize_final_report(
    pipeline_outputs: Dict[str, Any],
    config: Dict[str, Any]
) -> str:
    """
    Synthesizes and interprets the final results into a narrative report.

    This function takes the complete set of pipeline outputs and generates a
    human-readable text summary that covers the main causal findings, the
    dynamic Matthew-effect patterns, and their direct policy implications,
    aligning with the conclusions of the original study.

    Args:
        pipeline_outputs: The main dictionary of results from the pipeline.
        config: The validated study configuration dictionary.

    Returns:
        A single, formatted string containing the complete summary report.
    """
    # --- Input Validation ---
    if not isinstance(pipeline_outputs, dict):
        raise TypeError("Input 'pipeline_outputs' must be a dictionary.")
    if 'results' not in pipeline_outputs:
        raise ValueError("'results' key missing from pipeline_outputs.")

    # --- Report Generation ---
    # Initialize a list to hold the sections of the report.
    report_sections = [f"Final Report: Synthesis of Findings (Version {config['reproducibility']['version']})", "="*80]

    # --- Step 1: Summarize Persistent Effects ---
    persistent_summary = _summarize_persistent_effects(
        pooled_iv_results=pipeline_outputs['results']['pooled_iv'],
        config=config
    )
    report_sections.append(persistent_summary)

    # --- Step 2: Summarize Matthew-Effect Patterns ---
    matthew_effect_summary = _summarize_matthew_effects(
        interaction_iv_results=pipeline_outputs['results']['interaction_iv'],
        config=config
    )
    report_sections.append(matthew_effect_summary)

    # --- Step 3: Confirm Policy Implications ---
    policy_implications = _confirm_policy_implications(matthew_effect_summary)
    report_sections.append(policy_implications)

    # --- Final Assembly ---
    # Join all sections into a single string.
    final_report = "\n".join(report_sections)

    print("\n--- Final Report Generated Successfully ---")
    print(final_report)

    return final_report


In [None]:
# Top-Level Orchestrator

# ==============================================================================
# Top-Level Orchestrator: A single function to run the entire study
# ==============================================================================

def execute_full_study(
    student_parent_survey_raw: pd.DataFrame,
    barro_lee_raw: pd.DataFrame,
    config: Dict[str, Any],
    output_dir: str = "results"
) -> Dict[str, Any]:
    """
    Executes the entire end-to-end research pipeline for the study.

    This top-level orchestrator serves as the main entry point for the project.
    It runs the three primary stages of the analysis in sequence:
    1.  The main analytical pipeline (data validation, cleaning, feature
        engineering, all OLS and IV estimations, and primary visualizations).
    2.  A suite of robustness and sensitivity checks on the main results.
    3.  The synthesis of a final, human-readable report summarizing the key
        findings and policy implications.

    Args:
        student_parent_survey_raw: The raw student survey DataFrame.
        barro_lee_raw: The raw Barro-Lee instrument DataFrame.
        config: The study's master configuration dictionary.
        output_dir: The root directory to save all output artifacts (e.g., figures).

    Returns:
        A comprehensive dictionary containing all outputs from the entire
        study, including all result tables, diagnostic reports, robustness
        checks, figure paths, and the final narrative report.
    """
    # --- Input Validation ---
    # Ensure the primary inputs are of the correct type before starting.
    if not isinstance(student_parent_survey_raw, pd.DataFrame):
        raise TypeError("Input 'student_parent_survey_raw' must be a pandas DataFrame.")
    if not isinstance(barro_lee_raw, pd.DataFrame):
        raise TypeError("Input 'barro_lee_raw' must be a pandas DataFrame.")
    if not isinstance(config, dict):
        raise TypeError("Input 'config' must be a dictionary.")

    # --- Master Execution Block ---
    # Wrap the entire process in a try-except block for robust, top-level error handling.
    try:
        # --- STAGE I: MAIN ANALYTICAL PIPELINE ---
        # Announce the start of the main analysis stage.
        print("\n" + "="*80)
        print("Executing Main Analytical Pipeline (Task 25)")
        print("="*80)

        # Execute the main pipeline. This single call performs all tasks from 1 to 25.
        # It returns the primary results and the intermediate artifacts needed for the next stage.
        main_pipeline_outputs, intermediate_artifacts = run_parental_environment_study_pipeline(
            student_parent_survey_raw=student_parent_survey_raw,
            barro_lee_raw=barro_lee_raw,
            config=config,
            output_dir=output_dir
        )

        # --- STAGE II: ROBUSTNESS AND SENSITIVITY CHECKS ---
        # Announce the start of the robustness checks stage.
        print("\n" + "="*80)
        print("Executing Robustness and Sensitivity Checks (Task 26)")
        print("="*80)

        # Execute the robustness checks from Task 26.
        # This function now receives the necessary data artifacts directly from the
        # previous stage, eliminating redundant processing and ensuring consistency.
        robustness_outputs = run_robustness_checks(
            pipeline_outputs=main_pipeline_outputs,
            full_df=intermediate_artifacts["final_df"],
            control_vars_list=intermediate_artifacts["control_vars_list"],
            config=config
        )

        # --- STAGE III: FINAL REPORT SYNTHESIS ---
        # Announce the start of the final report generation stage.
        print("\n" + "="*80)
        print("Synthesizing Final Report (Task 27)")
        print("="*80)

        # Execute the report synthesis from Task 27, which translates the
        # quantitative results into a narrative summary.
        final_report_text = synthesize_final_report(
            pipeline_outputs=main_pipeline_outputs,
            config=config
        )

        # --- Final Output Assembly ---
        # Combine all outputs from all stages into a single, comprehensive dictionary.
        full_study_output = {
            "main_analysis": main_pipeline_outputs,
            "robustness_checks": robustness_outputs,
            "final_summary_report": final_report_text
        }

        # Announce the successful completion of the entire pipeline.
        print("\n" + "="*80)
        print("TOP-LEVEL ORCHESTRATOR COMPLETED SUCCESSFULLY")
        print("="*80)

        # Return the final, complete archive of the study's execution.
        return full_study_output

    except Exception as e:
        # If any part of the pipeline fails, provide a clear, top-level error message.
        print("\n" + "="*80)
        print("!!! TOP-LEVEL ORCHESTRATOR FAILED !!!")
        print(f"An unrecoverable error occurred during the pipeline execution: {e}")
        print("="*80)
        # Re-raise the exception to halt execution and provide a full traceback for debugging.
        raise
