# **`README.md`**

# Who Connects Global Aid? The Hidden Geometry of 10 Million Transactions

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2512.17243-b31b1b.svg)](https://arxiv.org/abs/2512.17243)
[![Journal](https://img.shields.io/badge/Journal-ArXiv%20Preprint-003366)](https://arxiv.org/abs/2512.17243)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/who_connects_global_aid)
[![Discipline](https://img.shields.io/badge/Discipline-Network%20Science%20%7C%20Development%20Economics-00529B)](https://github.com/chirindaopensource/who_connects_global_aid)
[![Data Sources](https://img.shields.io/badge/Data-IATI%20Registry-lightgrey)](https://iatiregistry.org/)
[![Data Sources](https://img.shields.io/badge/Data-Web%20Hyperlink%20Graph-lightgrey)](https://commoncrawl.org/)
[![Core Method](https://img.shields.io/badge/Method-Bipartite%20Projection-orange)](https://github.com/chirindaopensource/who_connects_global_aid)
[![Analysis](https://img.shields.io/badge/Analysis-Node2Vec%20Embedding-red)](https://github.com/chirindaopensource/who_connects_global_aid)
[![Validation](https://img.shields.io/badge/Validation-PageRank%20Correlation-green)](https://github.com/chirindaopensource/who_connects_global_aid)
[![Robustness](https://img.shields.io/badge/Robustness-Sensitivity%20Analysis-yellow)](https://github.com/chirindaopensource/who_connects_global_aid)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![SciPy](https://img.shields.io/badge/scipy-%230054A6.svg?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![NetworkX](https://img.shields.io/badge/networkx-%230054A6.svg?style=flat&logo=networkx&logoColor=white)](https://networkx.org/)
[![UMAP](https://img.shields.io/badge/umap-%230054A6.svg?style=flat&logo=python&logoColor=white)](https://umap-learn.readthedocs.io/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)

**Repository:** `https://github.com/chirindaopensource/who_connects_global_aid`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Who Connects Global Aid? The Hidden Geometry of 10 Million Transactions"** by:

*   **Paul X. McCarthy** (League of Scholars, Sydney; UNSW)
*   **Xian Gong** (University of Technology Sydney)
*   **Marian-Andrei Rizoiu** (University of Technology Sydney)
*   **Paolo Boldi** (Università degli Studi di Milano)

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from the ingestion and cleansing of massive IATI transaction logs to the rigorous construction of bipartite networks, high-dimensional structural embedding via Node2Vec, and the identification of critical "knowledge brokers" through centrality analysis.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `run_global_aid_pipeline`](#key-callable-run_global_aid_pipeline)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the analytical framework presented in McCarthy et al. (2025). The core of this repository is the Jupyter notebook `who_connects_global_aid_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings. The pipeline is designed to map the topological structure of the global aid ecosystem, moving beyond aggregate volume flows to reveal the hidden geometry of influence.

The paper addresses the structural complexity of the modern aid system, characterized by a "triple revolution" of new goals, instruments, and actors. This codebase operationalizes the paper's framework, allowing users to:
-   Rigorously validate and manage the entire experimental configuration via a single `config.yaml` file.
-   Cleanse and normalize over 10 million transaction records from the International Aid Transparency Initiative (IATI).
-   Construct a bipartite Provider-Receiver network and project it into a Provider-Provider co-investment graph.
-   Learn high-dimensional structural embeddings using Node2Vec to capture functional roles.
-   Reveal functional clusters (Humanitarian vs. Development) via UMAP dimensionality reduction.
-   Identify the "Solar System" of central actors using Hub Scores (HITS) and Betweenness Centrality.
-   Validate findings externally by correlating offline structural influence with online web authority (PageRank).

## Theoretical Background

The implemented methods combine techniques from Network Science, Graph Theory, and Machine Learning.

**1. Bipartite Network Construction:**
The system is modeled as a bipartite graph $G = (U, V, E)$, where $U$ represents Provider organisations and $V$ represents Receiver organisations. An edge $e_{uv}$ exists if at least one financial transaction flows from provider $u$ to receiver $v$, and it is weighted by transaction frequency (the number of such transactions).
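As a minimal pandas sketch of this step, assuming transactions have already been mapped to canonical organisation IDs in columns named `provider` and `receiver` (hypothetical names, not necessarily the notebook's), the frequency weight $w_{uv}$ is a group count:

```python
import pandas as pd

# Hypothetical toy transactions; "provider"/"receiver" stand in for the
# canonical organisation IDs produced by entity resolution.
transactions = pd.DataFrame({
    "provider": ["USAID", "USAID", "USAID", "DFID", "DFID"],
    "receiver": ["NGO_A", "NGO_A", "NGO_B", "NGO_A", "NGO_C"],
})


def bipartite_edge_weights(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse transactions into weighted provider -> receiver edges.

    The weight is the number of transactions from u to v (frequency),
    not their monetary value.
    """
    return (
        df.groupby(["provider", "receiver"])
        .size()
        .rename("weight")
        .reset_index()
    )


edges = bipartite_edge_weights(transactions)
print(edges)
```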

**2. One-Mode Projection:**
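To analyze donor relationships, the bipartite graph is projected into a Provider-Provider co-occurrence network $P = MM^\top$, where $M$ is the $|U| \times |C|$ incidence matrix with $M_{uc} = 1$ if provider $u$ appears in context $c$ (a recipient country or sector). Each off-diagonal entry $P_{ij}$ then counts the contexts shared by providers $i$ and $j$, while the diagonal entry $P_{ii}$ is simply the number of contexts in which provider $i$ appears.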
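The projection can be sketched with SciPy sparse matrices. The provider names, contexts, and (provider, context) pairs below are hypothetical toy data; storing the pairs in a set keeps every entry of $M$ at 1, and subtracting the diagonal of $MM^\top$ to drop self-loops is an assumption about how the pipeline treats them.

```python
import numpy as np
import scipy.sparse as sparse

# Hypothetical toy data: which provider appears in which context.
providers = ["Donor_A", "Donor_B", "Broker"]
contexts = ["KE", "UG", "Health"]  # recipient countries or sectors
pairs = {("Donor_A", "KE"), ("Donor_A", "UG"),
         ("Donor_B", "KE"),
         ("Broker", "KE"), ("Broker", "UG"), ("Broker", "Health")}

p_idx = {p: i for i, p in enumerate(providers)}
c_idx = {c: j for j, c in enumerate(contexts)}
rows = [p_idx[p] for p, _ in sorted(pairs)]
cols = [c_idx[c] for _, c in sorted(pairs)]

# Binary incidence matrix M (|U| x |C|); a set of unique pairs keeps entries at 1.
M = sparse.csr_matrix((np.ones(len(rows)), (rows, cols)),
                      shape=(len(providers), len(contexts)))

# P = M M^T: P[i, j] = number of contexts shared by providers i and j.
P = (M @ M.T).tocsr()
context_counts = P.diagonal()                     # P[i, i] = contexts of provider i
P = (P - sparse.diags(context_counts)).tocsr()    # remove self-loops
P.eliminate_zeros()

print(P.toarray())
```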

**3. Structural Embedding (Node2Vec):**
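The topology is encoded into a low-dimensional vector space $f: V \to \mathbb{R}^d$ by optimizing a Skip-gram objective over biased random walks:
$$ \max_f \sum_{u \in V} \log \Pr(N_S(u) \mid f(u)) $$
where, in this formula, $V$ denotes the node set of the graph being embedded (not the receiver set above) and $N_S(u)$ is the neighbourhood of node $u$ generated by the walk-sampling strategy $S$. This captures structural equivalence, grouping actors with similar network roles regardless of direct connectivity.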
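A pure-Python sketch of the biased random walks behind this objective. It is not the notebook's Numba-accelerated generator: it recomputes transition weights at every step instead of using the alias tables of the original node2vec paper, and the toy graph and function names are hypothetical. The parameters `p` (return) and `q` (in-out) are the same ones exposed in `config.yaml`.

```python
import random
from typing import Dict, List

# Weighted, undirected adjacency: node -> {neighbour: edge weight}
Graph = Dict[str, Dict[str, float]]


def node2vec_walk(graph: Graph, start: str, walk_length: int,
                  p: float, q: float, rng: random.Random) -> List[str]:
    """One second-order biased random walk in the style of node2vec."""
    walk = [start]
    while len(walk) < walk_length:
        cur = walk[-1]
        neighbours = sorted(graph[cur])  # sorted for reproducibility
        if not neighbours:
            break  # dead end: stop the walk early
        if len(walk) == 1:
            # First step has no previous node: sample by edge weight only.
            weights = [graph[cur][x] for x in neighbours]
        else:
            prev = walk[-2]
            weights = []
            for x in neighbours:
                w = graph[cur][x]
                if x == prev:
                    weights.append(w / p)  # return to the previous node
                elif x in graph[prev]:
                    weights.append(w)      # x is also a neighbour of prev
                else:
                    weights.append(w / q)  # move further away from prev
        walk.append(rng.choices(neighbours, weights=weights, k=1)[0])
    return walk


def generate_walks(graph: Graph, num_walks: int, walk_length: int,
                   p: float, q: float, seed: int = 42) -> List[List[str]]:
    """num_walks walks from every node, shuffling the start order each round."""
    rng = random.Random(seed)
    nodes = sorted(graph)
    walks: List[List[str]] = []
    for _ in range(num_walks):
        rng.shuffle(nodes)
        for node in nodes:
            walks.append(node2vec_walk(graph, node, walk_length, p, q, rng))
    return walks


toy = {
    "Donor_A": {"Donor_B": 3.0, "Broker": 1.0},
    "Donor_B": {"Donor_A": 3.0, "Broker": 2.0},
    "Broker": {"Donor_A": 1.0, "Donor_B": 2.0, "Implementer": 1.0},
    "Implementer": {"Broker": 1.0},
}
walks = generate_walks(toy, num_walks=2, walk_length=5, p=1.0, q=0.5)
```

The resulting walks would then be treated as sentences and fed to a skip-gram model, for example gensim's `Word2Vec` with `sg=1`, to obtain the embedding $f$.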

**4. Centrality and Brokerage:**
Influence is quantified using HITS Hub Scores (for the "Solar System" ranking) and Betweenness Centrality (for identifying brokers):
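$$ C_B(v) = \sum_{s \neq v \neq t} \frac{\sigma_{st}(v)}{\sigma_{st}} $$
where $\sigma_{st}$ is the number of shortest paths from $s$ to $t$ and $\sigma_{st}(v)$ is the number of those paths that pass through $v$. This metric highlights actors like J-PAL and the Hewlett Foundation that bridge structural holes between disparate clusters.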
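To make the formula concrete, the sketch below evaluates $C_B(v)$ literally on a hypothetical toy graph in which one broker node is the only bridge between two clusters, and checks the result against NetworkX's `betweenness_centrality` (which uses Brandes' algorithm). The brute-force loop is for illustration only; it enumerates every shortest path and does not scale.

```python
import itertools

import networkx as nx


def betweenness_brute_force(G: nx.Graph) -> dict:
    """Evaluate C_B(v) = sum_{s != v != t} sigma_st(v) / sigma_st literally.

    Each unordered pair {s, t} is counted once (undirected graph), matching
    NetworkX's unnormalised convention. Toy graphs only.
    """
    cb = {v: 0.0 for v in G}
    for s, t in itertools.combinations(G.nodes, 2):
        paths = list(nx.all_shortest_paths(G, s, t))
        sigma_st = len(paths)
        for v in G:
            if v in (s, t):
                continue
            sigma_st_v = sum(1 for path in paths if v in path)
            cb[v] += sigma_st_v / sigma_st
    return cb


# Two tightly knit clusters joined only through a single broker node.
G = nx.Graph()
G.add_edges_from([("H1", "H2"), ("H2", "H3"), ("H1", "H3")])  # "humanitarian" triangle
G.add_edges_from([("D1", "D2"), ("D2", "D3"), ("D1", "D3")])  # "development" triangle
G.add_edges_from([("H1", "Broker"), ("Broker", "D1")])

cb = betweenness_brute_force(G)
reference = nx.betweenness_centrality(G, normalized=False)
```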


## Features

The provided Jupyter notebook (`who_connects_global_aid_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Task Architecture:** The pipeline is decomposed into 29 distinct, modular tasks, each with its own orchestrator function.
-   **Configuration-Driven Design:** All study parameters (entity-resolution thresholds, Node2Vec hyperparameters, UMAP settings) are managed in an external `config.yaml` file.
-   **Rigorous Data Validation:** A multi-stage validation process checks schema integrity, type coercion feasibility, and referential integrity.
-   **Deterministic Entity Resolution:** Implements a robust TF-IDF blocking and Cosine Similarity pipeline to resolve organisation names to canonical identities.
-   **High-Performance Computing:** Utilizes Numba-accelerated random walk generation and sparse matrix algebra for scalability.
-   **Reproducible Artifacts:** Generates structured dictionaries, serializable outputs, and cryptographic manifests for every intermediate result.
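A minimal sketch of TF-IDF name matching with scikit-learn, assuming character trigrams, a similarity threshold of 0.8, and a simple "link to the earliest earlier match" rule; all three are illustrative choices, not the notebook's configured values. The blocking step is omitted, so every pair of names is compared, which is only practical for small lists.

```python
from typing import Dict, List

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


def normalise(name: str) -> str:
    """Lower-case, turn full stops into spaces, and collapse whitespace."""
    return " ".join(name.lower().replace(".", " ").split())


def resolve_names(names: List[str], threshold: float = 0.8) -> Dict[str, str]:
    """Map each raw name to the first earlier name it matches (or to itself)."""
    vectoriser = TfidfVectorizer(analyzer="char_wb", ngram_range=(3, 3))
    X = vectoriser.fit_transform([normalise(n) for n in names])
    sim = cosine_similarity(X)  # dense (n x n) cosine-similarity matrix
    canonical: Dict[str, str] = {}
    for i, name in enumerate(names):
        match = next((j for j in range(i) if sim[i, j] >= threshold), None)
        canonical[name] = name if match is None else canonical[names[match]]
    return canonical


raw_names = ["World Bank", "world bank.", "WORLD  BANK",
             "USAID", "Hewlett Foundation", "hewlett foundation"]
mapping = resolve_names(raw_names)
print(mapping)
```

Names that differ by more than formatting (for example an added article) fall on one side or the other of the threshold, which is why the threshold is exposed as a configurable parameter.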

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Validation & Cleansing (Tasks 1-8):** Ingests raw IATI data, validates schemas, enforces temporal scope (1967-2025), and normalizes multi-valued fields.
2.  **Integration & ER (Tasks 9-10):** Joins transactions to activity contexts and performs entity resolution to construct a canonical organisation map.
3.  **Descriptive Analysis (Tasks 11-12):** Computes geographic transaction density and longitudinal instrument evolution.
4.  **Network Construction (Tasks 13-15):** Builds the bipartite graph and projects it into a provider co-occurrence network.
5.  **Topology & Embeddings (Tasks 16-17):** Generates Node2Vec embeddings and projects them to 2D using UMAP to reveal functional clusters.
6.  **Centrality & Ranking (Tasks 18-20):** Computes HITS and Betweenness centrality to construct the "Solar System" ranking.
7.  **Analysis & Validation (Tasks 21-26):** Analyzes subgroups (Universities/Foundations), characterizes broker networks (Hewlett), and correlates findings with web PageRank.
8.  **Orchestration & Provenance (Tasks 27-29):** Manages the end-to-end execution and packages reproducible outputs.

## Core Components (Notebook Structure)

The `who_connects_global_aid_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the 29 major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `run_global_aid_pipeline`

The project is designed around a single, top-level user-facing interface function:

-   **`run_global_aid_pipeline`:** This master orchestrator function runs the entire automated research pipeline end to end. A single call to this function reproduces the entire computational portion of the project, managing data flow between the cleansing, graph-construction, and analysis modules.

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `scipy`, `networkx`, `gensim`, `scikit-learn`, `umap-learn`, `numba`, `pyyaml`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/who_connects_global_aid.git
    cd who_connects_global_aid
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy scipy networkx gensim scikit-learn umap-learn numba pyyaml
    ```

## Input Data Structure

The pipeline requires four primary DataFrames:
1.  **`df_transactions_raw`**: IATI transaction elements (Value, Date, Provider, Receiver).
2.  **`df_activities_raw`**: Activity metadata (Sectors, Countries, Instruments).
3.  **`df_organisations_raw`**: Master organisation list for entity resolution.
4.  **`df_web_links_raw`**: Web crawl data for external validation.

## Usage

The `who_connects_global_aid_draft.ipynb` notebook provides a complete, step-by-step guide. The primary workflow is to execute the final cell of the notebook, which demonstrates how to use the top-level `execute_master_workflow` orchestrator:

```python
# Final cell of the notebook

# This block serves as the main entry point for the entire project.
if __name__ == '__main__':
    # 1. Load the master configuration from the YAML file.
    import yaml
    with open('config.yaml', 'r') as f:
        config = yaml.safe_load(f)
    
    # 2. Load raw datasets (Example using synthetic generator provided in the notebook)
    # In production, load from CSV/Parquet: pd.read_csv(...)
    data_inputs = generate_synthetic_data()
    
    # 3. Execute the entire replication study.
    results = execute_master_workflow(
        df_transactions_raw=data_inputs["df_transactions_raw"],
        df_activities_raw=data_inputs["df_activities_raw"],
        df_organisations_raw=data_inputs["df_organisations_raw"],
        df_web_links_raw=data_inputs["df_web_links_raw"],
        config=config,
        output_root="./global_aid_study_output"
    )
    
    # 4. Access results
    if results.success:
        print(f"Validation Correlation: {results.baseline_results.artifacts['correlation'].pearson_r}")
```

## Output Structure

The pipeline returns a `MasterWorkflowResults` object containing:
-   **`baseline_results`**: A `PipelineResults` object with all artifacts from the primary run (Graph, Embeddings, Centrality Tables).
-   **`robustness_results`**: A `RobustnessArtifact` containing sensitivity analysis metrics.
-   **`provenance_artifact`**: A `ProvenanceArtifact` with cryptographic hashes and metadata summaries.

## Project Structure

```
who_connects_global_aid/
│
├── who_connects_global_aid_draft.ipynb  # Main implementation notebook
├── config.yaml                          # Master configuration file
├── requirements.txt                     # Python package dependencies
│
├── LICENSE                              # MIT Project License File
└── README.md                            # This file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can modify study parameters such as:
-   **Scope:** `start_year`, `end_year`.
-   **Entity Resolution:** `fuzzy_matching` threshold.
-   **Node2Vec:** `p`, `q`, `walk_length`, `num_walks`.
-   **UMAP:** `n_neighbors`, `min_dist`.
-   **Projection:** Context definitions (Country vs. Sector).

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Temporal Dynamics:** Analyzing the evolution of the network topology over sliding windows.
-   **Multiplex Networks:** Modeling different financial instruments (Grants vs. Loans) as distinct layers.
-   **Impact Analysis:** Correlating network centrality with aid effectiveness outcomes.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{mccarthy2025who,
  title={Who Connects Global Aid? The Hidden Geometry of 10 Million Transactions},
  author={McCarthy, Paul X. and Gong, Xian and Rizoiu, Marian-Andrei and Boldi, Paolo},
  journal={arXiv preprint arXiv:2512.17243},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). Global Aid Network Analysis Pipeline: An Open Source Implementation.
GitHub repository: https://github.com/chirindaopensource/who_connects_global_aid
```

## Acknowledgments

-   Credit to **Paul X. McCarthy, Xian Gong, Marian-Andrei Rizoiu, and Paolo Boldi** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, SciPy, NetworkX, Gensim, and UMAP**.

---

*This README was generated based on the structure and content of the `who_connects_global_aid_draft.ipynb` notebook and follows best practices for research software documentation.*


# Paper

Title: "*Who Connects Global Aid? The Hidden Geometry of 10 Million Transactions*"

Authors: Paul X. McCarthy, Xian Gong, Marian-Andrei Rizoiu, Paolo Boldi

E-Journal Submission Date: 19 December 2025

Link: https://arxiv.org/abs/2512.17243

Abstract:

The global aid system functions as a complex and evolving ecosystem; yet widespread understanding of its structure remains largely limited to aggregate volume flows. Here we map the network topology of global aid using a dataset of unprecedented scale: over 10 million transaction records connecting 2,456 publishing organisations across 230 countries between 1967 and 2025. We apply bipartite projection and dimensionality reduction to reveal the geometry of the system and unveil hidden patterns. This exposes distinct functional clusters that are otherwise sparsely connected. We find that while governments and multilateral agencies provide the primary resources, a small set of knowledge brokers provide the critical connectivity. Universities and research foundations specifically act as essential bridges between disparate islands of implementers and funders. We identify a core solar system of 25 central actors who drive this connectivity including unanticipated brokers like J-PAL and the Hewlett Foundation. These findings demonstrate that influence in the aid ecosystem flows through structural connectivity as much as financial volume. Our results provide a new framework for donors to identify strategic partners that accelerate coordination and evidence diffusion across the global network.

# Summary

**Subject:** Network Topology and Systemic Risk in the Global Aid Ecosystem

**Methodology:** Bipartite Graph Projection, `node2vec` Embeddings, UMAP Dimensionality Reduction

**Data Source:** International Aid Transparency Initiative (IATI) Registry (1967–2025)

### Executive Abstract
This paper reframes the analysis of development assistance. Conventional accounts describe global aid largely in terms of aggregate volume, essentially "who spends the most." The authors argue that this volume-based view hides how the system is actually connected. By treating the aid ecosystem as a complex adaptive system and analyzing over 10 million transaction records, they reveal a "hidden geometry." The core finding is that **structural influence is not reducible to financial volume**: in the abstract's words, influence "flows through structural connectivity as much as financial volume." While governments provide the capital, a small set of "knowledge brokers" (universities and foundations) provide the essential connectivity that prevents the system from fracturing into isolated islands.

---

### The Data and Computational Framework
To understand the topology, the authors moved beyond standard regression models of donor-recipient dyads. They employed a graph-theoretic approach to handle a dataset of unprecedented scale.

*   **The Corpus:** The study utilizes the IATI registry, comprising **10 million transaction records** involving **2,456 publishing organizations** across **230 countries** between 1967 and 2025.
*   **Graph Construction:** The system was modeled as a **bipartite graph** (Provider nodes $\leftrightarrow$ Receiver nodes). To analyze the relationships between organizations, they performed a bipartite projection to create a Provider-Provider co-investment graph. Edges were weighted by frequency of co-occurrence rather than raw dollar volume to emphasize relational strength.
*   **Algorithmic Embedding:** To capture the latent structure, they applied **`node2vec`**, an algorithm that generates high-dimensional vector representations of nodes based on random walks. This captures the "structural equivalence" of actors—grouping them not by geography, but by their topological roles.

### Dimensionality Reduction and The "Hidden Geometry"
Using **Uniform Manifold Approximation and Projection (UMAP)**, the authors projected these high-dimensional embeddings into a 2D manifold. This revealed that the aid ecosystem is not a cohesive whole, but is defined by two primary axes of differentiation:

1.  **The Horizontal Axis (Thematic):** A sharp divide between **Humanitarian** actors (crisis response, short-term) and **Development** institutions (long-term resilience).
2.  **The Vertical Axis (Functional):** A stratification between **Funders** (upstream capital allocators) and **Implementers** (downstream NGOs and contractors).

**The Econometric Implication:** This orthogonality creates distinct "quadrants" with sparse connectivity between them. There are significant "structural holes" where coordination fails, particularly in the transition from crisis relief to long-term development.

### The "Solar System" of Centrality
The authors introduce a "Solar System" visualization to rank organizations based on **Network Centrality (Hub Score)** rather than financial volume. This yields a counter-intuitive architecture:

*   **The Inner Ring (Top 25):** While this core includes financial giants like the World Bank and USAID, it also includes actors with much smaller budgets but disproportionate influence.
*   **The Paradox of Scale:** The analysis identifies a class of **"Knowledge Brokers."** For example, **J-PAL (MIT)** and the **Hewlett Foundation** sit in the "inner ring" of centrality. Despite managing fewer deals and less capital than bilateral agencies, they occupy critical bridging positions (high betweenness centrality).
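A hedged sketch of hub scoring by HITS power iteration in NumPy, followed by a top-$k$ cut that mirrors the "inner ring" idea (the paper's ring uses $k = 25$). The adjacency matrix and labels are hypothetical toy data, and scores are L1-normalised at each step; NetworkX offers `nx.hits` as a ready-made alternative.

```python
import numpy as np


def hits_hub_scores(A: np.ndarray, max_iter: int = 1000, tol: float = 1e-12) -> np.ndarray:
    """Hub scores by power iteration: a <- A^T h, h <- A a, L1-normalised each step."""
    n = A.shape[0]
    h = np.full(n, 1.0 / n)
    for _ in range(max_iter):
        a = A.T @ h
        if a.sum() == 0:
            return np.zeros(n)  # no edges: every hub score is zero
        a = a / a.sum()
        h_new = A @ a
        h_new = h_new / h_new.sum()
        if np.abs(h_new - h).sum() < tol:
            return h_new
        h = h_new
    return h


def top_k(scores: np.ndarray, labels: list, k: int) -> list:
    """Labels of the k highest scores; ties keep their original order."""
    order = np.argsort(-scores, kind="stable")
    return [(labels[i], float(scores[i])) for i in order[:k]]


# Hypothetical directed graph: providers point to the receivers they fund.
labels = ["Provider_A", "Provider_B", "Provider_C", "Receiver_X", "Receiver_Y"]
A = np.zeros((5, 5))
for src, dst in [(0, 3), (0, 4), (1, 3), (2, 4)]:
    A[src, dst] = 1.0

hubs = hits_hub_scores(A)
inner_ring = top_k(hubs, labels, k=2)
print(inner_ring)
```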

### The Role of Knowledge Brokers
The paper argues that the resilience of the global aid network depends on these brokers.

*   **Universities and Foundations:** These entities act as the connective tissue. Academic institutions are 85% smaller by deal count than the average but occupy significantly more central positions.
*   **Bridging the Divide:** The analysis shows that actors like the Hewlett Foundation and J-PAL sit on the shortest paths between the disconnected "islands" of the network (e.g., connecting a humanitarian NGO to a development bank).
*   **Evidence Diffusion:** Without these brokers, information (such as evidence from randomized controlled trials) cannot traverse the structural holes between the development and humanitarian clusters.

### Systemic Risk and Strategic Implications
From a systems engineering perspective, the current architecture exhibits signs of fragility.

*   **Fragility of the Core:** The system relies heavily on a "Donor Core" (USAID, EU, World Bank). In network theory, a hub-and-spoke model is resilient to random failure but highly vulnerable to targeted disruption (e.g., political shifts in a major donor nation).
*   **Resilience via Mesh Topology:** The authors argue for strengthening the "middle layer" of knowledge brokers to create a distributed mesh topology. This allows the network to "reroute" around damaged nodes.
*   **Optimization Strategy:** For donors, the optimal strategy is not merely to fund "doers" (high-volume implementers) but to invest in "connectors." Funding a central broker provides access to the structural integrity of the entire network, effectively de-risking the portfolio.
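The hub-and-spoke argument can be illustrated on stylised NetworkX graphs rather than on the aid network itself: a star stands in for a pure donor core, and a wheel (the same star plus a ring linking the spokes) stands in for a mesh with a brokering middle layer. The graph sizes and random seed are arbitrary.

```python
import random

import networkx as nx


def components_after_removal(G: nx.Graph, node) -> int:
    """Number of connected components left after deleting one node."""
    H = G.copy()
    H.remove_node(node)
    return nx.number_connected_components(H)


star = nx.star_graph(20)    # hub 0 plus 20 spokes: pure hub-and-spoke
wheel = nx.wheel_graph(21)  # same hub and spokes, plus a ring linking the spokes
rng = random.Random(0)
random_spoke = rng.randint(1, 20)

random_failure = components_after_removal(star, random_spoke)  # 1: still connected
targeted_attack = components_after_removal(star, 0)            # 20: every spoke isolated
mesh_attack = components_after_removal(wheel, 0)               # 1: paths reroute via the ring
```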

### Conclusion
The paper concludes that we are transitioning from an era of **"Aid as Charity"** (unidirectional volume transfer) to **"Aid as Network"** (multi-directional connectivity). The metric for success in this new topology is not just capital deployment, but the optimization of brokerage to ensure evidence diffusion and coordination across a fragmented system.

# Import Essential Modules

```python
#!/usr/bin/env python3
# ==============================================================================#
#
#  Who Connects Global Aid? The Hidden Geometry of 10 Million Transactions
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "Who Connects Global Aid? The Hidden Geometry
#  of 10 Million Transactions" by Paul X. McCarthy, Xian Gong, Marian-Andrei
#  Rizoiu, and Paolo Boldi (2025). It delivers a computationally rigorous system
#  for mapping the topological structure of the global aid ecosystem, enabling
#  the identification of critical knowledge brokers, the quantification of
#  structural influence versus financial volume, and the detection of functional
#  clusters within the donor-implementer network.
#
#  Core Methodological Components:
#  • Bipartite network construction from large-scale transaction records (IATI)
#  • One-mode projection via co-occurrence frequency weighting
#  • High-dimensional structural embedding using Node2Vec (biased random walks)
#  • Manifold learning and dimensionality reduction via UMAP
#  • Network centrality analysis: Hub Scores (HITS), Betweenness, and Degree
#  • "Solar System" visualization ranking based on structural influence
#  • External validation via web hyperlink graph PageRank correlation
#
#  Technical Implementation Features:
#  • Scalable ETL pipeline for processing 10M+ transaction records
#  • Deterministic entity resolution using TF-IDF blocking and Cosine Similarity
#  • Numba-accelerated random walk generation for efficient graph sampling
#  • Sparse matrix algebra for handling large-scale adjacency structures
#  • Robust artifact management with cryptographic provenance tracking
#  • Comprehensive sensitivity analysis for parameter robustness verification
#
#  Paper Reference:
#  McCarthy, P. X., Gong, X., Rizoiu, M.-A., & Boldi, P. (2025). Who Connects
#  Global Aid? The Hidden Geometry of 10 Million Transactions. arXiv preprint
#  arXiv:2512.17243. https://arxiv.org/abs/2512.17243
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================#

# ------------------------------------------------------------------------------
# Standard Library Imports
# ------------------------------------------------------------------------------
import datetime
import hashlib
import itertools
import json
import logging
import os
import pickle
import random
import re
import time
import traceback
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union

# ------------------------------------------------------------------------------
# Third-Party Scientific Computing & Data Analysis Imports
# ------------------------------------------------------------------------------
import numpy as np
import pandas as pd
import scipy.sparse as sparse
import scipy.stats as stats

# ------------------------------------------------------------------------------
# Network Analysis & Graph Theory Imports
# ------------------------------------------------------------------------------
import networkx as nx

# ------------------------------------------------------------------------------
# Machine Learning & Natural Language Processing Imports
# ------------------------------------------------------------------------------
from gensim.models import Word2Vec
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# ------------------------------------------------------------------------------
# High-Performance Computing & JIT Compilation Imports
# ------------------------------------------------------------------------------
import numba
from numba import njit

# ------------------------------------------------------------------------------
# Manifold Learning & Dimensionality Reduction Imports
# ------------------------------------------------------------------------------
import umap

# ------------------------------------------------------------------------------
# Configure logging for this module
# ------------------------------------------------------------------------------
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
```


# Implementation

# Draft 1

## **Discussion of the Inputs, Processes, Outputs, and Research Role of Key Callables**

### **Task 1: `validate_study_configuration`**

*   **Inputs:**
    *   `config` (Dict): The master configuration dictionary containing study parameters (scope, filters, hyperparameters).
    *   `input_data` (Dict): A dictionary of loaded raw DataFrames (`df_transactions_raw`, etc.).
*   **Processes:**
    *   **Scope Validation:** Asserts `start_year == 1967` and `end_year == 2025`.
    *   **Methodological Purity Check:** Recursively scans the configuration for forbidden terms (e.g., "LPPLS", "Lomb-Scargle") to ensure strict adherence to the manuscript's methods.
    *   **Dataset Presence Check:** Verifies that all required keys (e.g., `df_web_links_raw`) exist in `input_data`.
    *   **Schema Validation:** Checks that each DataFrame contains the mandatory columns defined in the pre-code analysis.
*   **Outputs:**
    *   `bool`: Returns `True` if all validations pass; raises `ValueError` otherwise.
*   **Research Role:**
    *   This callable enforces the boundary conditions of the study as defined in the **Introduction** and **Methods** sections. It ensures the analysis is performed on the correct temporal window ("1967 and 2025") and utilizes the specific datasets ("IATI registry... over 10 million transaction records") required to reproduce the findings.
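A minimal sketch of these checks. The function name `validate_study_configuration_sketch`, the nested `scope` key layout, and the column sets are assumptions; the column names are limited to those mentioned elsewhere in this README, and the organisation and web-link schemas are left out because they are not documented here.

```python
from typing import Any, Dict, List

import pandas as pd

# Terms that must never appear anywhere in the configuration.
FORBIDDEN_TERMS = ("LPPLS", "Lomb-Scargle")
REQUIRED_DATASETS = ("df_transactions_raw", "df_activities_raw",
                     "df_organisations_raw", "df_web_links_raw")
# Hypothetical minimal schemas, using only columns named elsewhere in this README.
REQUIRED_COLUMNS = {
    "df_transactions_raw": {"transaction_id", "iati_identifier", "transaction_value"},
    "df_activities_raw": {"iati_identifier", "recipient_country_code", "sector_code"},
}


def find_forbidden_terms(obj: Any, path: str = "config") -> List[str]:
    """Recursively return the paths of keys or string values mentioning a forbidden term."""
    def mentions(text: str) -> bool:
        return any(term.lower() in text.lower() for term in FORBIDDEN_TERMS)

    found: List[str] = []
    if isinstance(obj, dict):
        for key, value in obj.items():
            child = f"{path}.{key}"
            if mentions(str(key)):
                found.append(child)
            found.extend(find_forbidden_terms(value, child))
    elif isinstance(obj, (list, tuple)):
        for i, item in enumerate(obj):
            found.extend(find_forbidden_terms(item, f"{path}[{i}]"))
    elif isinstance(obj, str) and mentions(obj):
        found.append(path)
    return found


def validate_study_configuration_sketch(config: Dict[str, Any],
                                        input_data: Dict[str, pd.DataFrame]) -> bool:
    """Return True if every check passes; raise ValueError on the first failure."""
    scope = config["scope"]  # hypothetical key layout
    if (scope["start_year"], scope["end_year"]) != (1967, 2025):
        raise ValueError(f"Scope must be 1967-2025, got {scope}")
    offending = find_forbidden_terms(config)
    if offending:
        raise ValueError(f"Forbidden methodology terms at: {offending}")
    missing = [name for name in REQUIRED_DATASETS if name not in input_data]
    if missing:
        raise ValueError(f"Missing datasets: {missing}")
    for name, columns in REQUIRED_COLUMNS.items():
        absent = columns - set(input_data[name].columns)
        if absent:
            raise ValueError(f"{name} lacks columns: {sorted(absent)}")
    return True
```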

### **Task 2: `validate_data_quality`**

*   **Inputs:**
    *   `df_transactions` (DataFrame): Raw transaction records.
    *   `df_activities` (DataFrame): Raw activity metadata.
    *   `df_organisations` (DataFrame): Master organisation list.
*   **Processes:**
    *   **Key Integrity:** Computes duplicate rates for `transaction_id` and `iati_identifier`. Calculates the intersection-over-union for the join keys between transactions and activities.
    *   **Type Coercion Simulation:** Attempts to convert `transaction_value` to numeric and dates to datetime objects (using `errors='coerce'`) to quantify the rate of malformed data without mutating the input.
    *   **Reference Coverage:** Calculates the percentage of provider and receiver references in the transaction table that map to entries in the organisation master list.
*   **Outputs:**
    *   `Dict[str, Any]`: A dictionary of quality metrics (e.g., `transaction_id_duplicate_rate`, `bad_numeric_values`, `provider_ref_match_rate`).
*   **Research Role:**
    *   This callable implements the data quality assessment implied by the manuscript's description of the "complex and evolving ecosystem." By quantifying missing references and malformed dates, it establishes the reliability of the "10 million transaction records" before they are used to construct the network topology.
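A read-only sketch of the metrics above with pandas. The metric names `transaction_id_duplicate_rate`, `bad_numeric_values`, and `provider_ref_match_rate` come from this description; the remaining metric names and the columns `transaction_date`, `provider_ref`, and `organisation_ref` are hypothetical.

```python
import pandas as pd


def data_quality_metrics(df_tx: pd.DataFrame, df_act: pd.DataFrame,
                         df_org: pd.DataFrame) -> dict:
    """Compute quality metrics without modifying any input frame."""
    m = {}
    # Key integrity: share of rows that repeat an earlier key.
    m["transaction_id_duplicate_rate"] = float(df_tx["transaction_id"].duplicated().mean())
    m["iati_identifier_duplicate_rate"] = float(df_act["iati_identifier"].duplicated().mean())
    # Join-key overlap between transactions and activities (intersection over union).
    tx_keys = set(df_tx["iati_identifier"].dropna())
    act_keys = set(df_act["iati_identifier"].dropna())
    union = tx_keys | act_keys
    m["join_key_iou"] = len(tx_keys & act_keys) / len(union) if union else 1.0
    # Coercion dry run: values that are present but cannot be parsed.
    values = pd.to_numeric(df_tx["transaction_value"], errors="coerce")
    m["bad_numeric_values"] = int((values.isna() & df_tx["transaction_value"].notna()).sum())
    dates = pd.to_datetime(df_tx["transaction_date"], format="%Y-%m-%d", errors="coerce")
    m["bad_dates"] = int((dates.isna() & df_tx["transaction_date"].notna()).sum())
    # Reference coverage against the organisation master list.
    org_refs = set(df_org["organisation_ref"].dropna())
    m["provider_ref_match_rate"] = float(df_tx["provider_ref"].isin(org_refs).mean())
    return m


df_tx = pd.DataFrame({
    "transaction_id": ["t1", "t2", "t2", "t3"],
    "iati_identifier": ["A1", "A1", "A2", "A9"],
    "transaction_value": ["100", "abc", "50.5", None],
    "transaction_date": ["2020-01-05", "2020-13-45", "2019-06-30", None],
    "provider_ref": ["ORG-1", "ORG-2", "ORG-1", "ORG-X"],
})
df_act = pd.DataFrame({"iati_identifier": ["A1", "A2", "A3", "A3"]})
df_org = pd.DataFrame({"organisation_ref": ["ORG-1", "ORG-2"]})
metrics = data_quality_metrics(df_tx, df_act, df_org)
print(metrics)
```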

### **Task 3: `establish_reproducibility_context`**

*   **Inputs:**
    *   `config` (Dict): Master configuration.
    *   `input_data` (Dict): Raw DataFrames.
*   **Processes:**
    *   **Fingerprinting:** Computes row counts, null rates, and a structural hash for each input DataFrame to create an immutable record of the starting state.
    *   **Seed Resolution:** Extracts or assigns fixed integer seeds for stochastic components (`node2vec`, `UMAP`) to ensure deterministic execution.
    *   **Sort Policy Definition:** Defines the immutable column ordering rules (e.g., sort transactions by ID, Date, Value) used for deterministic deduplication.
*   **Outputs:**
    *   `ReproducibilityContext`: A dataclass containing `fingerprints`, `seeds`, and `sort_policies`.
*   **Research Role:**
    *   This callable establishes the computational provenance required for scientific reproducibility. It ensures that the "hidden geometry" revealed by the analysis is a stable property of the data and algorithms, not an artifact of random initialization or non-deterministic processing order.
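One way to build such a fingerprint, sketched under assumptions: per-row hashes from `pandas.util.hash_pandas_object` are combined with the column names and dtypes into a single SHA-256 digest. The function name is hypothetical, and pinning the pandas version is a sensible precaution if digests must match across environments.

```python
import hashlib

import pandas as pd


def fingerprint_dataframe(df: pd.DataFrame) -> dict:
    """Row count, per-column null rates, and a SHA-256 structural hash."""
    digest = hashlib.sha256()
    digest.update("|".join(map(str, df.columns)).encode("utf-8"))  # column names
    digest.update("|".join(map(str, df.dtypes)).encode("utf-8"))   # column dtypes
    row_hashes = pd.util.hash_pandas_object(df, index=True).to_numpy()  # one uint64 per row
    digest.update(row_hashes.tobytes())
    return {
        "n_rows": int(len(df)),
        "null_rates": {str(c): float(df[c].isna().mean()) for c in df.columns},
        "structural_hash": digest.hexdigest(),
    }


df = pd.DataFrame({"a": [1, 2, None], "b": ["x", "y", "z"]})
fp = fingerprint_dataframe(df)
print(fp["structural_hash"])
```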

### **Task 4: `cleanse_transactions_temporal_value`**

*   **Inputs:**
    *   `df_transactions_raw` (DataFrame): Raw transactions.
    *   `config` (Dict): Configuration containing scope parameters.
*   **Processes:**
    *   **Date Parsing:** Converts ISO-8601 date strings to datetime objects, quarantining failures.
    *   **Value Filtering:** Coerces transaction values to numeric and applies the strict inequality filter $V(t) > 0$.
    *   **Scope Filtering:** Retains only transactions whose year falls within $[1967, 2025]$.
*   **Outputs:**
    *   `Tuple[DataFrame, TransactionExclusions]`: The cleansed DataFrame ($T^{++}$) and a ledger of excluded row IDs.
*   **Research Role:**
    *   This callable implements the specific data selection criteria described in the **Methods** section: "filtered for nonzero financial transactions" and "covers the period from 1967 to 2025." It defines the set of valid edges $E$ for the network construction.
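
A minimal pandas sketch of these three filters follows. The raw column names match the synthetic schema in the Usage Example; the derived columns `tx_date`/`tx_value`, the function name and the plain list used as an exclusion ledger are hypothetical simplifications of `TransactionExclusions`.

```python
import pandas as pd


def cleanse_temporal_value(df: pd.DataFrame, year_min: int = 1967, year_max: int = 2025):
    """Parse dates, keep V(t) > 0 and years in [year_min, year_max] (sketch)."""
    out = df.copy()
    # Unparseable dates become NaT instead of raising; they are quarantined below.
    out["tx_date"] = pd.to_datetime(
        out["transaction_transaction_date_iso_date"], errors="coerce"
    )
    # Non-numeric values become NaN, which fails the strict V(t) > 0 test.
    out["tx_value"] = pd.to_numeric(out["transaction_value"], errors="coerce")
    year = out["tx_date"].dt.year  # NaN for NaT, so between() is False
    keep = (
        out["tx_date"].notna()
        & (out["tx_value"] > 0)
        & year.between(year_min, year_max)
    )
    excluded_ids = out.loc[~keep, "transaction_id"].tolist()
    return out.loc[keep].reset_index(drop=True), excluded_ids
```

Coercing with `errors="coerce"` turns malformed dates and values into `NaT`/`NaN`, so a single boolean mask quarantines them alongside the out-of-scope rows.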

### **Task 5: `cleanse_transactions_endpoints_dedup`**

*   **Inputs:**
    *   `df_transactions_filtered` (DataFrame): Temporally cleansed transactions.
    *   `sort_policy` (SortPolicy): Deterministic sorting rules.
*   **Processes:**
    *   **Endpoint Validation:** Checks that both provider and receiver identifiers (ref or name) are present and non-empty.
    *   **Deduplication:** Sorts the DataFrame according to the `sort_policy` and removes duplicate `transaction_id` entries, keeping the first occurrence.
*   **Outputs:**
    *   `Tuple[DataFrame, EndpointExclusions]`: The fully cleansed transaction table and an exclusion ledger.
*   **Research Role:**
    *   This callable ensures the topological validity of the network. By enforcing that every transaction has a valid source and target, it guarantees that the bipartite graph $G = (U, V, E)$ is well-defined, where $U$ is the set of providers and $V$ is the set of receivers.
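
The endpoint check and deterministic deduplication can be sketched as below, assuming an endpoint counts as present when either its `*_ref` or `*_name` field is non-blank. The `sort_cols` tuple stands in for the `SortPolicy` object and is a hypothetical simplification.

```python
import pandas as pd


def _present(values: pd.Series) -> pd.Series:
    """True where a value is non-null and non-blank after whitespace stripping."""
    return values.notna() & values.astype(str).str.strip().ne("")


def cleanse_endpoints_dedup(df: pd.DataFrame, sort_cols=("transaction_id",)):
    """Drop rows lacking a provider or receiver, then deduplicate on transaction_id."""
    has_provider = _present(df["transaction_provider_org_ref"]) | _present(
        df["transaction_provider_org_name"]
    )
    has_receiver = _present(df["transaction_receiver_org_ref"]) | _present(
        df["transaction_receiver_org_name"]
    )
    valid_mask = has_provider & has_receiver
    # A stable, fully specified sort makes "keep the first occurrence" deterministic.
    ordered = df.loc[valid_mask].sort_values(list(sort_cols), kind="mergesort")
    deduped = ordered.drop_duplicates(subset="transaction_id", keep="first")
    exclusions = {
        "missing_endpoint": df.loc[~valid_mask, "transaction_id"].tolist(),
        "duplicate_rows": len(ordered) - len(deduped),
    }
    return deduped.reset_index(drop=True), exclusions
```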

### **Task 6: `cleanse_activities_normalization`**

*   **Inputs:**
    *   `df_activities_raw` (DataFrame): Raw activity metadata.
    *   `sort_policy` (SortPolicy): Sorting rules.
*   **Processes:**
    *   **Backbone Cleansing:** Deduplicates activities on `iati_identifier`.
    *   **Normalization:** Explodes multi-valued `recipient_country_code` and `sector_code` fields into long-form DataFrames (one row per activity-context pair).
*   **Outputs:**
    *   `Tuple[DataFrame, DataFrame, DataFrame, ActivityExclusions]`: The activity backbone, exploded countries, exploded sectors, and exclusions.
*   **Research Role:**
    *   This callable prepares the metadata required for the "Provider-Provider co-investment graph." By normalizing countries and sectors, it enables the calculation of the co-occurrence matrix $M$, where $M_{ix}$ represents the frequency of provider $i$ operating in context $x$.
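
As an illustration of the normalization step, the sketch below deduplicates on `iati_identifier` and explodes one multi-valued column into long form. It assumes values arrive either as Python lists or as semicolon-delimited strings (both are mentioned in the Usage Example's comments); `explode_context` is a hypothetical name.

```python
import pandas as pd


def explode_context(activities: pd.DataFrame, column: str) -> pd.DataFrame:
    """One row per (iati_identifier, value) pair for a multi-valued column."""
    backbone = activities.drop_duplicates(subset="iati_identifier", keep="first")
    # Accept either Python lists or semicolon-delimited strings.
    values = backbone[column].apply(lambda v: v.split(";") if isinstance(v, str) else v)
    long = (
        backbone[["iati_identifier"]]
        .assign(**{column: values})
        .explode(column, ignore_index=True)
        .dropna(subset=[column])  # drops missing values and empty lists
    )
    long = long.assign(**{column: long[column].astype(str).str.strip()})
    return long.loc[long[column] != ""].drop_duplicates().reset_index(drop=True)
```

Calling it once for `recipient_country_code` and once for `sector_code` yields the two long-form tables described above.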

### **Task 7: `cleanse_activities_instruments_coverage`**

*   **Inputs:**
    *   `df_backbone` (DataFrame): Cleaned activity backbone.
    *   `df_countries` (DataFrame): Normalized countries.
    *   `df_sectors` (DataFrame): Normalized sectors.
*   **Processes:**
    *   **Instrument Normalization:** Maps `instrument_type` values to the controlled vocabulary $\{ \text{Grant, Loan, Equity} \}$, setting invalid entries to `NaN`.
    *   **Coverage Calculation:** Computes the percentage of activities that have valid instrument, country, and sector data.
*   **Outputs:**
    *   `Tuple[DataFrame, InstrumentExclusions, ActivityCoverageReport]`: The normalized backbone, exclusions, and coverage stats.
*   **Research Role:**
    *   This callable standardizes the financial instrument data used to generate **Extended Data Fig. 1** ("Evolution of aid instruments"). It ensures that the analysis of "new instruments" mentioned in the **Introduction** is based on a consistent taxonomy.

### **Task 8: `cleanse_organisations_substrate`**

*   **Inputs:**
    *   `df_organisations_raw` (DataFrame): Raw organisation master list.
*   **Processes:**
    *   **Record Integrity:** Removes rows with missing `org_ref`.
    *   **Normalization:** Standardizes `org_name` (NFKD, lowercase) and `website_domain` (strip protocol/path).
    *   **Taxonomy Validation:** Enforces the controlled vocabulary for `org_type`.
    *   **Alias Preparation:** Parses and normalizes `org_name_aliases`.
*   **Outputs:**
    *   `Tuple[DataFrame, OrganisationExclusions]`: The cleansed organisation table and exclusions.
*   **Research Role:**
    *   This callable prepares the "substrate" for entity resolution. By standardizing names and types, it facilitates the accurate mapping of the "2,456 distinct publishing organisations" described in the **Abstract**, ensuring that the nodes in the network represent unique real-world entities.
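
A minimal sketch of the two normalizers follows. It assumes that "NFKD, lowercase" also means dropping combining accents and collapsing whitespace, and that a leading `www.` is stripped from domains. Both extra rules are assumptions, and the helper names are hypothetical.

```python
import unicodedata
from urllib.parse import urlsplit


def normalize_org_name(name: str) -> str:
    """NFKD-decompose, drop combining accents, lowercase, collapse whitespace."""
    decomposed = unicodedata.normalize("NFKD", name)
    ascii_like = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return " ".join(ascii_like.lower().split())


def normalize_domain(url: str) -> str:
    """Reduce a URL or bare domain to its lowercase host, without 'www.'."""
    candidate = url.strip().lower()
    if "://" not in candidate:
        # urlsplit only recognises the host after '//', so add it for bare domains.
        candidate = "//" + candidate
    host = urlsplit(candidate).hostname or ""
    return host[4:] if host.startswith("www.") else host
```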

### **Task 9: `join_transactions_to_contexts`**

*   **Inputs:**
    *   `df_transactions` (DataFrame): Cleansed transactions.
    *   `df_activities_backbone` (DataFrame): Activity metadata.
    *   `df_activities_countries` (DataFrame): Country contexts.
    *   `df_activities_sectors` (DataFrame): Sector contexts.
*   **Processes:**
    *   **Context Joining:** Performs left joins to attach instruments (1:1) and inner joins to attach countries/sectors (1:M) to transactions.
    *   **Unified Context Construction:** Concatenates country and sector contexts into a single long-form table with namespaced IDs (e.g., `COUNTRY:US`, `SECTOR:111`).
*   **Outputs:**
    *   `Tuple[DataFrame, DataFrame, ContextCoverageReport]`: Transactions with instruments, unified contexts, and coverage stats.
*   **Research Role:**
    *   This callable links the financial flows (transactions) to their operational contexts. It constructs the set of contexts $\mathcal{X}$ used in the bipartite projection $P = MM^T$, where $M_{ix} = \sum_{t} \mathbf{1}[\tilde{u}(t) = i] \cdot \mathbf{1}[\kappa(t) = x]$.

### **Task 10: `construct_canonical_mapping`**

*   **Inputs:**
    *   `df_organisations` (DataFrame): Cleansed organisation list.
    *   `config_dict` (Dict): Configuration parameters.
*   **Processes:**
    *   **Precomputed Check:** Checks for an existing trusted mapping.
    *   **Fuzzy Clustering:** If no trusted mapping exists, vectorizes organisation names with TF-IDF and clusters them by cosine similarity, using a blocking strategy to limit the number of pairwise comparisons.
    *   **Canonicalization:** Maps raw `org_ref` to a unique `canonical_org_id` based on the clusters.
*   **Outputs:**
    *   `Tuple[DataFrame, CanonicalMappingArtifact]`: The mapped organisation table and the mapping artifact.
*   **Research Role:**
    *   This callable implements the entity resolution required to handle the "diverse population of 2,456 distinct publishing organisations." It ensures that variations in naming (e.g., "USAID" vs. "United States Agency for International Development") are resolved to a single node in the network graph.

### **Task 11: `compute_geographic_density`**

*   **Inputs:**
    *   `df_transactions` (DataFrame): Cleansed transactions.
    *   `df_tx_countries` (DataFrame): Transaction-country mapping.
*   **Processes:**
    *   **Aggregation:** Counts unique transactions per recipient country: $D(c) = \sum_{t \in T^{++}} \mathbf{1}[c \in C(t)]$, where $C(t)$ is the set of recipient countries attached to transaction $t$.
    *   **Transformation:** Applies log transformation $D_{\log}(c) = \log(1 + D(c))$.
*   **Outputs:**
    *   `GeoDensityArtifact`: Contains the density table and coverage metadata.
*   **Research Role:**
    *   This callable generates the data for **Figure 1** ("Global distribution of 10 million aid transactions"). It quantifies the "geographic footprint" of the aid system, revealing the "high-density belts" and "sparser connectivity" described in the **Results**.
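
The density computation reduces to a grouped distinct count. This sketch assumes a long-form transaction-country table with hypothetical `transaction_id`/`country_code` columns and uses the natural logarithm via `np.log1p`, since the base of $\log$ is not stated above.

```python
import numpy as np
import pandas as pd


def geographic_density(tx_countries: pd.DataFrame) -> pd.DataFrame:
    """D(c) = number of distinct transactions linked to country c, plus log(1 + D)."""
    density = (
        tx_countries.groupby("country_code")["transaction_id"]
        .nunique()  # duplicate (transaction, country) rows count once
        .rename("D")
        .reset_index()
    )
    density["D_log"] = np.log1p(density["D"])
    return density.sort_values("D", ascending=False, kind="mergesort").reset_index(drop=True)
```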

### **Task 12: `compute_instrument_evolution`**

*   **Inputs:**
    *   `df_tx_instruments` (DataFrame): Transactions with instrument types.
*   **Processes:**
    *   **Aggregation:** Counts transactions by year and instrument type: $C(y, i) = \sum_{t \in T^{++}} \mathbf{1}[\mathrm{year}(d(t)) = y] \cdot \mathbf{1}[\mathrm{instrument}(t) = i]$.
    *   **Pivoting:** Reshapes the data into a wide-form time series table.
*   **Outputs:**
    *   `InstrumentEvolutionArtifact`: Time series data and validation report.
*   **Research Role:**
    *   This callable produces the data for **Extended Data Fig. 1**. It tracks the "complex mix of financial instruments including standard grants, aid loans and equity investments" over the 1967–2025 period, supporting the narrative of a "triple revolution" in aid architecture.
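
A sketch of the count-and-pivot step follows. It assumes a `tx_date` datetime column and that transactions with a missing (`NaN`) instrument are dropped rather than bucketed as "unknown"; both are assumptions, not confirmed pipeline behaviour.

```python
import pandas as pd


def instrument_evolution(tx: pd.DataFrame) -> pd.DataFrame:
    """Wide table of C(y, i): rows are years, columns are instrument types."""
    df = tx.dropna(subset=["tx_date", "instrument_type"])
    counts = (
        df.assign(year=df["tx_date"].dt.year)
        .groupby(["year", "instrument_type"])
        .size()
    )
    wide = counts.unstack("instrument_type", fill_value=0)
    # Keep a fixed column order and show instruments with no transactions as 0.
    return wide.reindex(columns=["Grant", "Loan", "Equity"], fill_value=0)
```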

### **Task 13: `construct_bipartite_graph`**

*   **Inputs:**
    *   `df_transactions` (DataFrame): Cleansed transactions.
    *   `canonical_mapping` (DataFrame): Entity resolution map.
*   **Processes:**
    *   **Node Preparation:** Maps transaction endpoints to canonical IDs and appends role suffixes (`::PROVIDER`, `::RECEIVER`) to ensure disjoint sets $U$ and $V$.
    *   **Matrix Construction:** Builds the sparse frequency-weighted incidence matrix $B$ where $B_{ij}$ is the count of transactions between provider $i$ and receiver $j$.
*   **Outputs:**
    *   `BipartiteGraphArtifact`: Contains the sparse matrix $B$, node tables, and index maps.
*   **Research Role:**
    *   This callable constructs the fundamental topological object of the study: the bipartite graph $G = (U, V, E)$. This graph serves as the basis for the "Hub Score" calculation and the subsequent one-mode projection.
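
The construction of $B$ can be sketched as follows. For readability it uses a dense NumPy array filled with `np.add.at`, which accumulates repeated index pairs; at the study's scale a `scipy.sparse` matrix would take its place. The `provider_id`/`receiver_id` column names are hypothetical stand-ins for the canonical IDs.

```python
import numpy as np
import pandas as pd


def build_biadjacency(tx: pd.DataFrame):
    """B[i, j] = number of transactions from provider i to receiver j."""
    # Role suffixes keep U and V disjoint even if one organisation plays both roles.
    providers = tx["provider_id"].astype(str) + "::PROVIDER"
    receivers = tx["receiver_id"].astype(str) + "::RECEIVER"
    u_index = {node: i for i, node in enumerate(sorted(providers.unique()))}
    v_index = {node: j for j, node in enumerate(sorted(receivers.unique()))}
    rows = providers.map(u_index).to_numpy()
    cols = receivers.map(v_index).to_numpy()
    B = np.zeros((len(u_index), len(v_index)), dtype=np.int64)
    np.add.at(B, (rows, cols), 1)  # unbuffered: repeated (i, j) pairs all count
    return B, u_index, v_index
```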

### **Task 14: `compute_node_sizes`**

*   **Inputs:**
    *   `df_tx_mapped` (DataFrame): Transactions with canonical IDs.
    *   `df_organisations` (DataFrame): Master organisation list.
*   **Processes:**
    *   **Deal Counting:** Sums the number of transactions where an organisation appears as either a provider or a receiver: $\text{deals}(o) = \sum_{t} \mathbf{1}[o \in \{u(t), v(t)\}]$.
    *   **Validation:** Computes summary statistics by organisation type.
*   **Outputs:**
    *   `NodeSizeArtifact`: Deal counts and validation stats.
*   **Research Role:**
    *   This callable computes the "node size" metric used in the **Solar System** visualization (Figure 3). It allows the study to contrast "activity volume (size)" with "structural influence (position)," revealing the paradox where smaller actors (like universities) can be highly central.

### **Task 15: `construct_co_occurrence_projection`**

*   **Inputs:**
    *   `df_contexts` (DataFrame): Unified contexts.
    *   `df_tx_mapped` (DataFrame): Mapped transactions.
*   **Processes:**
    *   **Incidence Construction:** Builds the provider-context matrix $M$.
    *   **Projection:** Computes the one-mode projection $P = MM^T$ via sparse matrix multiplication and removes self-loops ($P_{ii} \leftarrow 0$).
*   **Outputs:**
    *   `ProjectionArtifact`: The symmetric adjacency matrix $P$ and edge list.
*   **Research Role:**
    *   This callable implements the "bipartite projection" mentioned in the **Abstract**. It transforms the transaction data into a "Provider-Provider co-investment graph," where edges represent shared funding or implementation contexts. This graph is the substrate for the centrality and community detection analyses.
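
The projection can be sketched in a few lines of NumPy. The sketch assumes a long-form table with one row per (transaction, context) pair and hypothetical `provider_id`/`context_id` columns. A dense array is used for clarity, while the pipeline performs the same product on sparse matrices.

```python
import numpy as np
import pandas as pd


def co_occurrence_projection(pairs: pd.DataFrame):
    """P = M M^T with a zeroed diagonal, from (provider_id, context_id) rows."""
    providers = sorted(pairs["provider_id"].unique())
    contexts = sorted(pairs["context_id"].unique())
    rows = pairs["provider_id"].map({p: i for i, p in enumerate(providers)}).to_numpy()
    cols = pairs["context_id"].map({x: j for j, x in enumerate(contexts)}).to_numpy()
    M = np.zeros((len(providers), len(contexts)), dtype=np.int64)
    np.add.at(M, (rows, cols), 1)  # M[i, x] = rows pairing provider i with context x
    P = M @ M.T
    np.fill_diagonal(P, 0)  # remove self-loops: P_ii <- 0
    return P, providers
```

Because $P_{ij} = \sum_x M_{ix} M_{jx}$, the weight of the edge between two providers grows with how often both appear in the same contexts.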

### **Task 16: `learn_node_embeddings`**

*   **Inputs:**
    *   `bipartite_artifact` (BipartiteGraphArtifact): The bipartite graph.
    *   `config` (Dict): Hyperparameters.
*   **Processes:**
    *   **Graph Preparation:** Constructs the symmetric adjacency $A = \begin{pmatrix} 0 & B \\ B^\top & 0 \end{pmatrix}$.
    *   **Random Walks:** Generates biased random walks using Numba-accelerated sampling, implementing the second-order transition probabilities $\pi_{vx} \propto \alpha_{pq}(t, x) \cdot w_{vx}$.
    *   **Skip-gram Training:** Trains a Word2Vec model on the walks to learn 100-dimensional embeddings $f(u)$ that maximize $\sum_{u} \log \Pr(N_S(u) \mid f(u))$.
*   **Outputs:**
    *   `EmbeddingArtifact`: The learned embeddings and metadata.
*   **Research Role:**
    *   This callable implements the `node2vec` algorithm to capture the "structural equivalence" of actors. These high-dimensional embeddings are the input for the dimensionality reduction step that reveals the "hidden geometry" of the system.
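
The second-order bias at the heart of node2vec can be sketched for a single step. Following the standard node2vec definition, $\alpha_{pq}(t, x)$ is $1/p$ when $x = t$, $1$ when $x$ is adjacent to $t$, and $1/q$ otherwise. The function names are hypothetical, a dense adjacency is assumed, and the pipeline's Numba sampler is not reproduced here.

```python
import numpy as np


def bipartite_adjacency(B: np.ndarray) -> np.ndarray:
    """Symmetric A = [[0, B], [B^T, 0]] for an m x n biadjacency matrix B."""
    m, n = B.shape
    return np.block([[np.zeros((m, m)), B], [B.T, np.zeros((n, n))]])


def transition_probs(A: np.ndarray, prev: int, curr: int, p: float, q: float) -> np.ndarray:
    """Normalised pi_vx for every x, given the walk just moved prev (t) -> curr (v)."""
    weights = A[curr].astype(float)           # w_vx; zero for non-neighbours of v
    alpha = np.full_like(weights, 1.0 / q)    # d_tx = 2: move outward
    alpha[A[prev] > 0] = 1.0                  # d_tx = 1: x is also a neighbour of t
    alpha[prev] = 1.0 / p                     # d_tx = 0: return to t
    unnormalised = alpha * weights
    return unnormalised / unnormalised.sum()
```

On the bipartite adjacency $A$ the middle case never fires: $t$ and any neighbour $x$ of $v$ lie on the same side of the graph, so they cannot be adjacent, and $p$ and $q$ alone trade off returning against moving outward.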

### **Task 17: `project_embeddings_umap`**

*   **Inputs:**
    *   `embedding_artifact` (EmbeddingArtifact): Node embeddings.
    *   `config` (Dict): UMAP parameters.
*   **Processes:**
    *   **Dimensionality Reduction:** Applies Uniform Manifold Approximation and Projection (UMAP) to project the 100D embeddings into 2D space ($z(u) \in \mathbb{R}^2$).
*   **Outputs:**
    *   `UMAPArtifact`: 2D coordinates and axis interpretation.
*   **Research Role:**
    *   This callable generates the coordinates for **Figure 2** ("The hidden geometry of the aid ecosystem"). It reveals the "distinct functional clusters" (Humanitarian vs. Development, Funder vs. Implementer) that characterize the system's topology.

### **Task 18: `compute_hits_centrality`**

*   **Inputs:**
    *   `bipartite_artifact` (BipartiteGraphArtifact): The bipartite graph.
*   **Processes:**
    *   **HITS Algorithm:** Iteratively computes Hub ($h$) and Authority ($a$) scores using the power iteration method: $a^{(k)} = B^\top h^{(k-1)}$, $h^{(k)} = B a^{(k)}$, with L2 normalization.
*   **Outputs:**
    *   `HITSArtifact`: Hub and Authority scores.
*   **Research Role:**
    *   This callable calculates the "Hub Score" used to rank actors in the **Solar System** visualization. It identifies the "core solar system of 25 central actors" by measuring their ability to connect to a wide range of recipients in the bipartite network.
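
The HITS recursion above can be written as a short power iteration. This is a dense NumPy sketch with hypothetical defaults for `max_iter` and `tol`; it assumes $B$ has at least one non-zero entry so that the L2 norms are positive.

```python
import numpy as np


def hits(B: np.ndarray, max_iter: int = 1000, tol: float = 1e-12):
    """Hub scores h (providers) and authority scores a (receivers) of B."""
    h = np.ones(B.shape[0]) / np.sqrt(B.shape[0])
    for _ in range(max_iter):
        a = B.T @ h                 # a^(k) = B^T h^(k-1)
        a /= np.linalg.norm(a)
        h_next = B @ a              # h^(k) = B a^(k)
        h_next /= np.linalg.norm(h_next)
        converged = np.linalg.norm(h_next - h) < tol
        h = h_next
        if converged:
            break
    return h, a
```

At convergence $h$ is the leading eigenvector of $BB^\top$ and $a$ that of $B^\top B$, which gives a convenient correctness check via `np.linalg.eigh`.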

### **Task 19: `compute_network_centrality`**

*   **Inputs:**
    *   `projection_artifact` (ProjectionArtifact): The provider graph.
    *   `hits_artifact` (HITSArtifact): Hub scores.
*   **Processes:**
    *   **Degree/Strength:** Computes unweighted degree ($C_D$) and weighted strength ($C_D^w$) on the projection graph.
    *   **Betweenness:** Computes unweighted Betweenness Centrality ($C_B$) using Brandes' algorithm: $C_B(v) = \sum_{s \neq v \neq t} \frac{\sigma_{st}(v)}{\sigma_{st}}$.
    *   **Unification:** Merges all metrics into a master centrality table.
*   **Outputs:**
    *   `CentralityArtifact`: Unified centrality metrics.
*   **Research Role:**
    *   This callable quantifies "brokerage" and "connectivity." The Betweenness Centrality metric specifically identifies the "knowledge brokers" (like universities) that "sit on the shortest paths between otherwise poorly connected donor-implementer clusters."
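
Unweighted betweenness can be sketched with Brandes' accumulation scheme. The sketch assumes an edge exists wherever $P_{ij} > 0$ (weights are ignored, matching the "unweighted" description above) and applies no normalization; the helper names are hypothetical.

```python
from collections import deque
from typing import Dict, List

import numpy as np


def adjacency_from_projection(P: np.ndarray) -> Dict[int, List[int]]:
    """Undirected adjacency lists: i ~ j whenever P[i, j] > 0 (i != j)."""
    return {i: [int(j) for j in np.flatnonzero(P[i] > 0) if j != i] for i in range(P.shape[0])}


def betweenness_unweighted(adj: Dict[int, List[int]]) -> Dict[int, float]:
    """Brandes' algorithm for an unweighted, undirected graph."""
    cb = {v: 0.0 for v in adj}
    for s in adj:
        stack: List[int] = []
        preds: Dict[int, List[int]] = {v: [] for v in adj}
        sigma = dict.fromkeys(adj, 0.0)  # number of shortest s-v paths
        sigma[s] = 1.0
        dist = dict.fromkeys(adj, -1)
        dist[s] = 0
        queue = deque([s])
        while queue:  # BFS from s
            v = queue.popleft()
            stack.append(v)
            for w in adj[v]:
                if dist[w] < 0:
                    dist[w] = dist[v] + 1
                    queue.append(w)
                if dist[w] == dist[v] + 1:
                    sigma[w] += sigma[v]
                    preds[w].append(v)
        delta = dict.fromkeys(adj, 0.0)
        while stack:  # accumulate dependencies in order of decreasing distance
            w = stack.pop()
            for v in preds[w]:
                delta[v] += sigma[v] / sigma[w] * (1.0 + delta[w])
            if w != s:
                cb[w] += delta[w]
    # Each unordered pair {s, t} was counted once from each endpoint.
    return {v: c / 2.0 for v, c in cb.items()}
```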

### **Task 20: `construct_solar_system`**

*   **Inputs:**
    *   `centrality_artifact` (CentralityArtifact): Centrality scores.
    *   `node_size_artifact` (NodeSizeArtifact): Node sizes.
*   **Processes:**
    *   **Ranking:** Ranks organisations by Hub Score descending.
    *   **Sizing:** Attaches transaction counts as node sizes.
    *   **Ring Assignment:** Assigns rings by rank threshold: "Inner" for the Top 25, "Middle" for ranks 26 to 100, and "Outer" for all remaining organisations.
*   **Outputs:**
    *   `SolarSystemArtifact`: Ranked and sized node table for visualization.
*   **Research Role:**
    *   This callable synthesizes the data for **Figure 3** ("The solar system architecture of global aid"). It operationalizes the core-periphery structure described in the **Results**, highlighting the "critical paradox between scale and influence."
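
A minimal sketch of the ranking and ring assignment follows, reusing the `hub_score_hits`, `rank` and `ring` column names that appear in the Usage Example. The `canonical_org_id` key and the stable tie-break (original row order) are assumptions.

```python
import numpy as np
import pandas as pd


def assign_rings(scores: pd.DataFrame, inner: int = 25, middle: int = 100) -> pd.DataFrame:
    """Rank by hub score (descending) and label Inner / Middle / Outer rings."""
    ranked = scores.sort_values("hub_score_hits", ascending=False, kind="mergesort")
    ranked = ranked.reset_index(drop=True)
    ranked["rank"] = np.arange(1, len(ranked) + 1)
    ranked["ring"] = np.where(
        ranked["rank"] <= inner,
        "Inner",
        np.where(ranked["rank"] <= middle, "Middle", "Outer"),
    )
    return ranked
```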

### **Task 21: `compute_subgroup_statistics`**

*   **Inputs:**
    *   `df_organisations` (DataFrame): Master organisation list.
    *   `solar_system_artifact` (SolarSystemArtifact): Ranked nodes.
*   **Processes:**
    *   **Partitioning:** Filters ranked nodes into "Universities" and "Foundations" subgroups.
    *   **Statistical Comparison:** Computes mean/median deal counts and median ranks for each subgroup vs. the global population.
*   **Outputs:**
    *   `SubgroupStatsArtifact`: Summary statistics and broker lookup.
*   **Research Role:**
    *   This callable generates the quantitative evidence for the claim that "Universities and research foundations specifically act as essential bridges." It validates the finding that universities occupy a "significantly more central position" despite having smaller deal volumes.

### **Task 22: `compare_broker_betweenness`**

*   **Inputs:**
    *   `centrality_artifact` (CentralityArtifact): Centrality scores.
    *   `subgroup_artifact` (SubgroupStatsArtifact): Broker info.
    *   `solar_system_artifact` (SolarSystemArtifact): Rankings.
*   **Processes:**
    *   **Extraction:** Retrieves Betweenness scores for specific brokers (J-PAL, Hewlett).
    *   **Baseline Computation:** Calculates the median Betweenness of the Top 25 actors.
    *   **Comparison:** Computes the ratio of broker scores to the baseline median.
*   **Outputs:**
    *   `BrokerComparisonArtifact`: Comparison table.
*   **Research Role:**
    *   This callable produces the data for **Extended Data Fig. 3**. It demonstrates that knowledge brokers are "positive outliers" in terms of structural brokerage, validating their role as "connectors rather than just funders."

### **Task 23: `characterize_hewlett_network`**

*   **Inputs:**
    *   `df_transactions` (DataFrame): Cleansed transactions.
    *   `subgroup_artifact` (SubgroupStatsArtifact): Broker info.
    *   `canonical_mapping_artifact` (CanonicalMappingArtifact): Mapping table.
    *   `solar_system_artifact` (SolarSystemArtifact): Rankings.
*   **Processes:**
    *   **Ego Extraction:** Filters transactions where Hewlett is the provider.
    *   **Metric Computation:** Counts unique partners, total transactions, and overlap with the Top 100.
*   **Outputs:**
    *   `HewlettNetworkArtifact`: Ego network edges and metrics.
*   **Research Role:**
    *   This callable generates the data for **Extended Data Fig. 2**. It visualizes the "highly diverse portfolio of downstream partners" supported by the Hewlett Foundation, illustrating how a central broker "diffuse[s] evidence across the humanitarian-development divide."

### **Task 24: `construct_web_graph`**

*   **Inputs:**
    *   `df_web_links_raw` (DataFrame): Raw web links.
    *   `df_organisations` (DataFrame): Master orgs.
*   **Processes:**
    *   **Graph Construction:** Builds the directed adjacency matrix of the web graph from source/target domains.
    *   **Coverage Validation:** Computes the overlap between organisation websites and web graph nodes.
*   **Outputs:**
    *   `WebGraphArtifact`: Adjacency matrix and coverage stats.
*   **Research Role:**
    *   This callable constructs the external validation network. It allows the study to compare the "offline" aid network with the "online" web network, testing the hypothesis that structural influence in aid is reflected in digital authority.

### **Task 25: `compute_website_pagerank`**

*   **Inputs:**
    *   `web_graph_artifact` (WebGraphArtifact): The web graph.
    *   `config` (Dict): Parameters.
*   **Processes:**
    *   **Matrix Preparation:** Constructs the column-stochastic transition matrix $M$.
    *   **Power Iteration:** Iteratively computes the PageRank vector $x$ using the update rule $x^{(k+1)} = \alpha M x^{(k)} + \frac{1-\alpha}{N}\mathbf{1}$, where $\alpha$ is the damping factor and $N$ is the number of web-graph nodes.
*   **Outputs:**
    *   `PageRankArtifact`: PageRank scores.
*   **Research Role:**
    *   This callable computes the "online visibility metrics" used for validation. PageRank serves as an independent measure of an organisation's prominence, used to validate the Hub Scores derived from transaction data.
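
The power iteration can be sketched in NumPy as follows. Dangling pages (no out-links) are not covered by the update rule above; this sketch assumes their mass is spread uniformly over all $N$ nodes, a common convention, and treats `adj[i, j] > 0` as a link from page $i$ to page $j$.

```python
import numpy as np


def pagerank(adj: np.ndarray, alpha: float = 0.85, tol: float = 1e-10, max_iter: int = 1000) -> np.ndarray:
    """PageRank by power iteration on a dense (weighted) link matrix."""
    n = adj.shape[0]
    out_weight = adj.sum(axis=1)
    dangling = out_weight == 0
    # Column-stochastic M: M[j, i] = adj[i, j] / out_weight[i] for non-dangling i.
    M = np.divide(
        adj, out_weight[:, None], out=np.zeros(adj.shape), where=~dangling[:, None]
    ).T
    x = np.full(n, 1.0 / n)
    for _ in range(max_iter):
        # Dangling mass is redistributed uniformly (assumption), keeping sum(x) = 1.
        x_next = alpha * (M @ x + x[dangling].sum() / n) + (1.0 - alpha) / n
        if np.abs(x_next - x).sum() < tol:
            return x_next
        x = x_next
    return x
```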

### **Task 26: `compute_validation_correlation`**

*   **Inputs:**
    *   `centrality_artifact` (CentralityArtifact): Offline scores.
    *   `pagerank_artifact` (PageRankArtifact): Online scores.
    *   `df_organisations` (DataFrame): Master table.
*   **Processes:**
    *   **Alignment:** Joins Hub Scores and PageRank scores via organisation domains.
    *   **Correlation:** Computes the Pearson correlation coefficient $r$ between the two vectors.
*   **Outputs:**
    *   `CorrelationArtifact`: Correlation results ($r$, $p$, $n$).
*   **Research Role:**
    *   This callable produces the validation metric reported in the **Methods** section ($r=0.48$). It supports the claim that "influence in the aid ecosystem flows through structural connectivity" by showing a moderate positive correlation between financial centrality and web authority.

### **Task 27: `run_global_aid_pipeline`**

*   **Inputs:**
    *   Raw DataFrames and Config.
*   **Processes:**
    *   **Orchestration:** Sequentially executes Tasks 1 through 26, managing data flow and artifact persistence.
    *   **Error Handling:** Catches and logs exceptions.
*   **Outputs:**
    *   `PipelineResults`: A container of all generated artifacts.
*   **Research Role:**
    *   This is the execution engine for the baseline analysis. It ensures that the entire research pipeline is run in the correct order and that all dependencies are satisfied.

### **Task 28: `conduct_robustness_analysis`**

*   **Inputs:**
    *   Baseline results, config, pipeline function, raw data.
*   **Processes:**
    *   **Grid Generation:** Creates a set of perturbed configurations (varying $\tau, p, q$).
    *   **Batch Execution:** Runs the pipeline for each configuration.
    *   **Synthesis:** Computes rank correlations (Spearman) and validation stability (Pearson) across runs.
*   **Outputs:**
    *   `RobustnessArtifact`: Sensitivity analysis results.
*   **Research Role:**
    *   This callable implements the sensitivity analysis required to demonstrate the robustness of the findings. It ensures that the identified "Solar System" structure is not an artifact of specific parameter choices.

### **Task 29: `package_reproducible_outputs`**

*   **Inputs:**
    *   Pipeline results, config, output root.
*   **Processes:**
    *   **Manifest Generation:** Computes SHA-256 hashes for all persisted files.
    *   **Metadata Consolidation:** Aggregates metrics and parameters into a master JSON.
    *   **Table Generation:** Exports specific CSVs for manuscript figures.
*   **Outputs:**
    *   `ProvenanceArtifact`: The final reproducibility package.
*   **Research Role:**
    *   This callable ensures the study adheres to open science principles. It packages the results in a way that allows for independent verification and auditing of the "10 million transaction records" analysis.
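
The manifest step amounts to hashing every persisted file. A minimal sketch follows, reading files in chunks so large artifacts need not fit in memory. The helper names and the relative-POSIX-path keys are assumptions, not the pipeline's actual manifest format.

```python
import hashlib
from pathlib import Path
from typing import Dict


def sha256_file(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hex SHA-256 digest of a file, streamed in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def build_manifest(output_root: Path) -> Dict[str, str]:
    """Map each file under output_root (relative POSIX path) to its SHA-256."""
    return {
        path.relative_to(output_root).as_posix(): sha256_file(path)
        for path in sorted(output_root.rglob("*"))
        if path.is_file() and path.name != "manifest.json"
    }
```

Sorting the paths makes the manifest itself deterministic, so two runs over identical outputs produce byte-identical manifests.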

### **Top-Level: `execute_master_workflow`**

*   **Inputs:**
    *   Raw DataFrames and Config.
*   **Processes:**
    *   **Coordination:** Invokes the baseline pipeline, robustness analysis, and packaging steps in order.
*   **Outputs:**
    *   `MasterWorkflowResults`: The complete set of study outputs.
*   **Research Role:**
    *   This is the single entry point for the entire research project. It encapsulates the full scientific workflow, from raw data to final validated results and provenance tracking.

<br><br>

## **Usage Example**

Below is a script that uses synthetically generated data to demonstrate how to run the "Who Connects Global Aid?" research pipeline end to end:

```python
#!/usr/bin/env python3
# ==============================================================================
# Global Aid Network Analysis: End-to-End Pipeline Usage Example
# ==============================================================================
#
# This script demonstrates, with high fidelity, the usage of the
# "Who Connects Global Aid?" research pipeline. It synthetically generates
# realistic test data conforming to the IATI schema, loads the study configuration
# from a YAML file, and executes the master orchestrator to produce all
# topological artifacts and validation metrics.
#
# Key Steps:
# 1. Synthetic Data Generation (Faker): Creates realistic DataFrames for
#    Transactions, Activities, Organisations, and Web Links.
# 2. Configuration Loading: Reads 'config.yaml' into a Python dictionary.
# 3. Pipeline Execution: Invokes `execute_master_workflow` to run the full
#    analysis lifecycle (Cleansing -> Network Construction -> Topology -> Validation).
# 4. Result Inspection: Demonstrates how to access and verify the generated artifacts.
#
# ==============================================================================

import logging
import pandas as pd
import numpy as np
import yaml
from faker import Faker
from typing import Any, Dict, List
import random
from datetime import datetime, timedelta

# Import the pipeline orchestrator and necessary classes.
# (In a real package, these would be imported from the module; uncomment the
# line below so that `execute_master_workflow` is in scope before main() runs.)
# from global_aid_pipeline import execute_master_workflow, MasterWorkflowResults

# Configure logging for the example execution
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("GlobalAidExample")

# Initialize Faker for synthetic data generation
fake = Faker()
Faker.seed(42)
np.random.seed(42)
random.seed(42)

# ==============================================================================
# Step 1: Synthetic Data Generation
# ==============================================================================
# We generate data that mimics the structure and complexity of the IATI registry.

def generate_synthetic_data(
    n_orgs: int = 50,
    n_activities: int = 200,
    n_transactions: int = 1000,
    n_web_links: int = 500
) -> Dict[str, pd.DataFrame]:
    """
    Generates synthetic DataFrames for the Global Aid Network analysis.
    
    Parameters
    ----------
    n_orgs : int
        Number of unique organisations to generate.
    n_activities : int
        Number of unique aid activities (projects).
    n_transactions : int
        Number of financial transactions.
    n_web_links : int
        Number of web hyperlinks for validation.
        
    Returns
    -------
    Dict[str, pd.DataFrame]
        Dictionary containing the four required raw DataFrames.
    """
    logger.info("Generating synthetic IATI data...")

    # -------------------------------------------------------------------------
    # 1. Organisations (df_organisations_raw)
    # -------------------------------------------------------------------------
    org_types = [
        "Government", "Multilateral", "Foundation", "Academic/Research",
        "International NGO", "National NGO", "Private Sector", "Other"
    ]
    
    orgs = []
    for i in range(n_orgs):
        name = fake.company()
        # Create realistic variations for aliases
        aliases = [name, name.upper(), name.split()[0] + " Org"]
        
        orgs.append({
            "org_ref": f"ORG-{i:04d}",
            "org_name": name,
            "org_name_aliases": aliases, # List of strings
            "org_type": np.random.choice(org_types, p=[0.1, 0.1, 0.1, 0.2, 0.2, 0.1, 0.1, 0.1]),
            "website_domain": fake.url(),
            "canonical_org_id": None # To be resolved by pipeline
        })
    
    df_organisations_raw = pd.DataFrame(orgs)
    
    # Ensure we have specific brokers for the case study
    # Hewlett Foundation
    df_organisations_raw.loc[0, "org_name"] = "William and Flora Hewlett Foundation"
    df_organisations_raw.loc[0, "org_type"] = "Foundation"
    df_organisations_raw.loc[0, "org_ref"] = "US-EIN-94-1655673"
    
    # J-PAL (Academic)
    df_organisations_raw.loc[1, "org_name"] = "Abdul Latif Jameel Poverty Action Lab"
    df_organisations_raw.loc[1, "org_type"] = "Academic/Research"
    
    # -------------------------------------------------------------------------
    # 2. Activities (df_activities_raw)
    # -------------------------------------------------------------------------
    activities = []
    instruments = ["Grant", "Loan", "Equity", None] # Include nulls
    
    for i in range(n_activities):
        # Multi-valued countries and sectors (semicolon delimited or list)
        # We'll use lists as per our pipeline's capability to handle them
        n_countries = np.random.randint(1, 4)
        countries = [fake.country_code() for _ in range(n_countries)]
        
        n_sectors = np.random.randint(1, 3)
        sectors = [str(np.random.randint(100, 999)) for _ in range(n_sectors)]
        
        activities.append({
            "iati_identifier": f"ACT-{i:05d}",
            "activity_start_date_iso_date": fake.date_between(start_date='-10y', end_date='today').isoformat(),
            "recipient_country_code": countries,
            "sector_code": sectors,
            "instrument_type": np.random.choice(instruments, p=[0.6, 0.3, 0.05, 0.05]),
            "activity_title": fake.sentence(),
            "publisher_org_ref": np.random.choice(df_organisations_raw["org_ref"])
        })
        
    df_activities_raw = pd.DataFrame(activities)

    # -------------------------------------------------------------------------
    # 3. Transactions (df_transactions_raw)
    # -------------------------------------------------------------------------
    transactions = []
    for i in range(n_transactions):
        # Pick random activity
        act = np.random.choice(df_activities_raw["iati_identifier"])
        
        # Pick provider and receiver from orgs
        prov = df_organisations_raw.sample(1).iloc[0]
        recv = df_organisations_raw.sample(1).iloc[0]
        
        # Ensure provider != receiver
        while prov["org_ref"] == recv["org_ref"]:
            recv = df_organisations_raw.sample(1).iloc[0]
            
        date = fake.date_between(start_date='-10y', end_date='today')
        
        transactions.append({
            "transaction_id": f"TX-{i:08d}",
            "iati_identifier": act,
            "reporting_org_ref": prov["org_ref"], # Usually provider reports
            "reporting_org_name": prov["org_name"],
            "transaction_transaction_date_iso_date": date.isoformat(),
            "transaction_transaction_type_code": np.random.choice(["C", "D", "E"]), # Commitment, Disbursement, Expenditure
            "transaction_value": round(np.random.uniform(1000, 1000000), 2),
            "transaction_value_currency": "USD",
            "transaction_provider_org_ref": prov["org_ref"],
            "transaction_provider_org_name": prov["org_name"],
            "transaction_receiver_org_ref": recv["org_ref"],
            "transaction_receiver_org_name": recv["org_name"]
        })
        
    df_transactions_raw = pd.DataFrame(transactions)

    # -------------------------------------------------------------------------
    # 4. Web Links (df_web_links_raw)
    # -------------------------------------------------------------------------
    web_links = []
    # Extract domains from orgs
    domains = [d for d in df_organisations_raw["website_domain"] if d]
    
    for _ in range(n_web_links):
        if not domains: break
        src = np.random.choice(domains)
        tgt = np.random.choice(domains)
        if src != tgt:
            web_links.append({
                "source_domain": src,
                "target_domain": tgt,
                "crawl_date": datetime.now().isoformat(),
                "link_count": 1
            })
            
    df_web_links_raw = pd.DataFrame(web_links)
    
    # -------------------------------------------------------------------------
    # 5. Exchange Rates (Optional - Empty for baseline)
    # -------------------------------------------------------------------------
    df_exchange_rates_raw = pd.DataFrame(columns=["currency_code", "date", "exchange_rate_to_usd"])

    logger.info("Synthetic data generation complete.")
    return {
        "df_transactions_raw": df_transactions_raw,
        "df_activities_raw": df_activities_raw,
        "df_organisations_raw": df_organisations_raw,
        "df_web_links_raw": df_web_links_raw,
        "df_exchange_rates_raw": df_exchange_rates_raw
    }

# ==============================================================================
# Step 2: Configuration Loading
# ==============================================================================

def load_study_config(config_path: str = "config.yaml") -> Dict[str, Any]:
    """
    Reads the YAML configuration file into a Python dictionary.
    """
    logger.info(f"Loading configuration from {config_path}...")
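    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f)
    except FileNotFoundError:
        logger.error(f"Config file not found at {config_path}. Please ensure it exists.")
        raise

    # yaml.safe_load returns None for an empty file, so check for a mapping explicitly
    if not isinstance(config, dict):
        raise ValueError(f"Config file {config_path} must contain a YAML mapping at the top level.")

    logger.info("Configuration loaded successfully.")
    return config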

# ==============================================================================
# Step 3: Pipeline Execution Example
# ==============================================================================

def main():
    # 1. Generate Data
    data_inputs = generate_synthetic_data()
    
    # 2. Load Config
    # Note: Ensure 'config.yaml' exists in the working directory with the content provided previously.
    # For this example, we assume it is present.
    try:
        config = load_study_config("config.yaml")
    except Exception as e:
        logger.critical(f"Failed to load config: {e}. Exiting.")
        return

    # 3. Execute Master Workflow
    # This invokes the top-level orchestrator which manages the entire lifecycle:
    # Baseline -> Robustness -> Provenance
    logger.info("Invoking Master Orchestrator...")
    
    results = execute_master_workflow(
        df_transactions_raw=data_inputs["df_transactions_raw"],
        df_activities_raw=data_inputs["df_activities_raw"],
        df_organisations_raw=data_inputs["df_organisations_raw"],
        df_web_links_raw=data_inputs["df_web_links_raw"],
        config=config,
        output_root="./global_aid_study_output"
    )

    # 4. Inspect Results
    if results.success:
        logger.info(f"Workflow completed in {results.execution_time:.2f} seconds.")
        
        # Access Baseline Artifacts
        baseline = results.baseline_results
        
        # Example: Inspect Solar System Rankings
        if baseline.artifacts.get("solar_system"):
            top_nodes = baseline.artifacts["solar_system"].top_100_table
            print("\n--- Top 5 Central Actors (Solar System) ---")
            print(top_nodes[['rank', 'hub_score_hits', 'node_size', 'ring']].head(5))
            
        # Example: Inspect Broker Comparison
        if baseline.artifacts.get("broker_comparison"):
            brokers = baseline.artifacts["broker_comparison"].comparison_table
            print("\n--- Broker Betweenness Comparison ---")
            print(brokers[['org_name', 'betweenness', 'ratio_to_median']])
            
        # Example: Inspect Validation Correlation
        if baseline.artifacts.get("correlation"):
            corr = baseline.artifacts["correlation"]
            print("\n--- External Validation ---")
            print(f"Pearson r: {corr.pearson_r:.4f} (p={corr.p_value:.4e})")
            
        # Access Provenance
        prov = results.provenance_artifact
        print(f"\nProvenance Manifest: {len(prov.artifact_manifest)} files persisted.")
        
    else:
        logger.error("Workflow failed.")
        if results.baseline_results and results.baseline_results.error_message:
            logger.error(f"Baseline Error: {results.baseline_results.error_message}")

if __name__ == "__main__":
    main()
```
<br>
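The hyperlink loop in the synthetic generator above draws source and target domains independently and discards self-links. The sketch below shows the same sampling in vectorized form. It is an illustration under stated assumptions and is not part of the pipeline. `sample_web_links` is a hypothetical helper, and it replaces the global `np.random` state with a seeded `numpy.random.Generator` so that repeated runs draw identical edges.

```python
from datetime import datetime, timezone

import numpy as np
import pandas as pd

COLUMNS = ["source_domain", "target_domain", "crawl_date", "link_count"]


def sample_web_links(domains: pd.Series, n_links: int, seed: int = 42) -> pd.DataFrame:
    """Hypothetical helper: draw random directed domain pairs, dropping self-links."""
    # Drop nulls first (NaN is truthy, so `if d` alone would keep it), then empty strings
    pool = domains.dropna().astype(str)
    pool = pool[pool.str.len() > 0].to_numpy()
    if len(pool) == 0 or n_links <= 0:
        return pd.DataFrame(columns=COLUMNS)

    # Seeded generator instead of the global np.random state
    rng = np.random.default_rng(seed)
    src = rng.choice(pool, size=n_links)
    tgt = rng.choice(pool, size=n_links)
    keep = src != tgt  # discard self-links, as the original loop does

    # A single timestamp for the whole synthetic crawl
    crawl_date = datetime.now(timezone.utc).isoformat()
    return pd.DataFrame(
        {
            "source_domain": src[keep],
            "target_domain": tgt[keep],
            "crawl_date": crawl_date,
            "link_count": 1,
        },
        columns=COLUMNS,
    )


links = sample_web_links(pd.Series(["a.org", None, "", "b.org"]), n_links=20, seed=0)
print(links.head())
```

Every draw comes from the seeded generator, so two calls with the same `seed` return the same edge list. Only `crawl_date`, which records wall-clock time, differs between runs.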

In [None]:

# ==============================================================================
# Task 1: Validate configuration fidelity to manuscript methods
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 1: Assert alignment with paper-stated scope
# -------------------------------------------------------------------------------------------------------------------------------

def validate_scope_and_exclusions(config: Dict[str, Any]) -> None:
    """
    Validates that the study configuration strictly adheres to the manuscript's
    stated temporal scope and methodological exclusions (e.g., no LLMs).

    Parameters
    ----------
    config : Dict[str, Any]
        The master study configuration dictionary.

    Raises
    ------
    ValueError
        If the start/end years do not match 1967-2025, if LLM usage is enabled,
        or if forbidden methods (LPPLS, etc.) are referenced.
    """
    # Extract the configured temporal scope (the manuscript covers 1967-2025)
    start_year = config.get("SCOPE", {}).get("start_year")
    end_year = config.get("SCOPE", {}).get("end_year")

    # Validate temporal scope
    if start_year != 1967 or end_year != 2025:
        # Raise error if scope deviates from manuscript
        raise ValueError(
            f"Configured scope ({start_year}-{end_year}) deviates from manuscript "
            "requirement (1967-2025)."
        )

    # Log confirmation of scope
    logger.info(f"Temporal scope validated: {start_year}-{end_year}")

    # Validate LLM exclusion
    # The manuscript does not use LLMs: any truthy 'enabled' flag is rejected,
    # and a missing flag is treated as disabled.
    llm_enabled = config.get("LLM_USAGE", {}).get("enabled")
    if llm_enabled:
        # Raise error if LLMs are enabled
        raise ValueError("LLM usage is enabled but manuscript methods do not utilize LLMs.")

    # Log confirmation of LLM exclusion
    logger.info("LLM usage disabled (or not configured), consistent with manuscript.")

    # Scan for forbidden keys recursively
    # These methods (LPPLS, Lomb-Scargle, etc.) are not in the paper
    forbidden_terms = ["LPPLS", "Lomb", "Scargle", "surrogate", "inception"]

    def recursive_scan(d: Dict[str, Any], path: str = ""):
        """Recursively scan dictionary keys for forbidden terms."""
        for k, v in d.items():
            # Check current key against forbidden terms
            for term in forbidden_terms:
                if term.lower() in k.lower():
                    raise ValueError(f"Configuration contains forbidden non-manuscript term '{term}' at '{path}{k}'")
            # Recurse if value is a dictionary
            if isinstance(v, dict):
                recursive_scan(v, path + k + ".")

    # Execute recursive scan on the entire config
    recursive_scan(config)
    # Log confirmation of method purity
    logger.info("No forbidden non-manuscript methods (LPPLS, etc.) detected in configuration.")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 2: Assert required dataset presence
# -------------------------------------------------------------------------------------------------------------------------------

def validate_dataset_presence(
    config: Dict[str, Any],
    input_data: Dict[str, pd.DataFrame]
) -> None:
    """
    Validates that all required datasets declared in the configuration are present
    in the input data dictionary.

    Parameters
    ----------
    config : Dict[str, Any]
        The master study configuration dictionary.
    input_data : Dict[str, pd.DataFrame]
        Dictionary mapping dataset names to loaded pandas DataFrames.

    Raises
    ------
    ValueError
        If any dataset marked as 'required' in the config is missing from input_data.
    """
    # Extract input dataset requirements from config
    required_datasets_config = config.get("INPUT_DATASETS", {})

    # Identify missing datasets
    missing_datasets = []

    # Iterate over configured datasets
    for dataset_name, requirements in required_datasets_config.items():
        # Check if the dataset is marked as required
        if requirements.get("required", False):
            # Check if the dataset exists in the input dictionary
            if dataset_name not in input_data:
                missing_datasets.append(dataset_name)
            # Check that the value is actually a DataFrame (e.g., not None)
            elif not isinstance(input_data[dataset_name], pd.DataFrame):
                missing_datasets.append(dataset_name)

    # Raise error if any required datasets are missing
    if missing_datasets:
        raise ValueError(
            f"The following required datasets are missing from input: {missing_datasets}. "
            "Note that df_web_links_raw is required to reproduce the external validation."
        )

    # Log confirmation of dataset presence
    logger.info("All required datasets are present.")

    # Check for optional FX data
    if "df_exchange_rates_raw" not in input_data:
        logger.info("Optional 'df_exchange_rates_raw' not provided; proceeding with baseline count-based analysis.")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 3: Assert column-level schema validity
# -------------------------------------------------------------------------------------------------------------------------------

def validate_dataframe_schemas(input_data: Dict[str, pd.DataFrame]) -> None:
    """
    Validates that the input DataFrames contain the specific columns required
    for the manuscript's analysis pipeline.

    Parameters
    ----------
    input_data : Dict[str, pd.DataFrame]
        Dictionary mapping dataset names to loaded pandas DataFrames.

    Raises
    ------
    ValueError
        If any required columns are missing from the DataFrames.
    """
    # Define required schemas based on manuscript needs
    # These columns are strictly required for the pipeline to function
    required_schemas = {
        "df_transactions_raw": {
            "iati_identifier",
            "reporting_org_ref",
            "reporting_org_name",
            "transaction_transaction_date_iso_date",
            "transaction_transaction_type_code",
            "transaction_value",
            "transaction_value_currency",
            "transaction_provider_org_ref",
            "transaction_provider_org_name",
            "transaction_receiver_org_ref",
            "transaction_receiver_org_name",
            "transaction_id"
        },
        "df_activities_raw": {
            "iati_identifier",
            "recipient_country_code",
            "sector_code",
            "instrument_type",
            "publisher_org_ref"
        },
        "df_organisations_raw": {
            "org_ref",
            "org_name",
            "org_type",
            "website_domain"
        },
        "df_web_links_raw": {
            "source_domain",
            "target_domain"
        }
    }

    # List to collect all schema errors
    schema_errors = []

    # Iterate over each dataset to validate
    for dataset_name, required_columns in required_schemas.items():
        # Skip validation if optional dataset is missing (handled in Step 2)
        if dataset_name not in input_data:
            continue

        df = input_data[dataset_name]
        # Get actual columns from the DataFrame
        actual_columns = set(df.columns)
        # Calculate missing columns
        missing_columns = required_columns - actual_columns

        # If columns are missing, record the error
        if missing_columns:
            schema_errors.append(
                f"{dataset_name} is missing columns: {sorted(list(missing_columns))}"
            )

    # Raise error if any schema violations were found
    if schema_errors:
        raise ValueError("Schema validation failed:\n" + "\n".join(schema_errors))

    # Log confirmation of schema validity
    logger.info("Column-level schema validation passed for all datasets.")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_study_configuration(
    config: Dict[str, Any],
    input_data: Dict[str, pd.DataFrame]
) -> bool:
    """
    Orchestrator function for Task 1. Validates the study configuration and input data
    against the strict requirements of the manuscript's methodology.

    Parameters
    ----------
    config : Dict[str, Any]
        The master study configuration dictionary.
    input_data : Dict[str, pd.DataFrame]
        Dictionary mapping dataset names to loaded pandas DataFrames.

    Returns
    -------
    bool
        True if all validations pass. Raises ValueError otherwise.

    Raises
    ------
    ValueError
        If any validation step fails (scope, dataset presence, or schema).
    """
    logger.info("Starting Task 1: Configuration and Data Validation...")

    # Step 1: Validate scope and exclusions
    # Ensures alignment with 1967-2025 scope and no LLM usage
    validate_scope_and_exclusions(config)

    # Step 2: Validate dataset presence
    # Ensures all required DataFrames (including web links) are loaded
    validate_dataset_presence(config, input_data)

    # Step 3: Validate schemas
    # Ensures all required columns exist in the DataFrames
    validate_dataframe_schemas(input_data)

    logger.info("Task 1 Completed Successfully: Configuration and Data are valid.")
    return True
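The forbidden-term check in Step 1 raises on the first offending key and recurses only through nested dictionaries. As a result, a term inside a list of mappings passes unnoticed, and a non-string YAML key (such as an integer) triggers an `AttributeError` on `.lower()`. Below is a minimal sketch of a variant that collects every match, descends into lists, and converts keys to strings. `find_forbidden_keys` and the sample configuration are hypothetical and do not reflect the project's `config.yaml`.

```python
from typing import Any, Iterator, Sequence, Tuple

FORBIDDEN_TERMS = ["LPPLS", "Lomb", "Scargle", "surrogate", "inception"]


def find_forbidden_keys(
    node: Any, terms: Sequence[str], path: str = ""
) -> Iterator[Tuple[str, str]]:
    """Hypothetical helper: yield (dotted_path, term) for every key that contains a term."""
    if isinstance(node, dict):
        for key, value in node.items():
            key_str = str(key)  # YAML permits non-string keys such as integers
            for term in terms:
                if term.lower() in key_str.lower():
                    yield f"{path}{key_str}", term
            yield from find_forbidden_keys(value, terms, f"{path}{key_str}.")
    elif isinstance(node, list):
        # Descend into lists (e.g. lists of scenario mappings), recording the index
        for i, item in enumerate(node):
            yield from find_forbidden_keys(item, terms, f"{path.rstrip('.')}[{i}].")


sample_config = {
    "SCOPE": {"start_year": 1967, "end_year": 2025},
    "ROBUSTNESS": [{"name": "baseline"}, {"surrogate_test": True}],
    2025: {"note": "integer key"},
}
hits = list(find_forbidden_keys(sample_config, FORBIDDEN_TERMS))
print(hits)  # → [('ROBUSTNESS[1].surrogate_test', 'surrogate')]
```

Because the generator collects all hits, a caller can raise one `ValueError` that lists every offending path rather than fixing them one at a time.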


In [None]:

# ==============================================================================
# Task 2: Validate data quality and integrity constraints
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 1: Validate key uniqueness and join compatibility
# -------------------------------------------------------------------------------------------------------------------------------

def validate_key_integrity(
    df_transactions: pd.DataFrame,
    df_activities: pd.DataFrame
) -> Dict[str, float]:
    """
    Validates the uniqueness of primary keys in transactions and activities,
    and computes the join coverage between them.

    Parameters
    ----------
    df_transactions : pd.DataFrame
        Raw transactions dataframe containing 'transaction_id' and 'iati_identifier'.
    df_activities : pd.DataFrame
        Raw activities dataframe containing 'iati_identifier'.

    Returns
    -------
    Dict[str, float]
        A dictionary containing:
        - 'transaction_id_duplicate_rate': Fraction of duplicate transaction IDs.
        - 'activity_id_duplicate_rate': Fraction of duplicate activity IDs.
        - 'tx_to_activity_coverage': Fraction of transactions with a valid activity ID.
        - 'activity_to_tx_coverage': Fraction of activities associated with at least one transaction.
    """
    # Validate transaction_id uniqueness
    # Count total rows
    n_tx = len(df_transactions)
    if n_tx == 0:
        tx_dup_rate = 0.0
    else:
        # Count duplicates in transaction_id
        n_tx_dups = df_transactions.duplicated(subset=['transaction_id']).sum()
        tx_dup_rate = n_tx_dups / n_tx

    # Validate activity_id uniqueness
    # Count total rows
    n_act = len(df_activities)
    if n_act == 0:
        act_dup_rate = 0.0
    else:
        # Count duplicates in iati_identifier
        n_act_dups = df_activities.duplicated(subset=['iati_identifier']).sum()
        act_dup_rate = n_act_dups / n_act

    # Compute Join Coverage (Transactions -> Activities)
    # Get set of valid activity IDs
    valid_activity_ids = set(df_activities['iati_identifier'].dropna().unique())

    # Check which transactions have a valid activity ID
    tx_has_activity = df_transactions['iati_identifier'].isin(valid_activity_ids)
    tx_coverage = tx_has_activity.mean() if n_tx > 0 else 0.0

    # Compute Join Coverage (Activities -> Transactions)
    # Get set of activity IDs present in transactions
    active_activity_ids = set(df_transactions['iati_identifier'].dropna().unique())

    # Check which activities are referenced
    act_is_referenced = df_activities['iati_identifier'].isin(active_activity_ids)
    act_coverage = act_is_referenced.mean() if n_act > 0 else 0.0

    logger.info(f"Key Integrity: Tx Dups={tx_dup_rate:.2%}, Act Dups={act_dup_rate:.2%}, "
                f"Tx->Act Coverage={tx_coverage:.2%}, Act->Tx Coverage={act_coverage:.2%}")

    return {
        "transaction_id_duplicate_rate": float(tx_dup_rate),
        "activity_id_duplicate_rate": float(act_dup_rate),
        "tx_to_activity_coverage": float(tx_coverage),
        "activity_to_tx_coverage": float(act_coverage)
    }


# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 2: Validate type coercion feasibility
# -------------------------------------------------------------------------------------------------------------------------------

def validate_type_coercion(
    df_transactions: pd.DataFrame,
    df_organisations: pd.DataFrame
) -> Dict[str, int]:
    """
    Simulates type coercion for critical columns to identify potential data quality issues
    without modifying the data.

    Parameters
    ----------
    df_transactions : pd.DataFrame
        Raw transactions dataframe.
    df_organisations : pd.DataFrame
        Raw organisations dataframe.

    Returns
    -------
    Dict[str, int]
        A dictionary containing counts of rows that would fail coercion:
        - 'bad_numeric_values': Count of non-numeric transaction values.
        - 'bad_dates': Count of unparseable transaction dates.
        - 'bad_org_types': Count of organisation types outside the controlled vocabulary.
    """
    # 1. Numeric Coercion Audit (transaction_value)
    # Attempt to convert to numeric, turning errors into NaNs
    numeric_series = pd.to_numeric(df_transactions['transaction_value'], errors='coerce')
    # Count values that were present but failed conversion. Pre-existing nulls are
    # treated as "missing" rather than "malformed" and are excluded from the count.
    non_null_mask = df_transactions['transaction_value'].notna()
    failed_numeric = non_null_mask & numeric_series.isna()
    n_bad_numeric = failed_numeric.sum()

    # 2. Date Parsing Audit (transaction_transaction_date_iso_date)
    # Attempt to convert to datetime, turning errors into NaT
    date_series = pd.to_datetime(df_transactions['transaction_transaction_date_iso_date'], errors='coerce')
    non_null_date_mask = df_transactions['transaction_transaction_date_iso_date'].notna()
    failed_dates = non_null_date_mask & date_series.isna()
    n_bad_dates = failed_dates.sum()

    # 3. Org Type Taxonomy Enforcement
    # Define controlled vocabulary per manuscript/task requirements
    allowed_org_types = {
        "Government", "Multilateral", "Foundation", "Academic/Research",
        "International NGO", "National NGO", "Private Sector", "Other"
    }
    # Check validity
    # Normalize to string to handle potential mixed types (input should already be string)
    org_types = df_organisations['org_type'].astype(str)

    # Identify types not in the allowed set. Missing values are deliberately
    # counted as invalid (they never match the vocabulary after astype(str)),
    # so ambiguous or missing types are flagged for quarantine.
    is_valid_type = org_types.isin(allowed_org_types)
    n_bad_org_types = (~is_valid_type).sum()

    logger.info(f"Type Coercion: Bad Numeric={n_bad_numeric}, Bad Dates={n_bad_dates}, "
                f"Bad Org Types={n_bad_org_types}")

    return {
        "bad_numeric_values": int(n_bad_numeric),
        "bad_dates": int(n_bad_dates),
        "bad_org_types": int(n_bad_org_types)
    }


# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 3: Validate organisation reference coverage
# -------------------------------------------------------------------------------------------------------------------------------

def validate_reference_coverage(
    df_transactions: pd.DataFrame,
    df_organisations: pd.DataFrame
) -> Dict[str, float]:
    """
    Computes the fraction of transaction provider and receiver references that
    exist in the master organisation table.

    Parameters
    ----------
    df_transactions : pd.DataFrame
        Raw transactions dataframe.
    df_organisations : pd.DataFrame
        Raw organisations dataframe.

    Returns
    -------
    Dict[str, float]
        A dictionary containing:
        - 'provider_ref_match_rate': Fraction of non-null provider refs found in master.
        - 'receiver_ref_match_rate': Fraction of non-null receiver refs found in master.
    """
    # Extract master set of organisation references
    # Drop nulls to ensure clean set
    master_refs = set(df_organisations['org_ref'].dropna().unique())

    # 1. Provider Reference Coverage
    # Get non-null provider refs
    provider_refs = df_transactions['transaction_provider_org_ref'].dropna()
    if len(provider_refs) == 0:
        prov_coverage = 0.0
    else:
        # Check membership
        prov_matches = provider_refs.isin(master_refs)
        prov_coverage = prov_matches.mean()

    # 2. Receiver Reference Coverage
    # Get non-null receiver refs
    receiver_refs = df_transactions['transaction_receiver_org_ref'].dropna()
    if len(receiver_refs) == 0:
        recv_coverage = 0.0
    else:
        # Check membership
        recv_matches = receiver_refs.isin(master_refs)
        recv_coverage = recv_matches.mean()

    logger.info(f"Ref Coverage: Provider={prov_coverage:.2%}, Receiver={recv_coverage:.2%}")

    return {
        "provider_ref_match_rate": float(prov_coverage),
        "receiver_ref_match_rate": float(recv_coverage)
    }


# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_data_quality(
    df_transactions: pd.DataFrame,
    df_activities: pd.DataFrame,
    df_organisations: pd.DataFrame
) -> Dict[str, Any]:
    """
    Orchestrator for Task 2. Executes a suite of data quality checks to ensure
    integrity, type safety, and referential consistency before processing.

    Parameters
    ----------
    df_transactions : pd.DataFrame
        Raw transactions dataframe.
    df_activities : pd.DataFrame
        Raw activities dataframe.
    df_organisations : pd.DataFrame
        Raw organisations dataframe.

    Returns
    -------
    Dict[str, Any]
        A consolidated dictionary of all quality metrics computed.
    """
    logger.info("Starting Task 2: Data Quality Validation...")

    # Step 1: Key Integrity
    integrity_metrics = validate_key_integrity(df_transactions, df_activities)

    # Step 2: Type Coercion
    coercion_metrics = validate_type_coercion(df_transactions, df_organisations)

    # Step 3: Reference Coverage
    coverage_metrics = validate_reference_coverage(df_transactions, df_organisations)

    # Consolidate results
    quality_report = {
        **integrity_metrics,
        **coercion_metrics,
        **coverage_metrics
    }

    logger.info("Task 2 Completed Successfully.")
    return quality_report
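Both coverage metrics in Task 2, Step 1 are `Series.isin` membership tests run in opposite directions. They answer different questions and can diverge. Below is a toy sketch that reproduces the same arithmetic outside the pipeline, using invented identifiers rather than IATI data.

```python
import pandas as pd

# Invented identifiers, for illustration only
tx = pd.DataFrame({
    "transaction_id": ["t1", "t2", "t2", "t3"],
    "iati_identifier": ["A1", "A1", "A2", "X9"],
})
acts = pd.DataFrame({"iati_identifier": ["A1", "A2", "A3", "A4"]})

# Duplicate rate: share of rows whose transaction_id already appeared above
tx_dup_rate = float(tx.duplicated(subset=["transaction_id"]).mean())

# Transactions -> activities: share of transactions pointing at a known activity
known_activities = set(acts["iati_identifier"].dropna())
tx_cov = float(tx["iati_identifier"].isin(known_activities).mean())

# Activities -> transactions: share of activities with at least one transaction
referenced = set(tx["iati_identifier"].dropna())
act_cov = float(acts["iati_identifier"].isin(referenced).mean())

print(tx_dup_rate, tx_cov, act_cov)  # → 0.25 0.75 0.5
```

Three of the four transactions resolve to a known activity, yet half of the activities never appear in a transaction. The quality report therefore keeps both directions.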

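The coercion audit in Task 2, Step 2 separates *missing* values from *malformed* ones. A value counts as bad only if it was present before coercion and null after it. Below is a toy sketch of that rule on invented values, not drawn from the study data.

```python
import pandas as pd

ALLOWED_ORG_TYPES = {
    "Government", "Multilateral", "Foundation", "Academic/Research",
    "International NGO", "National NGO", "Private Sector", "Other",
}

# Invented raw values, including missing (None) and malformed entries
values = pd.Series(["1000", "2.5e3", None, "n/a", 42])
dates = pd.Series(["2019-03-01", None, "not a date", "2021-12-31"])
org_types = pd.Series(["Government", None, "NGO", "Foundation"])

# Malformed = present before coercion but null after it; pre-existing nulls are "missing"
bad_numeric = int((values.notna() & pd.to_numeric(values, errors="coerce").isna()).sum())
bad_dates = int((dates.notna() & pd.to_datetime(dates, errors="coerce").isna()).sum())

# Org types use a stricter rule: missing values also fail the vocabulary check
bad_org_types = int((~org_types.astype(str).isin(ALLOWED_ORG_TYPES)).sum())

print(bad_numeric, bad_dates, bad_org_types)  # → 1 1 2
```

The `None` entries in `values` and `dates` are treated as missing, not malformed. The `None` organisation type, by contrast, counts as invalid, matching the quarantine rule for ambiguous or missing types.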

In [None]:

# ==============================================================================
# Task 3: Freeze input snapshot and establish reproducibility metadata
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 1: Record dataset fingerprints
# -------------------------------------------------------------------------------------------------------------------------------

@dataclass
class DatasetFingerprint:
    """
    Encapsulates the statistical fingerprint of a dataset for reproducibility.

    This dataclass records the state of the input data at the start of the
    pipeline. It captures key metrics (row counts, null rates, temporal bounds,
    and a structural hash) that support data-integrity checks and the detection
    of upstream data drift.

    Attributes
    ----------
    row_count : int
        The total number of rows in the DataFrame. Used to verify data volume.
    column_null_rates : Dict[str, float]
        A dictionary mapping column names to their fraction of null values.
        Used to monitor data quality and completeness.
    temporal_range : Optional[Tuple[pd.Timestamp, pd.Timestamp]], default=None
        A tuple containing the minimum and maximum timestamps found in the
        dataset (if applicable, e.g., for transactions). Used to verify
        temporal coverage.
    dataframe_hash : str, default=""
        A SHA-256 hash of the sorted column names and the row count. This is a
        lightweight structural check, not a content hash: DataFrames with the same
        columns and row count but different values produce the same hash.
    """
    # The total number of rows in the dataset
    row_count: int

    # A dictionary mapping column names to the proportion of missing values (0.0 to 1.0)
    column_null_rates: Dict[str, float]

    # The minimum and maximum dates in the dataset, if a date column exists
    temporal_range: Optional[Tuple[pd.Timestamp, pd.Timestamp]] = None

    # A SHA-256 hash of the sorted column names and row count (structure only, not content)
    dataframe_hash: str = ""

def compute_dataset_fingerprints(
    input_data: Dict[str, pd.DataFrame]
) -> Dict[str, DatasetFingerprint]:
    """
    Computes statistical fingerprints (row counts, null rates, temporal ranges)
    for all input DataFrames to establish a baseline snapshot.

    Parameters
    ----------
    input_data : Dict[str, pd.DataFrame]
        Dictionary of raw input DataFrames.

    Returns
    -------
    Dict[str, DatasetFingerprint]
        A dictionary mapping dataset names to their fingerprints.
    """
    fingerprints = {}

    for name, df in input_data.items():
        # 1. Row Count
        row_count = len(df)

        # 2. Null Rates per Column
        # Compute fraction of nulls for each column
        null_rates = df.isnull().mean().to_dict()

        # 3. Temporal Range (for transactions only)
        temporal_range = None
        if name == "df_transactions_raw" and "transaction_transaction_date_iso_date" in df.columns:
            # Coerce dates to find min/max, ignoring errors for fingerprinting purposes
            # We do not mutate the original DF here
            dates = pd.to_datetime(df["transaction_transaction_date_iso_date"], errors='coerce')
            valid_dates = dates.dropna()
            if not valid_dates.empty:
                temporal_range = (valid_dates.min(), valid_dates.max())

        # 4. Deterministic Hash (Simple structural hash for in-memory verification)
        # We hash the column names and row count as a lightweight proxy
        # A full content hash is expensive for 10M rows; this is a "sanity check" hash
        header_str = ",".join(sorted(df.columns))
        content_summary = f"{header_str}|{row_count}"
        df_hash = hashlib.sha256(content_summary.encode('utf-8')).hexdigest()

        fingerprints[name] = DatasetFingerprint(
            row_count=row_count,
            column_null_rates=null_rates,
            temporal_range=temporal_range,
            dataframe_hash=df_hash
        )

        logger.info(f"Fingerprinted {name}: {row_count} rows, Hash={df_hash[:8]}...")

    return fingerprints


# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 2: Fix random seeds for stochastic components
# -------------------------------------------------------------------------------------------------------------------------------

def fix_random_seeds(config: Dict[str, Any]) -> Dict[str, int]:
    """
    Resolves the random seeds for the stochastic components (node2vec, UMAP).
    A seed that is missing from the configuration, or still set to the placeholder
    "MUST_BE_FIXED_AND_RECORDED", defaults to 42. The configuration is not modified.

    Parameters
    ----------
    config : Dict[str, Any]
        The master study configuration dictionary.

    Returns
    -------
    Dict[str, int]
        A dictionary of the resolved seeds.
    """
    resolved_seeds = {}

    # 1. node2vec Seed
    # Check if seed exists in config, else set default
    n2v_config = config.get("NODE2VEC", {})
    if "random_seed" not in n2v_config or n2v_config["random_seed"] == "MUST_BE_FIXED_AND_RECORDED":
        # Default fixed seed for reproducibility
        resolved_seeds["node2vec_seed"] = 42
        logger.info("Assigned default node2vec_seed: 42")
    else:
        resolved_seeds["node2vec_seed"] = int(n2v_config["random_seed"])
        logger.info(f"Using configured node2vec_seed: {resolved_seeds['node2vec_seed']}")

    # 2. UMAP Seed
    # Check if seed exists in config, else set default
    umap_config = config.get("UMAP", {})
    if "random_state" not in umap_config or umap_config["random_state"] == "MUST_BE_FIXED_AND_RECORDED":
        # Default fixed seed for reproducibility
        resolved_seeds["umap_seed"] = 42
        logger.info("Assigned default umap_seed: 42")
    else:
        resolved_seeds["umap_seed"] = int(umap_config["random_state"])
        logger.info(f"Using configured umap_seed: {resolved_seeds['umap_seed']}")

    return resolved_seeds


# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 3: Define deterministic ordering rules
# -------------------------------------------------------------------------------------------------------------------------------

@dataclass
class SortPolicy:
    """
    Defines the columns and order used for deterministic sorting of datasets.

    This dataclass specifies the exact sorting rules required to ensure
    deterministic behavior in downstream operations such as deduplication,
    hashing, and rank assignment. By explicitly defining the sort keys and
    order, we eliminate ambiguity caused by non-deterministic row ordering
    in distributed or parallel processing environments.

    Attributes
    ----------
    dataset_name : str
        The name of the dataset to which this policy applies (e.g., "df_transactions_raw").
    sort_columns : List[str]
        A list of column names to use as sort keys. The order of columns in
        this list determines the priority of sorting (primary key, secondary key, etc.).
    ascending : List[bool]
        A list of booleans corresponding to `sort_columns`, indicating whether
        each column should be sorted in ascending (True) or descending (False) order.
    """
    # The identifier of the dataset (e.g., 'df_transactions_raw')
    dataset_name: str

    # The list of column names to sort by, in order of priority
    sort_columns: List[str]

    # A list of booleans indicating sort direction for each column (True=Ascending)
    ascending: List[bool]

def define_sort_policies() -> Dict[str, SortPolicy]:
    """
    Defines the immutable sorting rules for each dataset to ensure deterministic
    processing order (e.g., for deduplication and hashing).

    Returns
    -------
    Dict[str, SortPolicy]
        A dictionary mapping dataset names to their sort policies.
    """
    policies = {}

    # 1. Transactions Policy
    # Sort by ID, Date, Value, Provider, Receiver
    policies["df_transactions_raw"] = SortPolicy(
        dataset_name="df_transactions_raw",
        sort_columns=[
            "iati_identifier",
            "transaction_transaction_date_iso_date",
            "transaction_value",
            "transaction_provider_org_ref",
            "transaction_receiver_org_ref"
        ],
        # All ascending
        ascending=[True, True, True, True, True]
    )

    # 2. Activities Policy
    # Sort by IATI Identifier
    policies["df_activities_raw"] = SortPolicy(
        dataset_name="df_activities_raw",
        sort_columns=["iati_identifier"],
        ascending=[True]
    )

    # 3. Organisations Policy
    # Sort by Org Ref
    policies["df_organisations_raw"] = SortPolicy(
        dataset_name="df_organisations_raw",
        sort_columns=["org_ref"],
        ascending=[True]
    )

    logger.info("Defined deterministic sort policies for Transactions, Activities, Organisations.")
    return policies
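

# ------------------------------------------------------------------------------
# Illustrative sketch (not called by the pipeline): why sorting on the policy keys
# gives a reproducible row order. The toy frame and the `_sketch_*` helpers are
# hypothetical. When the sort keys distinguish every row, the sorted result is the
# same whatever order the rows arrived in. Rows tied on all keys would keep an
# input-dependent order, which is why the transactions policy lists several
# tie-breaking columns.
# ------------------------------------------------------------------------------
from typing import List

import pandas as pd


def _sketch_apply_sort_policy(
    df: pd.DataFrame, sort_columns: List[str], ascending: List[bool]
) -> pd.DataFrame:
    # Sort on the policy keys and discard the incoming index labels
    return df.sort_values(by=sort_columns, ascending=ascending).reset_index(drop=True)


def _sketch_sort_is_order_invariant() -> bool:
    toy = pd.DataFrame({
        "iati_identifier": ["XM-1", "XM-2", "XM-1", "XM-3"],
        "transaction_value": [50.0, 10.0, 20.0, 5.0],
    })
    cols, asc = ["iati_identifier", "transaction_value"], [True, True]
    # Shuffle with a fixed seed to simulate a different ingestion order
    shuffled = toy.sample(frac=1.0, random_state=7)
    first = _sketch_apply_sort_policy(toy, cols, asc)
    second = _sketch_apply_sort_policy(shuffled, cols, asc)
    return first.equals(second)

# Usage: _sketch_sort_is_order_invariant() returns True for this toy frame.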


# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

@dataclass
class ReproducibilityContext:
    """
    Container for all metadata required to reproduce the run.

    This dataclass aggregates all the artifacts generated during the
    "Freeze input snapshot" task. It serves as a single source of truth for
    the run's initial conditions, including data fingerprints, random seeds,
    and sorting policies. This context object is passed downstream to ensure
    that all subsequent processing steps adhere to the established
    reproducibility parameters.

    Attributes
    ----------
    fingerprints : Dict[str, DatasetFingerprint]
        A dictionary mapping dataset names to their computed `DatasetFingerprint` objects.
    seeds : Dict[str, int]
        A dictionary mapping stochastic component names (e.g., "node2vec_seed")
        to their fixed integer seeds.
    sort_policies : Dict[str, SortPolicy]
        A dictionary mapping dataset names to their defined `SortPolicy` objects.
    """
    # A dictionary of dataset fingerprints keyed by dataset name
    fingerprints: Dict[str, DatasetFingerprint]

    # A dictionary of fixed random seeds keyed by component name
    seeds: Dict[str, int]

    # A dictionary of sort policies keyed by dataset name
    sort_policies: Dict[str, SortPolicy]

def establish_reproducibility_context(
    config: Dict[str, Any],
    input_data: Dict[str, pd.DataFrame]
) -> ReproducibilityContext:
    """
    Orchestrator for Task 3. Freezes the input state by computing fingerprints,
    resolving random seeds, and defining sorting policies.

    Parameters
    ----------
    config : Dict[str, Any]
        The master study configuration dictionary.
    input_data : Dict[str, pd.DataFrame]
        Dictionary of raw input DataFrames.

    Returns
    -------
    ReproducibilityContext
        The established context object containing fingerprints, seeds, and policies.
    """
    logger.info("Starting Task 3: Establishing Reproducibility Context...")

    # Step 1: Fingerprint Datasets
    fingerprints = compute_dataset_fingerprints(input_data)

    # Step 2: Fix Seeds
    seeds = fix_random_seeds(config)

    # Step 3: Define Sort Policies
    sort_policies = define_sort_policies()

    context = ReproducibilityContext(
        fingerprints=fingerprints,
        seeds=seeds,
        sort_policies=sort_policies
    )

    logger.info("Task 3 Completed Successfully.")
    return context
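

# ------------------------------------------------------------------------------
# Illustrative sketch (hypothetical helper; the pipeline's own
# `compute_dataset_fingerprints` is defined elsewhere and may work differently):
# one way to make a content fingerprint independent of row order is to put the
# frame into a canonical form with its sort policy before hashing. Assumes the
# sort keys break all ties.
# ------------------------------------------------------------------------------
import hashlib
from typing import List

import pandas as pd


def _sketch_fingerprint(df: pd.DataFrame, sort_columns: List[str]) -> str:
    # Canonical form: rows sorted by the policy keys, fresh index, fixed column order
    canonical = (
        df.sort_values(by=sort_columns)
        .reset_index(drop=True)
        .reindex(columns=sorted(df.columns))
    )
    # One uint64 hash per row; index=False keeps row labels out of the hash
    row_hashes = pd.util.hash_pandas_object(canonical, index=False)
    # Collapse the per-row hashes into a single SHA-256 hex digest
    return hashlib.sha256(row_hashes.to_numpy().tobytes()).hexdigest()

# Usage: reversing the rows of a frame leaves its fingerprint unchanged, while
# editing a cell value changes it (barring a hash collision).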


# Task 4 — Cleanse df_transactions_raw (temporal and value filters)

# ==============================================================================
# Task 4: Cleanse df_transactions_raw (temporal and value filters)
# ==============================================================================

@dataclass
class TransactionExclusions:
    """
    Container for tracking rows excluded during transaction cleansing.

    This dataclass serves as a structured ledger for all transactions dropped
    during the temporal and value-based cleansing phase. By categorizing exclusions
    into specific lists of transaction IDs, it enables precise auditing of data loss
    and facilitates the debugging of upstream data quality issues.

    Attributes
    ----------
    bad_date_ids : List[str]
        A list of transaction IDs excluded because their date strings could not
        be parsed into valid ISO-8601 datetime objects.
    non_numeric_value_ids : List[str]
        A list of transaction IDs excluded because their value fields could not
        be coerced into valid floating-point numbers.
    zero_or_negative_value_ids : List[str]
        A list of transaction IDs excluded because their financial value was
        less than or equal to zero (violating the V(t) > 0 constraint).
    out_of_scope_ids : List[str]
        A list of transaction IDs excluded because their transaction year fell
        outside the study's defined temporal scope (1967-2025).
    """
    # List of IDs with unparseable dates
    bad_date_ids: List[str] = field(default_factory=list)

    # List of IDs with non-numeric value fields
    non_numeric_value_ids: List[str] = field(default_factory=list)

    # List of IDs with zero or negative values
    zero_or_negative_value_ids: List[str] = field(default_factory=list)

    # List of IDs outside the temporal scope
    out_of_scope_ids: List[str] = field(default_factory=list)

    def total_excluded(self) -> int:
        """
        Computes the total number of transactions excluded across all categories.

        Returns
        -------
        int
            The sum of the lengths of all exclusion lists.
        """
        # Sum the lengths of all exclusion lists to get the total count
        return (len(self.bad_date_ids) +
                len(self.non_numeric_value_ids) +
                len(self.zero_or_negative_value_ids) +
                len(self.out_of_scope_ids))

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 1: Parse and validate transaction dates
# -------------------------------------------------------------------------------------------------------------------------------

def parse_transaction_dates(
    df: pd.DataFrame
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Parses transaction dates to datetime objects and extracts the year.
    Quarantines rows with unparseable dates.

    Parameters
    ----------
    df : pd.DataFrame
        The raw transactions DataFrame. Must contain 'transaction_transaction_date_iso_date'
        and 'transaction_id'.

    Returns
    -------
    Tuple[pd.DataFrame, List[str]]
        1. The DataFrame with a new 'transaction_year' column and valid dates.
        2. A list of transaction_ids excluded due to invalid dates.
    """
    # Create a copy to avoid mutating the input
    df_out = df.copy()

    # Parse the ISO-8601 date strings with pandas' datetime parser;
    # errors='coerce' turns unparseable strings into NaT
    df_out['parsed_date'] = pd.to_datetime(
        df_out['transaction_transaction_date_iso_date'],
        errors='coerce'
    )

    # Identify failures
    # Rows where date is NaT are invalid
    mask_invalid = df_out['parsed_date'].isna()
    bad_date_ids = df_out.loc[mask_invalid, 'transaction_id'].astype(str).tolist()

    # Filter to keep only valid rows
    df_valid = df_out.loc[~mask_invalid].copy()

    # Extract the calendar year. NaT rows were removed above, so .dt.year has no
    # missing values and the cast simply yields a plain integer column
    df_valid['transaction_year'] = df_valid['parsed_date'].dt.year.astype(int)

    logger.info(f"Date Parsing: {len(bad_date_ids)} rows excluded due to invalid dates.")

    return df_valid, bad_date_ids


# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 2: Apply paper-stated value filter
# -------------------------------------------------------------------------------------------------------------------------------

def apply_value_filter(
    df: pd.DataFrame
) -> Tuple[pd.DataFrame, List[str], List[str]]:
    """
    Applies the manuscript's value filter: V(t) > 0.
    Handles non-numeric values and zero/negative values separately.

    Parameters
    ----------
    df : pd.DataFrame
        The transactions DataFrame with valid dates. Must contain 'transaction_value'.

    Returns
    -------
    Tuple[pd.DataFrame, List[str], List[str]]
        1. The filtered DataFrame containing only positive financial transactions.
        2. A list of transaction_ids excluded due to non-numeric values.
        3. A list of transaction_ids excluded due to zero or negative values.
    """
    # Coerce transaction_value to numeric
    # errors='coerce' turns non-numeric strings into NaN
    numeric_values = pd.to_numeric(df['transaction_value'], errors='coerce')

    # Identify non-numeric failures
    mask_non_numeric = numeric_values.isna()
    non_numeric_ids = df.loc[mask_non_numeric, 'transaction_id'].astype(str).tolist()

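    # Keep only rows that coerced successfully and attach their numeric values.
    # .to_numpy() assigns positionally (both sides use the same boolean mask), which
    # avoids index-alignment errors if the input index contains duplicate labels.
    df_numeric = df.loc[~mask_non_numeric].copy()
    df_numeric['transaction_value_numeric'] = numeric_values[~mask_non_numeric].to_numpy()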

    # Apply strict positive filter: V > 0
    # Manuscript: "filtered for nonzero financial transactions" -> interpreted as strictly positive
    # to exclude reversals/adjustments in baseline.
    mask_positive = df_numeric['transaction_value_numeric'] > 0

    # Identify zero/negative exclusions
    zero_neg_ids = df_numeric.loc[~mask_positive, 'transaction_id'].astype(str).tolist()

    # Filter to keep only positive rows
    df_final = df_numeric.loc[mask_positive].copy()

    # Replace the original (possibly string-typed) column with its numeric version
    # so that downstream steps operate on floats
    df_final['transaction_value'] = df_final['transaction_value_numeric']
    df_final.drop(columns=['transaction_value_numeric'], inplace=True)

    logger.info(f"Value Filter: {len(non_numeric_ids)} non-numeric, {len(zero_neg_ids)} zero/negative excluded.")

    return df_final, non_numeric_ids, zero_neg_ids
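

# ------------------------------------------------------------------------------
# Illustrative sketch (toy data, hypothetical helper): the value filter splits rows
# into three disjoint groups: non-numeric (NaN after coercion), zero/negative, and
# strictly positive. Only the last group is retained, so the three counts always
# add up to the input row count.
# ------------------------------------------------------------------------------
import pandas as pd


def _sketch_value_partition(values: pd.Series) -> dict:
    numeric = pd.to_numeric(values, errors="coerce")
    non_numeric = numeric.isna()
    # Comparisons with NaN evaluate to False, so NaN rows are not counted as positive
    positive = numeric > 0
    zero_or_negative = numeric.notna() & ~positive
    return {
        "non_numeric": int(non_numeric.sum()),
        "zero_or_negative": int(zero_or_negative.sum()),
        "positive": int(positive.sum()),
    }

# Usage: _sketch_value_partition(pd.Series(["1000", "abc", "0", "-250.5", None, "12.5"]))
# counts 2 non-numeric ("abc", None), 2 zero/negative ("0", "-250.5") and 2 positive.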


# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 3: Apply temporal scope filter
# -------------------------------------------------------------------------------------------------------------------------------

def apply_scope_filter(
    df: pd.DataFrame,
    start_year: int,
    end_year: int
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Applies the temporal scope filter: start_year <= year <= end_year.

    Parameters
    ----------
    df : pd.DataFrame
        The transactions DataFrame with 'transaction_year'.
    start_year : int
        The inclusive start year (e.g., 1967).
    end_year : int
        The inclusive end year (e.g., 2025).

    Returns
    -------
    Tuple[pd.DataFrame, List[str]]
        1. The filtered DataFrame within the temporal scope.
        2. A list of transaction_ids excluded due to being out of scope.
    """
    # Apply range filter
    mask_in_scope = (df['transaction_year'] >= start_year) & (df['transaction_year'] <= end_year)

    # Identify exclusions
    out_of_scope_ids = df.loc[~mask_in_scope, 'transaction_id'].astype(str).tolist()

    # Filter DataFrame
    df_final = df.loc[mask_in_scope].copy()

    logger.info(f"Scope Filter: {len(out_of_scope_ids)} rows excluded (outside {start_year}-{end_year}).")

    return df_final, out_of_scope_ids


# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def cleanse_transactions_temporal_value(
    df_transactions_raw: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, TransactionExclusions]:
    """
    Orchestrator for Task 4. Executes the temporal and value-based cleansing pipeline
    for transactions.

    Parameters
    ----------
    df_transactions_raw : pd.DataFrame
        The raw transactions DataFrame.
    config : Dict[str, Any]
        The master study configuration dictionary containing scope parameters.

    Returns
    -------
    Tuple[pd.DataFrame, TransactionExclusions]
        1. The cleansed DataFrame (T++), filtered for valid dates, positive values, and scope.
        2. An exclusions ledger object tracking all dropped rows.
    """
    logger.info("Starting Task 4: Temporal and Value Cleansing...")

    # Initialize exclusions tracker
    exclusions = TransactionExclusions()

    # Step 1: Parse Dates
    df_dated, bad_date_ids = parse_transaction_dates(df_transactions_raw)
    exclusions.bad_date_ids = bad_date_ids

    # Step 2: Apply Value Filter
    df_valued, non_numeric_ids, zero_neg_ids = apply_value_filter(df_dated)
    exclusions.non_numeric_value_ids = non_numeric_ids
    exclusions.zero_or_negative_value_ids = zero_neg_ids

    # Step 3: Apply Scope Filter
    start_year = config["SCOPE"]["start_year"]
    end_year = config["SCOPE"]["end_year"]
    df_final, out_of_scope_ids = apply_scope_filter(df_valued, start_year, end_year)
    exclusions.out_of_scope_ids = out_of_scope_ids

    logger.info(f"Task 4 Completed. Total rows retained: {len(df_final)}. "
                f"Total excluded: {exclusions.total_excluded()}.")

    return df_final, exclusions
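

# ------------------------------------------------------------------------------
# Illustrative sketch: a reconciliation check one might run after Task 4. The three
# steps run sequentially and each row is dropped by at most one step, so
# input rows = retained rows + excluded rows. The toy frame and the `_sketch_*`
# name are hypothetical; the filters condense Task 4's logic.
# ------------------------------------------------------------------------------
import pandas as pd


def _sketch_task4_reconciliation(start_year: int = 1967, end_year: int = 2025) -> dict:
    raw = pd.DataFrame({
        "transaction_id": ["t1", "t2", "t3", "t4", "t5"],
        "transaction_transaction_date_iso_date": [
            "2010-03-01", "not-a-date", "2012-07-15", "1950-01-01", "2020-11-30"],
        "transaction_value": ["100", "50", "-5", "10", "abc"],
    })
    # Step 1: unparseable dates become NaT and are excluded
    parsed = pd.to_datetime(raw["transaction_transaction_date_iso_date"], errors="coerce")
    bad_date = parsed.isna()
    df = raw.loc[~bad_date].assign(transaction_year=parsed[~bad_date].dt.year)
    # Step 2: non-numeric or non-positive values are excluded
    values = pd.to_numeric(df["transaction_value"], errors="coerce")
    keep_value = values > 0
    n_value_excluded = int((~keep_value).sum())
    df = df.loc[keep_value]
    # Step 3: inclusive temporal scope
    in_scope = df["transaction_year"].between(start_year, end_year)
    n_scope_excluded = int((~in_scope).sum())
    df = df.loc[in_scope]
    excluded = int(bad_date.sum()) + n_value_excluded + n_scope_excluded
    return {"input": len(raw), "retained": len(df), "excluded": excluded}

# Usage: only t1 survives (t2 bad date, t3 and t5 bad value, t4 out of scope),
# giving {"input": 5, "retained": 1, "excluded": 4}.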


# Task 5 — Cleanse df_transactions_raw (endpoint validity and deduplication)

# ==============================================================================
# Task 5: Cleanse df_transactions_raw (endpoint validity and deduplication)
# ==============================================================================

@dataclass
class EndpointExclusions:
    """
    Container for tracking rows excluded during endpoint validation and deduplication.

    This dataclass serves as a structured ledger for all transactions dropped
    during the endpoint validation and deduplication phase. It categorizes exclusions
    based on the specific data quality failure (missing provider, missing receiver,
    or duplicate record), enabling precise auditing of the graph construction process.
    Ensuring that every edge in the network has valid source and target nodes is
    critical for the topological integrity of the subsequent bipartite projection.

    Attributes
    ----------
    missing_provider_ids : List[str]
        A list of transaction IDs excluded because they lacked a valid provider
        identifier (both `transaction_provider_org_ref` and `transaction_provider_org_name`
        were null or empty).
    missing_receiver_ids : List[str]
        A list of transaction IDs excluded because they lacked a valid receiver
        identifier (both `transaction_receiver_org_ref` and `transaction_receiver_org_name`
        were null or empty), despite having a valid provider.
    duplicate_transaction_ids : List[str]
        A list of transaction IDs excluded because they were identified as duplicates
        of an existing record based on the `transaction_id` field, following a
        deterministic sort.
    """
    # List of IDs excluded due to missing provider information
    missing_provider_ids: List[str] = field(default_factory=list)

    # List of IDs excluded due to missing receiver information
    missing_receiver_ids: List[str] = field(default_factory=list)

    # List of IDs excluded due to duplication
    duplicate_transaction_ids: List[str] = field(default_factory=list)

    def total_excluded(self) -> int:
        """
        Computes the total number of transactions excluded across all categories.

        Returns
        -------
        int
            The sum of the lengths of all exclusion lists.
        """
        # Sum the lengths of all exclusion lists to get the total count
        return (len(self.missing_provider_ids) +
                len(self.missing_receiver_ids) +
                len(self.duplicate_transaction_ids))

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 1: Enforce endpoint presence policy
# -------------------------------------------------------------------------------------------------------------------------------

def enforce_endpoint_validity(
    df: pd.DataFrame
) -> Tuple[pd.DataFrame, List[str], List[str]]:
    """
    Enforces that every transaction has at least one valid provider identifier
    (ref or name) and at least one valid receiver identifier (ref or name).

    Parameters
    ----------
    df : pd.DataFrame
        The filtered transactions DataFrame.

    Returns
    -------
    Tuple[pd.DataFrame, List[str], List[str]]
        1. The DataFrame with valid endpoints.
        2. List of transaction_ids excluded due to missing provider info.
        3. List of transaction_ids excluded due to missing receiver info.
    """
    # Work on a copy to avoid side effects
    df_out = df.copy()

    # Normalize endpoint columns: trim whitespace and map empty or placeholder strings to NaN.
    # A missing column is created as all-NaN, so the validity checks below treat that
    # endpoint attribute as absent instead of raising a KeyError.
    endpoint_cols = [
        'transaction_provider_org_ref', 'transaction_provider_org_name',
        'transaction_receiver_org_ref', 'transaction_receiver_org_name'
    ]

    for col in endpoint_cols:
        if col not in df_out.columns:
            df_out[col] = np.nan
            continue
        # Cast to string and strip whitespace (nulls become 'nan', 'None' or '<NA>')
        df_out[col] = df_out[col].astype(str).str.strip()
        # Map empty strings and stringified nulls back to real NaN
        df_out[col] = df_out[col].replace({'': np.nan, 'nan': np.nan, 'None': np.nan, '<NA>': np.nan})

    # Define validity conditions
    # Provider valid if Ref is present OR Name is present
    has_provider = df_out['transaction_provider_org_ref'].notna() | df_out['transaction_provider_org_name'].notna()

    # Receiver valid if Ref is present OR Name is present
    has_receiver = df_out['transaction_receiver_org_ref'].notna() | df_out['transaction_receiver_org_name'].notna()

    # Rows to keep must have BOTH a provider and a receiver
    keep_mask = has_provider & has_receiver

    # Attribute each dropped row to exactly one primary reason, so the lists are disjoint:
    # - missing provider: no provider ref or name (regardless of the receiver)
    # - missing receiver: provider present, but no receiver ref or name
    missing_provider_ids = df_out.loc[~has_provider, 'transaction_id'].astype(str).tolist()
    missing_receiver_ids = df_out.loc[has_provider & ~has_receiver, 'transaction_id'].astype(str).tolist()

    # Filter DataFrame
    df_valid = df_out.loc[keep_mask].copy()

    logger.info(f"Endpoint Validity: {len(missing_provider_ids)} missing provider, "
                f"{len(missing_receiver_ids)} missing receiver.")

    return df_valid, missing_provider_ids, missing_receiver_ids
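

# ------------------------------------------------------------------------------
# Illustrative sketch (toy data with shortened, hypothetical column names): an
# endpoint counts as present if either its ref or its name is non-blank. Each
# dropped row gets exactly one primary reason (a missing provider takes
# precedence), so the two exclusion lists never overlap.
# ------------------------------------------------------------------------------
import pandas as pd


def _sketch_endpoint_attribution(df: pd.DataFrame) -> dict:
    def present(col: str) -> pd.Series:
        # Nulls and whitespace-only strings both count as absent
        s = df[col]
        return s.notna() & s.astype(str).str.strip().ne("")

    has_provider = present("provider_ref") | present("provider_name")
    has_receiver = present("receiver_ref") | present("receiver_name")
    return {
        "kept": df.loc[has_provider & has_receiver, "id"].tolist(),
        "missing_provider": df.loc[~has_provider, "id"].tolist(),
        "missing_receiver": df.loc[has_provider & ~has_receiver, "id"].tolist(),
    }


def _sketch_endpoint_demo() -> dict:
    toy = pd.DataFrame({
        "id": ["a", "b", "c", "d"],
        "provider_ref": ["GB-1", None, "  ", None],
        "provider_name": [None, "Donor X", None, None],
        "receiver_ref": ["KE-9", None, "UG-2", None],
        "receiver_name": [None, " ", None, None],
    })
    # a is complete; b has a provider but only a blank receiver name;
    # c has only a blank provider ref; d has no endpoints at all
    return _sketch_endpoint_attribution(toy)

# Usage: _sketch_endpoint_demo() returns
# {"kept": ["a"], "missing_provider": ["c", "d"], "missing_receiver": ["b"]}.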


# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 2: Deduplicate on transaction identity
# -------------------------------------------------------------------------------------------------------------------------------

def deduplicate_transactions(
    df: pd.DataFrame,
    sort_policy: "SortPolicy"  # Defined in Task 3; only .sort_columns and .ascending are used
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Deduplicates transactions based on 'transaction_id', keeping the first occurrence
    after applying a deterministic sort order.

    Parameters
    ----------
    df : pd.DataFrame
        The transactions DataFrame with valid endpoints.
    sort_policy : SortPolicy
        The sorting policy object defining columns and order for deterministic sorting.

    Returns
    -------
    Tuple[pd.DataFrame, List[str]]
        1. The deduplicated DataFrame.
        2. List of transaction_ids that were removed as duplicates.
    """
    # Apply the deterministic sort (sort_policy exposes .sort_columns and .ascending).
    # kind='mergesort' requests a stable sort, so rows tied on every key keep their
    # incoming order; pandas honors `kind` only when sorting on a single column.
    try:
        df_sorted = df.sort_values(
            by=sort_policy.sort_columns,
            ascending=sort_policy.ascending,
            kind='mergesort'
        )
    except KeyError as e:
        # Fall back if sort columns are missing (e.g. during testing with partial data)
        logger.warning(f"Sort columns missing: {e}. Falling back to 'transaction_id'.")
        df_sorted = df.sort_values(by=['transaction_id'], kind='mergesort')

    # Identify duplicates
    # keep='first' retains the first row in the sorted order
    is_duplicate = df_sorted.duplicated(subset=['transaction_id'], keep='first')

    # Extract IDs of removed rows
    # Note: This list might contain the same ID multiple times if a transaction ID
    # appears 3+ times. This is correct behavior for an exclusion ledger (one entry per dropped row).
    duplicate_ids = df_sorted.loc[is_duplicate, 'transaction_id'].astype(str).tolist()

    # Filter
    df_deduped = df_sorted.loc[~is_duplicate].copy()

    logger.info(f"Deduplication: {len(duplicate_ids)} duplicate rows removed.")

    return df_deduped, duplicate_ids
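

# ------------------------------------------------------------------------------
# Illustrative sketch (toy data, hypothetical `_sketch_*` name): sorting before
# duplicated(keep="first") decides which copy survives. Here the earliest-dated
# row per transaction_id is kept, and an ID that occurs three times contributes
# two entries to the exclusion ledger (one per dropped row).
# ------------------------------------------------------------------------------
import pandas as pd


def _sketch_dedup_keep_first():
    df = pd.DataFrame({
        "transaction_id": ["t1", "t1", "t1", "t2"],
        "date": ["2021-03-01", "2020-01-01", "2022-05-05", "2019-09-09"],
    })
    # ISO date strings sort lexicographically in chronological order
    ordered = df.sort_values(by=["transaction_id", "date"])
    is_dup = ordered.duplicated(subset=["transaction_id"], keep="first")
    kept = ordered.loc[~is_dup]
    dropped_ids = ordered.loc[is_dup, "transaction_id"].tolist()
    return dict(zip(kept["transaction_id"], kept["date"])), dropped_ids

# Usage: returns ({"t1": "2020-01-01", "t2": "2019-09-09"}, ["t1", "t1"]).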


# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 3: Retain but do not filter on transaction type code
# -------------------------------------------------------------------------------------------------------------------------------

def verify_transaction_type_retention(df: pd.DataFrame) -> None:
    """
    Verifies that the 'transaction_transaction_type_code' column is present
    and logs that no filtering has been applied to it, consistent with baseline requirements.

    Parameters
    ----------
    df : pd.DataFrame
        The deduplicated transactions DataFrame.
    """
    if 'transaction_transaction_type_code' in df.columns:
        unique_types = df['transaction_transaction_type_code'].unique()
        logger.info(f"Transaction Type Retention: Column present. "
                    f"Contains {len(unique_types)} unique types. No filtering applied.")
    else:
        logger.warning("Transaction Type Retention: 'transaction_transaction_type_code' column missing.")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def cleanse_transactions_endpoints_dedup(
    df_transactions_filtered: pd.DataFrame,
    sort_policy: Any
) -> Tuple[pd.DataFrame, EndpointExclusions]:
    """
    Orchestrator for Task 5. Executes endpoint validation and deterministic deduplication.

    Parameters
    ----------
    df_transactions_filtered : pd.DataFrame
        The transactions DataFrame resulting from Task 4 (temporal/value cleansed).
    sort_policy : SortPolicy
        The sorting policy for transactions to ensure deterministic deduplication.

    Returns
    -------
    Tuple[pd.DataFrame, EndpointExclusions]
        1. The fully cleansed transactions DataFrame ready for joining.
        2. An exclusions ledger object tracking dropped rows.
    """
    logger.info("Starting Task 5: Endpoint Validity and Deduplication...")

    exclusions = EndpointExclusions()

    # Step 1: Enforce Endpoint Validity
    df_valid, missing_prov, missing_recv = enforce_endpoint_validity(df_transactions_filtered)
    exclusions.missing_provider_ids = missing_prov
    exclusions.missing_receiver_ids = missing_recv

    # Step 2: Deduplicate
    df_deduped, dup_ids = deduplicate_transactions(df_valid, sort_policy)
    exclusions.duplicate_transaction_ids = dup_ids

    # Step 3: Verify Type Retention
    verify_transaction_type_retention(df_deduped)

    logger.info(f"Task 5 Completed. Rows remaining: {len(df_deduped)}. "
                f"Total excluded: {exclusions.total_excluded()}.")

    return df_deduped, exclusions


# Task 6 — Cleanse df_activities_raw (join backbone and multi-value normalization)

# ===============================================================================
# Task 6: Cleanse df_activities_raw (join backbone and multi-value normalization)
# ===============================================================================

@dataclass
class ActivityExclusions:
    """
    Container for tracking rows excluded during activity cleansing.

    This dataclass serves as a structured ledger for all activity records dropped
    during the cleansing and normalization phase. It specifically tracks exclusions
    due to missing primary keys (`iati_identifier`) and duplicate records.
    Maintaining accurate counts of these exclusions is essential for verifying
    the integrity of the activity backbone, which serves as the join target for
    transactional data.

    Attributes
    ----------
    missing_identifier_count : int, default=0
        The number of rows excluded because the `iati_identifier` field was null
        or missing. These rows cannot be joined to transactions and are thus
        invalid.
    duplicate_identifier_count : int, default=0
        The number of rows excluded because they shared an `iati_identifier` with
        another row that was retained (based on deterministic sorting). Duplicate
        activities can cause Cartesian explosions during joins if not removed.
    """
    # Count of rows with missing IATI identifiers
    missing_identifier_count: int = 0

    # Count of duplicate rows removed
    duplicate_identifier_count: int = 0

    def total_excluded(self) -> int:
        """
        Computes the total number of activity rows excluded.

        Returns
        -------
        int
            The sum of missing identifier counts and duplicate identifier counts.
        """
        # Sum the counts to get the total number of excluded rows
        return self.missing_identifier_count + self.duplicate_identifier_count

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 1: Validate activity identifier integrity
# -------------------------------------------------------------------------------------------------------------------------------

def cleanse_activity_backbone(
    df: pd.DataFrame,
    sort_policy: Any
) -> Tuple[pd.DataFrame, ActivityExclusions]:
    """
    Validates activity identifiers, removes nulls, and deduplicates deterministically.

    Parameters
    ----------
    df : pd.DataFrame
        The raw activities DataFrame.
    sort_policy : SortPolicy
        The sorting policy for activities to ensure deterministic deduplication.

    Returns
    -------
    Tuple[pd.DataFrame, ActivityExclusions]
        1. The cleaned activity backbone DataFrame.
        2. An exclusions ledger object.
    """
    exclusions = ActivityExclusions()

    # 1. Remove rows whose iati_identifier is null or blank (whitespace-only strings
    #    cannot be joined to transactions either)
    initial_count = len(df)
    ids = df['iati_identifier']
    has_id = ids.notna() & ids.astype(str).str.strip().ne('')
    df_clean = df.loc[has_id].copy()
    exclusions.missing_identifier_count = initial_count - len(df_clean)

    # 2. Deduplicate deterministically
    # Apply a stable sort (kind='mergesort') so rows tied on the sort keys keep their
    # incoming order
    try:
        df_sorted = df_clean.sort_values(
            by=sort_policy.sort_columns,
            ascending=sort_policy.ascending,
            kind='mergesort'
        )
    except KeyError as e:
        logger.warning(f"Sort columns missing: {e}. Falling back to 'iati_identifier'.")
        df_sorted = df_clean.sort_values(by=['iati_identifier'], kind='mergesort')

    # Drop duplicate iati_identifier rows, keeping the first in sorted order
    is_dup = df_sorted.duplicated(subset=['iati_identifier'], keep='first')
    # .sum() returns a numpy integer; store a plain int in the ledger
    exclusions.duplicate_identifier_count = int(is_dup.sum())

    df_final = df_sorted.loc[~is_dup].copy()

    logger.info(f"Activity Backbone: {exclusions.missing_identifier_count} missing IDs, "
                f"{exclusions.duplicate_identifier_count} duplicates removed.")

    return df_final, exclusions
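

# ------------------------------------------------------------------------------
# Illustrative sketch (toy data, hypothetical `_sketch_*` name): why the activity
# backbone must be unique on iati_identifier before transactions are joined to it.
# With a duplicated activity the left join silently multiplies transaction rows;
# pandas' merge(validate="many_to_one") turns that into an explicit MergeError.
# ------------------------------------------------------------------------------
import pandas as pd


def _sketch_join_row_counts():
    transactions = pd.DataFrame({"iati_identifier": ["A1", "A1", "B2"], "value": [10.0, 20.0, 5.0]})
    activities_dup = pd.DataFrame({"iati_identifier": ["A1", "A1", "B2"], "title": ["x", "x (dup)", "y"]})
    activities_clean = activities_dup.drop_duplicates(subset=["iati_identifier"], keep="first")

    # Each A1 transaction matches two activity rows: 2 * 2 + 1 = 5 rows
    n_dup = len(transactions.merge(activities_dup, on="iati_identifier", how="left"))
    n_clean = len(transactions.merge(activities_clean, on="iati_identifier", how="left",
                                     validate="many_to_one"))
    try:
        transactions.merge(activities_dup, on="iati_identifier", how="left", validate="many_to_one")
        raised = False
    except pd.errors.MergeError:
        raised = True
    return n_dup, n_clean, raised

# Usage: returns (5, 3, True).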


# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 2: Normalize multi-valued recipient country field
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_activity_countries(
    df: pd.DataFrame
) -> pd.DataFrame:
    """
    Normalizes the 'recipient_country_code' field by exploding multi-valued entries
    into a long-form DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        The cleaned activity backbone DataFrame.

    Returns
    -------
    pd.DataFrame
        A DataFrame with columns ['iati_identifier', 'recipient_country_code'],
        where each row represents a unique (activity, country) pair.
    """
    # Select relevant columns
    # We use .copy() to ensure we don't affect the backbone
    df_countries = df[['iati_identifier', 'recipient_country_code']].copy()

    # Drop nulls immediately to avoid exploding NaNs
    df_countries = df_countries.dropna(subset=['recipient_country_code'])

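    # Helper to normalize a cell to a list of codes: lists, tuples and numpy arrays
    # (e.g. as read from Parquet) are iterated; strings are split on ';', assumed to be
    # the delimiter, as is common in IATI exports
    def to_list(val: Union[str, List, Any]) -> List[str]:
        if isinstance(val, (list, tuple, np.ndarray)):
            return [str(x).strip() for x in val if pd.notna(x) and str(x).strip()]
        if isinstance(val, str):
            return [x.strip() for x in val.split(';') if x.strip()]
        return [str(val).strip()]

    # Apply normalization
    df_countries['recipient_country_code'] = df_countries['recipient_country_code'].apply(to_list)

    # Explode to one row per (activity, country). Empty lists explode to NaN; drop them
    # so they are not stringified into the literal 'nan' below.
    df_exploded = df_countries.explode('recipient_country_code')
    df_exploded = df_exploded.dropna(subset=['recipient_country_code'])

    # Ensure clean strings
    df_exploded['recipient_country_code'] = df_exploded['recipient_country_code'].astype(str)

    # Remove duplicates if an activity listed the same country twice
    df_final = df_exploded.drop_duplicates().reset_index(drop=True)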

    logger.info(f"Normalized Countries: {len(df_final)} rows generated from {len(df)} activities.")

    return df_final
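

# ------------------------------------------------------------------------------
# Illustrative sketch (toy data, hypothetical `_sketch_*` name, simplified split
# logic): semicolon-delimited strings and Python lists both become one row per
# (activity, country) pair. Blank fragments are dropped, empty results are removed
# after explode, and repeated codes collapse via drop_duplicates.
# ------------------------------------------------------------------------------
import pandas as pd


def _sketch_explode_countries() -> pd.DataFrame:
    df = pd.DataFrame({
        "iati_identifier": ["A1", "A2", "A3"],
        "recipient_country_code": ["KE; UG", ["TZ", "TZ"], " ; "],
    })

    def to_list(val):
        if isinstance(val, list):
            return [str(x).strip() for x in val if str(x).strip()]
        return [x.strip() for x in str(val).split(";") if x.strip()]

    df["recipient_country_code"] = df["recipient_country_code"].apply(to_list)
    # A3 yields an empty list, which explodes to NaN and is dropped here
    long_form = df.explode("recipient_country_code").dropna(subset=["recipient_country_code"])
    return long_form.drop_duplicates().reset_index(drop=True)

# Usage: returns three rows, (A1, KE), (A1, UG) and (A2, TZ); A3 contributes nothing.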


# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 3: Normalize multi-valued sector field
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_activity_sectors(
    df: pd.DataFrame
) -> pd.DataFrame:
    """
    Normalizes the 'sector_code' field by exploding multi-valued entries
    into a long-form DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        The cleaned activity backbone DataFrame.

    Returns
    -------
    pd.DataFrame
        A DataFrame with columns ['iati_identifier', 'sector_code'],
        where each row represents a unique (activity, sector) pair.
    """
    # Select relevant columns
    df_sectors = df[['iati_identifier', 'sector_code']].copy()

    # Drop nulls
    df_sectors = df_sectors.dropna(subset=['sector_code'])

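    # Helper to normalize a cell to a list of codes (same logic as countries): lists,
    # tuples and numpy arrays are iterated; strings are split on ';'
    def to_list(val: Union[str, List, Any]) -> List[str]:
        if isinstance(val, (list, tuple, np.ndarray)):
            return [str(x).strip() for x in val if pd.notna(x) and str(x).strip()]
        if isinstance(val, str):
            return [x.strip() for x in val.split(';') if x.strip()]
        return [str(val).strip()]

    # Apply normalization
    df_sectors['sector_code'] = df_sectors['sector_code'].apply(to_list)

    # Explode to one row per (activity, sector). Empty lists explode to NaN; drop them
    # so they are not stringified into the literal 'nan' below.
    df_exploded = df_sectors.explode('sector_code')
    df_exploded = df_exploded.dropna(subset=['sector_code'])

    # Ensure clean strings
    df_exploded['sector_code'] = df_exploded['sector_code'].astype(str)

    # Remove duplicates if an activity listed the same sector twice
    df_final = df_exploded.drop_duplicates().reset_index(drop=True)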

    logger.info(f"Normalized Sectors: {len(df_final)} rows generated from {len(df)} activities.")

    return df_final
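

# Illustrative sketch (not part of the pipeline): how a semicolon-delimited
# multi-valued field becomes long form. It assumes ';' is the delimiter, as in
# the helpers above; `_sketch_explode_multivalued` is a hypothetical name.
# Empty tokens (e.g. from a trailing ';') are discarded, and the NaN rows that
# `explode` emits for empty lists are removed before deduplication.
import pandas as pd


def _sketch_explode_multivalued(df: pd.DataFrame, col: str) -> pd.DataFrame:
    out = df[['iati_identifier', col]].dropna(subset=[col]).copy()
    # Split on ';', strip each token, and keep only non-empty tokens
    out[col] = out[col].astype(str).str.split(';').apply(
        lambda toks: [t.strip() for t in toks if t.strip()]
    )
    # One row per (activity, value); empty lists become NaN, so drop them
    out = out.explode(col).dropna(subset=[col])
    return out.drop_duplicates().reset_index(drop=True)


# Usage on toy data: A1 lists 11110 twice, A2 is missing, A3 has only delimiters
_sketch_sectors = _sketch_explode_multivalued(
    pd.DataFrame({
        'iati_identifier': ['A1', 'A2', 'A3'],
        'sector_code': ['11110; 12220;11110', None, ';'],
    }),
    'sector_code',
)
# _sketch_sectors keeps two rows: (A1, '11110') and (A1, '12220')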


# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def cleanse_activities_normalization(
    df_activities_raw: pd.DataFrame,
    sort_policy: Any
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, ActivityExclusions]:
    """
    Orchestrator for Task 6. Cleanses the activity backbone and normalizes
    multi-valued context fields (countries, sectors).

    Parameters
    ----------
    df_activities_raw : pd.DataFrame
        The raw activities DataFrame.
    sort_policy : SortPolicy
        The sorting policy for activities.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, ActivityExclusions]
        1. The cleaned activity backbone DataFrame (unique iati_identifier).
        2. The normalized countries DataFrame (long form).
        3. The normalized sectors DataFrame (long form).
        4. An exclusions ledger object.
    """
    logger.info("Starting Task 6: Activity Cleansing and Normalization...")

    # Step 1: Cleanse Backbone
    df_backbone, exclusions = cleanse_activity_backbone(df_activities_raw, sort_policy)

    # Step 2: Normalize Countries
    df_countries = normalize_activity_countries(df_backbone)

    # Step 3: Normalize Sectors
    df_sectors = normalize_activity_sectors(df_backbone)

    logger.info("Task 6 Completed Successfully.")

    return df_backbone, df_countries, df_sectors, exclusions
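

# Illustrative sketch: a referential-integrity check one might run on the
# Task 6 outputs. The backbone should be unique on `iati_identifier`, and every
# identifier in the long-form country and sector tables should exist in it.
# `_sketch_check_task6_integrity` is a hypothetical helper, not pipeline code.
import pandas as pd
from typing import List


def _sketch_check_task6_integrity(
    backbone: pd.DataFrame, *long_tables: pd.DataFrame
) -> List[str]:
    problems = []
    # The backbone is the 1-side of every later join
    if not backbone['iati_identifier'].is_unique:
        problems.append('backbone has duplicate iati_identifier values')
    known = set(backbone['iati_identifier'])
    # Identifiers present in a long-form table but absent from the backbone
    for i, table in enumerate(long_tables):
        orphans = set(table['iati_identifier']) - known
        if orphans:
            problems.append(f'table {i} has {len(orphans)} orphan identifier(s)')
    return problems


# Usage on toy data: the sector table references A3, which the backbone lacks
_sketch_problems = _sketch_check_task6_integrity(
    pd.DataFrame({'iati_identifier': ['A1', 'A2']}),
    pd.DataFrame({'iati_identifier': ['A1', 'A1'], 'recipient_country_code': ['KE', 'UG']}),
    pd.DataFrame({'iati_identifier': ['A3'], 'sector_code': ['11110']}),
)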


# Task 7 — Cleanse df_activities_raw (instrument type normalization)

# ==============================================================================
# Task 7: Cleanse df_activities_raw (instrument type normalization)
# ==============================================================================

@dataclass
class InstrumentExclusions:
    """
    Container for tracking rows excluded during instrument normalization.

    This dataclass serves as a structured ledger for tracking the number of
    activity records that contained invalid or unmappable instrument types.
    By quantifying these exclusions, we can assess the quality of the
    financial instrument metadata and ensure that downstream analyses (such as
    longitudinal instrument evolution) are based on a clean, standardized
    taxonomy {Grant, Loan, Equity}.

    Attributes
    ----------
    bad_instrument_type_count : int, default=0
        The count of activity rows where the `instrument_type` field contained
        a value that could not be mapped to the controlled vocabulary
        {Grant, Loan, Equity} and was consequently set to NaN.
    """
    # Count of invalid instrument types found
    bad_instrument_type_count: int = 0

    def total_excluded(self) -> int:
        """
        Returns the total count of instrument exclusions.

        Returns
        -------
        int
            The number of rows with invalid instrument types.
        """
        # Return the tracked count
        return self.bad_instrument_type_count

@dataclass
class ActivityCoverageReport:
    """
    Container for activity-level coverage statistics.

    This dataclass encapsulates key data quality metrics regarding the completeness
    of activity metadata. It tracks the proportion of activities that have valid
    instrument types, recipient countries, and sector codes. These metrics are
    critical for understanding the potential bias in downstream projections (e.g.,
    co-occurrence networks) where missing context data leads to exclusion from
    specific analyses.

    Attributes
    ----------
    total_activities : int
        The total number of unique activities in the cleaned backbone.
    instrument_null_rate : float
        The fraction of activities (0.0 to 1.0) that lack a valid instrument type.
    country_coverage_rate : float
        The fraction of activities (0.0 to 1.0) that are associated with at least
        one valid recipient country.
    sector_coverage_rate : float
        The fraction of activities (0.0 to 1.0) that are associated with at least
        one valid sector code.
    """
    # Total count of unique activities
    total_activities: int

    # Fraction of activities missing instrument type
    instrument_null_rate: float

    # Fraction of activities with at least one country
    country_coverage_rate: float

    # Fraction of activities with at least one sector
    sector_coverage_rate: float

    def __str__(self) -> str:
        """
        Returns a formatted string representation of the coverage report.

        Returns
        -------
        str
            A human-readable summary of the coverage statistics.
        """
        # Format metrics as percentages for readability
        return (f"Activity Coverage: N={self.total_activities}, "
                f"Null Inst={self.instrument_null_rate:.2%}, "
                f"Country Cov={self.country_coverage_rate:.2%}, "
                f"Sector Cov={self.sector_coverage_rate:.2%}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 1: Enforce instrument taxonomy consistent with Extended Data Fig. S1 categories
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_instrument_types(
    df: pd.DataFrame
) -> Tuple[pd.DataFrame, InstrumentExclusions]:
    """
    Normalizes the 'instrument_type' column to a controlled vocabulary:
    {Grant, Loan, Equity}. Quarantines unmappable values.

    Parameters
    ----------
    df : pd.DataFrame
        The cleaned activity backbone DataFrame.

    Returns
    -------
    Tuple[pd.DataFrame, InstrumentExclusions]
        1. The DataFrame with normalized instrument types (invalid set to NaN).
        2. An exclusions ledger object tracking unmappable counts.
    """
    # Work on a copy
    df_out = df.copy()

    # Allowed taxonomy
    allowed_types = {'Grant', 'Loan', 'Equity'}

    # Normalize: strip whitespace and apply Title Case (e.g. ' grant' -> 'Grant')
    if 'instrument_type' in df_out.columns:
        # Cast to object so string values can be written into a column that
        # pandas may have inferred as numeric or all-NaN
        df_out['instrument_type'] = df_out['instrument_type'].astype(object)

        # Transform only non-null entries so that NaN/None are never
        # stringified into the literal values 'nan' or 'None'
        mask_notna = df_out['instrument_type'].notna()
        df_out.loc[mask_notna, 'instrument_type'] = (
            df_out.loc[mask_notna, 'instrument_type']
            .astype(str)
            .str.strip()
            .str.title()
        )

        # Quarantine (reason BAD_INSTRUMENT_TYPE): non-null values outside the
        # allowed set are set to NaN and counted as "bad", separately from
        # values that were already missing. Nulls are allowed here and are
        # reported later as missing, not as bad.
        current_values = df_out['instrument_type']
        is_valid = current_values.isin(allowed_types) | current_values.isna()

        bad_count = int((~is_valid).sum())

        # Set invalid to NaN
        df_out.loc[~is_valid, 'instrument_type'] = np.nan

        exclusions = InstrumentExclusions(bad_instrument_type_count=bad_count)

        logger.info(f"Instrument Normalization: {bad_count} invalid types set to NaN.")

    else:
        logger.warning("Instrument Normalization: 'instrument_type' column missing.")
        exclusions = InstrumentExclusions(bad_instrument_type_count=0)
        df_out['instrument_type'] = np.nan

    return df_out, exclusions
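

# Illustrative sketch: the instrument rule applied to a toy Series. Non-null
# values are stripped and title-cased; anything outside {Grant, Loan, Equity}
# is set to NaN and counted as bad, while values that were already missing are
# not counted. `_sketch_normalize_instruments` is a hypothetical name.
import numpy as np
import pandas as pd
from typing import Tuple


def _sketch_normalize_instruments(s: pd.Series) -> Tuple[pd.Series, int]:
    allowed = {'Grant', 'Loan', 'Equity'}
    out = s.astype(object)
    notna = out.notna()
    # Normalize only the non-null entries
    out.loc[notna] = out[notna].astype(str).str.strip().str.title()
    # Bad = present in the input but not in the controlled vocabulary
    bad = notna & ~out.isin(allowed)
    out.loc[bad] = np.nan
    return out, int(bad.sum())


# Usage: 'guarantee' is quarantined; the missing value is not counted as bad
_sketch_inst, _sketch_inst_bad = _sketch_normalize_instruments(
    pd.Series([' grant', 'LOAN', 'equity ', 'guarantee', None])
)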


# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 2 & 3: Compute activity-level coverage statistics
# -------------------------------------------------------------------------------------------------------------------------------

def compute_activity_coverage(
    df_backbone: pd.DataFrame,
    df_countries: pd.DataFrame,
    df_sectors: pd.DataFrame
) -> ActivityCoverageReport:
    """
    Computes coverage statistics for instruments, countries, and sectors across
    the unique activity set.

    Parameters
    ----------
    df_backbone : pd.DataFrame
        The cleaned activity backbone DataFrame (unique iati_identifier).
    df_countries : pd.DataFrame
        The normalized countries DataFrame (long form).
    df_sectors : pd.DataFrame
        The normalized sectors DataFrame (long form).

    Returns
    -------
    ActivityCoverageReport
        A structured report of coverage rates.
    """
    total_activities = len(df_backbone)

    if total_activities == 0:
        return ActivityCoverageReport(0, 0.0, 0.0, 0.0)

    # 1. Instrument Null Rate
    # Count nulls in backbone
    n_null_inst = df_backbone['instrument_type'].isna().sum()
    inst_null_rate = n_null_inst / total_activities

    # 2. Country Coverage Rate
    # Count unique IDs in df_countries that exist in backbone
    # (They should all exist if derived from backbone, but we check intersection for rigor)
    ids_with_country = set(df_countries['iati_identifier'].unique())
    ids_in_backbone = set(df_backbone['iati_identifier'].unique())

    n_with_country = len(ids_with_country.intersection(ids_in_backbone))
    country_cov_rate = n_with_country / total_activities

    # 3. Sector Coverage Rate
    ids_with_sector = set(df_sectors['iati_identifier'].unique())
    n_with_sector = len(ids_with_sector.intersection(ids_in_backbone))
    sector_cov_rate = n_with_sector / total_activities

    report = ActivityCoverageReport(
        total_activities=total_activities,
        instrument_null_rate=inst_null_rate,
        country_coverage_rate=country_cov_rate,
        sector_coverage_rate=sector_cov_rate
    )

    logger.info(str(report))

    return report
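

# Worked example of the coverage arithmetic on toy data (variable names are
# hypothetical). With 4 activities, one lacking an instrument, 3 appearing in
# the country table, and 2 appearing in the sector table:
#   instrument_null_rate  = 1 / 4 = 0.25
#   country_coverage_rate = 3 / 4 = 0.75
#   sector_coverage_rate  = 2 / 4 = 0.50
# Distinct identifiers are counted, not rows: the country table below has
# 4 rows (A1 lists two countries), so a row count would wrongly give 4 / 4.
import numpy as np
import pandas as pd

_sk_backbone = pd.DataFrame({
    'iati_identifier': ['A1', 'A2', 'A3', 'A4'],
    'instrument_type': ['Grant', 'Loan', np.nan, 'Grant'],
})
_sk_countries = pd.DataFrame({'iati_identifier': ['A1', 'A1', 'A2', 'A3'],
                              'recipient_country_code': ['KE', 'UG', 'KE', 'NP']})
_sk_sectors = pd.DataFrame({'iati_identifier': ['A1', 'A4'],
                            'sector_code': ['11110', '12220']})

_sk_n = len(_sk_backbone)
_sk_ids = set(_sk_backbone['iati_identifier'])
_sk_rates = (
    _sk_backbone['instrument_type'].isna().sum() / _sk_n,
    len(set(_sk_countries['iati_identifier']) & _sk_ids) / _sk_n,
    len(set(_sk_sectors['iati_identifier']) & _sk_ids) / _sk_n,
)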


# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def cleanse_activities_instruments_coverage(
    df_backbone: pd.DataFrame,
    df_countries: pd.DataFrame,
    df_sectors: pd.DataFrame
) -> Tuple[pd.DataFrame, InstrumentExclusions, ActivityCoverageReport]:
    """
    Orchestrator for Task 7. Normalizes instruments and computes final activity coverage metrics.

    Parameters
    ----------
    df_backbone : pd.DataFrame
        The cleaned activity backbone from Task 6.
    df_countries : pd.DataFrame
        The normalized countries from Task 6.
    df_sectors : pd.DataFrame
        The normalized sectors from Task 6.

    Returns
    -------
    Tuple[pd.DataFrame, InstrumentExclusions, ActivityCoverageReport]
        1. The backbone DataFrame with normalized instruments.
        2. An exclusions ledger for instruments.
        3. A coverage report.
    """
    logger.info("Starting Task 7: Instrument Normalization and Coverage Analysis...")

    # Step 1: Normalize Instruments
    df_final, exclusions = normalize_instrument_types(df_backbone)

    # Step 2 & 3: Compute Coverage
    coverage_report = compute_activity_coverage(df_final, df_countries, df_sectors)

    logger.info("Task 7 Completed Successfully.")

    return df_final, exclusions, coverage_report


# Task 8 — Cleanse df_organisations_raw (entity resolution substrate)

# ==============================================================================
# Task 8: Cleanse df_organisations_raw (entity resolution substrate)
# ==============================================================================

@dataclass
class OrganisationExclusions:
    """
    Container for tracking rows excluded or flagged during organisation cleansing.

    This dataclass serves as a structured ledger for data quality issues identified
    within the organisation master table. It distinguishes between fatal errors
    that require row exclusion (missing primary keys) and non-fatal data quality
    issues (invalid type labels) that result in field nullification but retention
    of the record. This distinction preserves the maximum amount of usable
    entity data for the resolution process.

    Attributes
    ----------
    missing_ref_count : int, default=0
        The number of rows excluded because the `org_ref` field was null or missing.
        Since `org_ref` is the primary key for joining and entity resolution,
        records without it are unusable.
    bad_org_type_count : int, default=0
        The number of rows where the `org_type` field contained a value outside
        the controlled vocabulary. These fields are set to NaN, but the organisation
        record itself is retained.
    """
    # Count of rows dropped due to missing reference ID
    missing_ref_count: int = 0

    # Count of rows with invalid organisation type labels
    bad_org_type_count: int = 0

    def total_excluded(self) -> int:
        """
        Returns the total count of rows strictly excluded from the dataset.

        Returns
        -------
        int
            The count of rows missing a reference ID. Invalid types do not
            cause row exclusion, so they are not summed here.
        """
        # Only missing refs cause row exclusion; bad types just nullify the field
        return self.missing_ref_count

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 1: Enforce organisation record integrity
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_domains(domain_series: pd.Series) -> pd.Series:
    """
    Normalizes website domains by stripping protocols, paths, and lowercasing.

    Parameters
    ----------
    domain_series : pd.Series
        Series containing raw URL/domain strings.

    Returns
    -------
    pd.Series
        Normalized domain strings.
    """
    # Convert to string, lowercase, strip whitespace
    s = domain_series.astype(str).str.lower().str.strip()

    # Remove protocol (http://, https://)
    s = s.str.replace(r'^https?://', '', regex=True)

    # Subdomain policy: keep subdomains (e.g. 'data.example.org') but strip a
    # leading 'www.', which is standard canonicalization for entity resolution
    s = s.str.replace(r'^www\.', '', regex=True)

    # Remove everything from the first path, query, fragment, or port separator
    s = s.str.replace(r'[/?#:].*$', '', regex=True)

    # Handle 'nan' strings resulting from astype(str) on nulls
    s = s.replace({'nan': np.nan, 'none': np.nan, '': np.nan})

    return s
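

# Illustrative sketch: a standalone domain canonicalizer applied to sample
# inputs. `_sketch_canonical_domain` is a hypothetical name. It lowercases,
# strips the http(s) scheme and a leading 'www.', keeps other subdomains, and
# cuts at the first '/', '?', '#', or ':' so that paths, query strings, and
# ports are dropped. Genuine nulls stay null instead of becoming 'none'.
import numpy as np
import pandas as pd


def _sketch_canonical_domain(s: pd.Series) -> pd.Series:
    out = s.astype(str).str.lower().str.strip()
    out = out.str.replace(r'^https?://', '', regex=True)
    out = out.str.replace(r'^www\.', '', regex=True)
    out = out.str.replace(r'[/?#:].*$', '', regex=True)
    # Restore missing values and treat empty results as missing
    return out.where(s.notna() & (out != ''), np.nan)


_sketch_domains = _sketch_canonical_domain(pd.Series([
    'https://www.UNICEF.org/reports/2020',
    'data.worldbank.org:443',
    'WWW.gatesfoundation.org?ref=x',
    None,
]))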

def cleanse_organisation_records(
    df: pd.DataFrame
) -> Tuple[pd.DataFrame, OrganisationExclusions]:
    """
    Validates organisation records, normalizes names and domains, and removes
    rows without a reference ID.

    Parameters
    ----------
    df : pd.DataFrame
        The raw organisations DataFrame.

    Returns
    -------
    Tuple[pd.DataFrame, OrganisationExclusions]
        1. The cleaned organisations DataFrame.
        2. An exclusions ledger.
    """
    exclusions = OrganisationExclusions()

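    # 1. Enforce org_ref presence (nulls and blank strings both count as missing)
    initial_count = len(df)
    has_ref = df['org_ref'].notna() & (df['org_ref'].astype(str).str.strip() != '')
    df_clean = df.loc[has_ref].copy()
    exclusions.missing_ref_count = initial_count - len(df_clean)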

    # 2. Normalize Names
    if 'org_name' in df_clean.columns:
        # Unicode normalize (NFKD), lowercase, strip
        df_clean['org_name_normalized'] = (
            df_clean['org_name']
            .astype(str)
            .str.normalize('NFKD')
            .str.lower()
            .str.strip()
            .replace({'nan': np.nan, 'none': np.nan, '': np.nan})
        )
    else:
        logger.warning("Organisation Cleansing: 'org_name' column missing.")
        df_clean['org_name_normalized'] = np.nan

    # 3. Normalize Domains
    if 'website_domain' in df_clean.columns:
        df_clean['website_domain_normalized'] = normalize_domains(df_clean['website_domain'])
    else:
        logger.warning("Organisation Cleansing: 'website_domain' column missing.")
        df_clean['website_domain_normalized'] = np.nan

    logger.info(f"Organisation Records: {exclusions.missing_ref_count} missing refs removed.")

    return df_clean, exclusions
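

# Illustrative note on NFKD (standard-library behavior): it decomposes
# characters, e.g. 'é' becomes 'e' plus a combining acute accent and the
# ligature 'ﬁ' becomes 'fi', but it does not remove accents. After NFKD and
# lowercasing, 'Société' therefore still differs from 'societe'. If
# accent-insensitive matching were wanted, combining marks would have to be
# dropped explicitly, as in the hypothetical helper `_sketch_fold_name`.
import unicodedata


def _sketch_fold_name(name: str) -> str:
    decomposed = unicodedata.normalize('NFKD', name).lower().strip()
    # Drop combining marks (Unicode category 'Mn'), keeping base characters
    return ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')


_sketch_nfkd_only = unicodedata.normalize('NFKD', 'Société').lower()
_sketch_folded = _sketch_fold_name('  Société Générale ')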


# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 2: Validate organisation type taxonomy
# -------------------------------------------------------------------------------------------------------------------------------

def validate_org_taxonomy(
    df: pd.DataFrame,
    exclusions: OrganisationExclusions
) -> Tuple[pd.DataFrame, OrganisationExclusions]:
    """
    Enforces the controlled vocabulary for organisation types.

    Parameters
    ----------
    df : pd.DataFrame
        The cleaned organisations DataFrame.
    exclusions : OrganisationExclusions
        The existing exclusions ledger.

    Returns
    -------
    Tuple[pd.DataFrame, OrganisationExclusions]
        1. The DataFrame with validated 'org_type' (invalid set to NaN).
        2. The updated exclusions ledger.
    """
    df_out = df.copy()

    # Controlled vocabulary
    allowed_types = {
        "Government", "Multilateral", "Foundation", "Academic/Research",
        "International NGO", "National NGO", "Private Sector", "Other"
    }

    if 'org_type' in df_out.columns:
        # Labels are matched exactly (case-sensitive) after stripping whitespace.
        # If the input holds raw IATI organisation-type codes (e.g. 10, 15),
        # a code-to-label mapping must be applied upstream.
        s = df_out['org_type'].astype(str).str.strip()

        # A label is valid only if it exactly matches the controlled vocabulary
        is_valid = s.isin(allowed_types)

        # Count rejected labels. Nulls become the string 'nan' after astype(str)
        # and fail the membership test, so only originally non-null values are
        # counted: a missing type is not a bad type.
        was_not_null = df_out['org_type'].notna()
        bad_labels = was_not_null & (~is_valid)

        bad_count = int(bad_labels.sum())
        exclusions.bad_org_type_count = bad_count

        # Keep the whitespace-stripped valid labels; set everything else to NaN
        df_out['org_type'] = s.where(is_valid, np.nan)

        logger.info(f"Org Taxonomy: {bad_count} invalid types set to NaN.")

    else:
        logger.warning("Org Taxonomy: 'org_type' column missing.")
        df_out['org_type'] = np.nan

    return df_out, exclusions
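

# Illustrative sketch: exact-match validation against the organisation-type
# vocabulary on toy labels. A missing value stays missing and is not counted as
# bad; a mis-cased label such as 'government' and a raw code such as '10' are
# rejected. `_sketch_validate_labels` is a hypothetical name.
import numpy as np
import pandas as pd
from typing import Tuple

_SKETCH_ORG_TYPES = {
    "Government", "Multilateral", "Foundation", "Academic/Research",
    "International NGO", "National NGO", "Private Sector", "Other",
}


def _sketch_validate_labels(s: pd.Series) -> Tuple[pd.Series, int]:
    stripped = s.astype(str).str.strip()
    valid = stripped.isin(_SKETCH_ORG_TYPES)
    # Only values that were present in the input can be "bad"
    bad = int((s.notna() & ~valid).sum())
    return stripped.where(valid, np.nan), bad


_sketch_types, _sketch_bad_types = _sketch_validate_labels(
    pd.Series([' Government', 'government', 'Foundation', None, '10'])
)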


# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 3: Prepare alias structure
# -------------------------------------------------------------------------------------------------------------------------------

def prepare_aliases(
    df: pd.DataFrame
) -> pd.DataFrame:
    """
    Standardizes organisation aliases into a list of normalized strings.

    Parameters
    ----------
    df : pd.DataFrame
        The validated organisations DataFrame.

    Returns
    -------
    pd.DataFrame
        The DataFrame with a new 'aliases_normalized' column (List[str]).
    """
    df_out = df.copy()

    if 'org_name_aliases' not in df_out.columns:
        logger.info("Alias Preparation: 'org_name_aliases' missing. Creating empty lists.")
        df_out['aliases_normalized'] = [[] for _ in range(len(df_out))]
        return df_out

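    import unicodedata  # standard library; used for NFKD normalization

    def normalize_alias_list(val: Union[str, List, Any]) -> List[str]:
        # Accept a list of aliases or a semicolon-delimited string
        if isinstance(val, list):
            raw_list = val
        elif isinstance(val, str):
            raw_list = val.split(';')
        else:
            # NaN, None, or any other scalar: no aliases
            return []

        normalized = []
        for alias in raw_list:
            # Skip missing entries inside lists
            if alias is None or (isinstance(alias, float) and np.isnan(alias)):
                continue
            # Same normalization as org_name: NFKD, lowercase, strip
            norm_s = unicodedata.normalize('NFKD', str(alias)).lower().strip()
            if norm_s:
                normalized.append(norm_s)

        # Deduplicate, keeping first-seen order so the output is reproducible
        return list(dict.fromkeys(normalized))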

    df_out['aliases_normalized'] = df_out['org_name_aliases'].apply(normalize_alias_list)

    logger.info("Alias Preparation: Aliases normalized and deduplicated.")

    return df_out
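

# Illustrative sketch: order-preserving deduplication on toy aliases.
# `list(set(...))` returns strings in hash order, which can change between
# interpreter runs because of hash randomization; `dict.fromkeys` keeps
# first-seen order and is therefore reproducible.
_sketch_aliases = ['who', 'world health organization', 'who', 'oms']
_sketch_dedup = list(dict.fromkeys(_sketch_aliases))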


# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def cleanse_organisations_substrate(
    df_organisations_raw: pd.DataFrame
) -> Tuple[pd.DataFrame, OrganisationExclusions]:
    """
    Orchestrator for Task 8. Prepares the organisation master table for entity resolution.

    Parameters
    ----------
    df_organisations_raw : pd.DataFrame
        The raw organisations DataFrame.

    Returns
    -------
    Tuple[pd.DataFrame, OrganisationExclusions]
        1. The fully cleansed organisations DataFrame.
        2. An exclusions ledger.
    """
    logger.info("Starting Task 8: Organisation Cleansing...")

    # Step 1: Record Integrity & Normalization
    df_clean, exclusions = cleanse_organisation_records(df_organisations_raw)

    # Step 2: Taxonomy Validation
    df_typed, exclusions = validate_org_taxonomy(df_clean, exclusions)

    # Step 3: Alias Preparation
    df_final = prepare_aliases(df_typed)

    logger.info("Task 8 Completed Successfully.")

    return df_final, exclusions


# Task 9 — Join transactions to activity contexts (countries, sectors, instruments)

# ================================================================================
# Task 9: Join transactions to activity contexts (countries, sectors, instruments)
# ================================================================================

@dataclass
class ContextCoverageReport:
    """
    Container for transaction-level context coverage statistics.

    This dataclass encapsulates the success rates of joining transaction records
    to their associated activity-level metadata (contexts). It tracks the total
    number of transactions and the subset that successfully matched to at least
    one recipient country, sector code, or instrument type. These metrics are
    vital for assessing the "projection eligibility" of the transaction corpus:
    transactions without contexts cannot contribute to the co-occurrence network
    projection, representing a potential source of topological bias.

    Attributes
    ----------
    total_transactions : int
        The total number of transactions in the cleansed dataset (T++).
    transactions_with_country : int
        The number of transactions that were successfully joined to at least one
        recipient country context.
    transactions_with_sector : int
        The number of transactions that were successfully joined to at least one
        sector code context.
    transactions_with_instrument : int
        The number of transactions that were successfully joined to a valid
        instrument type.
    """
    # Total count of transactions processed
    total_transactions: int

    # Count of transactions with valid country context
    transactions_with_country: int

    # Count of transactions with valid sector context
    transactions_with_sector: int

    # Count of transactions with valid instrument type
    transactions_with_instrument: int

    def __str__(self) -> str:
        """
        Returns a formatted string representation of the coverage report.

        Returns
        -------
        str
            A human-readable summary of the context coverage percentages.
        """
        # Calculate percentages for readability; guard against an empty
        # transaction set to avoid division by zero
        n = self.total_transactions or 1
        return (f"Context Coverage (N={self.total_transactions}): "
                f"Country={self.transactions_with_country / n:.2%}, "
                f"Sector={self.transactions_with_sector / n:.2%}, "
                f"Instrument={self.transactions_with_instrument / n:.2%}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 1: Left-join transactions to activity metadata
# -------------------------------------------------------------------------------------------------------------------------------

def join_activity_contexts(
    df_transactions: pd.DataFrame,
    df_activities_backbone: pd.DataFrame,
    df_activities_countries: pd.DataFrame,
    df_activities_sectors: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Joins transactions to activity contexts, producing separate tables to avoid
    Cartesian explosion between countries and sectors.

    Parameters
    ----------
    df_transactions : pd.DataFrame
        The cleansed transactions DataFrame.
    df_activities_backbone : pd.DataFrame
        Activity backbone with 'instrument_type'.
    df_activities_countries : pd.DataFrame
        Long-form activity-country mapping.
    df_activities_sectors : pd.DataFrame
        Long-form activity-sector mapping.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]
        1. df_tx_instruments: Transactions enriched with instrument_type (1:1).
        2. df_tx_countries: Transactions exploded by country (1:M).
        3. df_tx_sectors: Transactions exploded by sector (1:M).
    """
    # 1. Join Instruments (1:1 expected)
    # We only need iati_identifier and instrument_type from backbone
    df_tx_instruments = pd.merge(
        df_transactions,
        df_activities_backbone[['iati_identifier', 'instrument_type']],
        on='iati_identifier',
        how='left',
        validate='many_to_one' # Enforce that backbone is unique on join key
    )

    # 2. Join Countries (1:M)
    # We keep transaction_id to link back
    df_tx_countries = pd.merge(
        df_transactions[['transaction_id', 'iati_identifier']],
        df_activities_countries,
        on='iati_identifier',
        how='inner' # Inner join to keep only valid contexts; missing ones are just absent
    )

    # 3. Join Sectors (1:M)
    df_tx_sectors = pd.merge(
        df_transactions[['transaction_id', 'iati_identifier']],
        df_activities_sectors,
        on='iati_identifier',
        how='inner'
    )

    logger.info(f"Joins: Instruments N={len(df_tx_instruments)}, "
                f"Countries N={len(df_tx_countries)}, Sectors N={len(df_tx_sectors)}")

    return df_tx_instruments, df_tx_countries, df_tx_sectors
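

# Worked example (toy data) of why countries and sectors are joined into
# separate tables. For one transaction whose activity lists 3 countries and
# 4 sectors, chaining both joins onto the same transaction row pairs every
# country with every sector: 3 x 4 = 12 rows. Two separate joins give
# 3 + 4 = 7 rows and no spurious (country, sector) pairs.
import pandas as pd

_sk_tx = pd.DataFrame({'transaction_id': ['T1'], 'iati_identifier': ['A1']})
_sk_ctry = pd.DataFrame({'iati_identifier': ['A1'] * 3,
                         'recipient_country_code': ['KE', 'UG', 'TZ']})
_sk_sect = pd.DataFrame({'iati_identifier': ['A1'] * 4,
                         'sector_code': ['11110', '12220', '14030', '31120']})

# Combined join: Cartesian product of countries and sectors per transaction
_sk_combined = (_sk_tx.merge(_sk_ctry, on='iati_identifier')
                      .merge(_sk_sect, on='iati_identifier'))
# Separate joins: one long-form table per context type
_sk_separate_rows = (len(_sk_tx.merge(_sk_ctry, on='iati_identifier'))
                     + len(_sk_tx.merge(_sk_sect, on='iati_identifier')))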


# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 2: Quantify join coverage and enforce exclusion policies
# -------------------------------------------------------------------------------------------------------------------------------

def compute_context_coverage(
    df_transactions: pd.DataFrame,
    df_tx_instruments: pd.DataFrame,
    df_tx_countries: pd.DataFrame,
    df_tx_sectors: pd.DataFrame
) -> ContextCoverageReport:
    """
    Computes the fraction of transactions that successfully matched to contexts.

    Parameters
    ----------
    df_transactions : pd.DataFrame
        Base transactions.
    df_tx_instruments : pd.DataFrame
        Transactions with instruments.
    df_tx_countries : pd.DataFrame
        Transactions exploded by country.
    df_tx_sectors : pd.DataFrame
        Transactions exploded by sector.

    Returns
    -------
    ContextCoverageReport
        Coverage statistics.
    """
    total_tx = len(df_transactions)

    # Instruments: check non-null in the joined table
    # Note: df_tx_instruments has same row count as df_transactions (left join)
    n_with_inst = int(df_tx_instruments['instrument_type'].notna().sum())

    # Countries: count unique transaction_ids in the inner-joined table
    n_with_country = df_tx_countries['transaction_id'].nunique()

    # Sectors: count unique transaction_ids in the inner-joined table
    n_with_sector = df_tx_sectors['transaction_id'].nunique()

    report = ContextCoverageReport(
        total_transactions=total_tx,
        transactions_with_country=n_with_country,
        transactions_with_sector=n_with_sector,
        transactions_with_instrument=n_with_inst
    )

    logger.info(str(report))

    return report
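

# Worked example (toy data): in the inner-joined country table a transaction
# appears once per country, so coverage must count distinct transaction_ids.
# T1 has two countries, T2 one, T3 none:
#   rows = 3, distinct transactions = 2, coverage = 2 / 3
# A naive row count (3 / 3) would wrongly report full coverage.
import pandas as pd

_sk_tx_all = pd.DataFrame({'transaction_id': ['T1', 'T2', 'T3']})
_sk_tx_ctry = pd.DataFrame({'transaction_id': ['T1', 'T1', 'T2'],
                            'recipient_country_code': ['KE', 'UG', 'KE']})
_sk_rows = len(_sk_tx_ctry)
_sk_covered = _sk_tx_ctry['transaction_id'].nunique()
_sk_cov = _sk_covered / len(_sk_tx_all)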


# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 3: Define the projection context function
# -------------------------------------------------------------------------------------------------------------------------------

def build_unified_projection_contexts(
    df_tx_countries: pd.DataFrame,
    df_tx_sectors: pd.DataFrame
) -> pd.DataFrame:
    """
    Constructs the unified context table for projection, applying namespacing
    to distinguish countries from sectors.

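    Context function: kappa(t) is the set of namespaced contexts of transaction t,
    i.e. COUNTRY:c for every recipient country c of its activity, together with
    SECTOR:s for every sector s of its activity.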

    Parameters
    ----------
    df_tx_countries : pd.DataFrame
        Transactions exploded by country.
    df_tx_sectors : pd.DataFrame
        Transactions exploded by sector.

    Returns
    -------
    pd.DataFrame
        A long-form DataFrame with columns ['transaction_id', 'context_id'].
    """
    # Prepare Country Contexts
    # Namespace: "COUNTRY:<code>"
    countries = df_tx_countries[['transaction_id', 'recipient_country_code']].copy()
    countries['context_id'] = 'COUNTRY:' + countries['recipient_country_code'].astype(str)
    countries = countries[['transaction_id', 'context_id']]

    # Prepare Sector Contexts
    # Namespace: "SECTOR:<code>"
    sectors = df_tx_sectors[['transaction_id', 'sector_code']].copy()
    sectors['context_id'] = 'SECTOR:' + sectors['sector_code'].astype(str)
    sectors = sectors[['transaction_id', 'context_id']]

    # Concatenate
    df_contexts = pd.concat([countries, sectors], ignore_index=True)

    # Deduplicate, in case a transaction maps to the same country or sector more than once
    df_contexts = df_contexts.drop_duplicates()

    logger.info(f"Unified Contexts: {len(df_contexts)} context rows generated.")

    return df_contexts
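

# Illustrative sketch (toy data, hypothetical variable names): namespacing
# merges country and sector codes into one unambiguous context vocabulary, so
# kappa(t) becomes a set of strings such as 'COUNTRY:KE' and 'SECTOR:11110'.
# The duplicate (T1, COUNTRY:KE) row is removed by drop_duplicates.
import pandas as pd

_sk_c = pd.DataFrame({'transaction_id': ['T1', 'T1'],
                      'recipient_country_code': ['KE', 'KE']})
_sk_s = pd.DataFrame({'transaction_id': ['T1', 'T2'],
                      'sector_code': ['11110', '11110']})
_sk_ctx = pd.concat([
    pd.DataFrame({'transaction_id': _sk_c['transaction_id'],
                  'context_id': 'COUNTRY:' + _sk_c['recipient_country_code'].astype(str)}),
    pd.DataFrame({'transaction_id': _sk_s['transaction_id'],
                  'context_id': 'SECTOR:' + _sk_s['sector_code'].astype(str)}),
], ignore_index=True).drop_duplicates()

# Per-transaction context sets, i.e. kappa(t)
_sk_kappa = {t: set(g) for t, g in _sk_ctx.groupby('transaction_id')['context_id']}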


# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def join_transactions_to_contexts(
    df_transactions: pd.DataFrame,
    df_activities_backbone: pd.DataFrame,
    df_activities_countries: pd.DataFrame,
    df_activities_sectors: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame, ContextCoverageReport]:
    """
    Orchestrator for Task 9. Joins transactions to activity metadata and builds
    the unified context table for projection.

    Parameters
    ----------
    df_transactions : pd.DataFrame
        Cleansed transactions.
    df_activities_backbone : pd.DataFrame
        Activity backbone.
    df_activities_countries : pd.DataFrame
        Activity countries.
    df_activities_sectors : pd.DataFrame
        Activity sectors.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame, ContextCoverageReport]
        1. df_tx_instruments: Transactions enriched with instrument info (for time series).
        2. df_contexts: Unified long-form contexts (for projection).
        3. Coverage report.
    """
    logger.info("Starting Task 9: Context Joins...")

    # Step 1: Joins
    df_tx_inst, df_tx_ctry, df_tx_sect = join_activity_contexts(
        df_transactions,
        df_activities_backbone,
        df_activities_countries,
        df_activities_sectors
    )

    # Step 2: Coverage
    report = compute_context_coverage(
        df_transactions,
        df_tx_inst,
        df_tx_ctry,
        df_tx_sect
    )

    # Step 3: Unified Contexts
    df_contexts = build_unified_projection_contexts(df_tx_ctry, df_tx_sect)

    logger.info("Task 9 Completed Successfully.")

    return df_tx_inst, df_contexts, report


# Task 10 — Entity resolution: construct canonical organisation mapping

# ==============================================================================
# Task 10: Entity resolution: construct canonical organisation mapping
# ==============================================================================

@dataclass
class EntityResolutionConfig:
    """
    Configuration for the entity resolution process.

    This dataclass encapsulates the hyperparameters used to control the fuzzy
    matching algorithm. By explicitly defining the similarity threshold and
    n-gram range, we ensure that the entity resolution process is deterministic
    and reproducible. These parameters govern the sensitivity of the matching
    logic, balancing the risk of false positives (merging distinct entities)
    against false negatives (failing to merge variants).

    Attributes
    ----------
    similarity_threshold : float, default=0.90
        The minimum cosine similarity score (0.0 to 1.0) required to consider
        two organisation names as a match. A higher threshold yields stricter
        matching.
    ngram_range : Tuple[int, int], default=(3, 3)
        The range of n-gram sizes to use for TF-IDF vectorization. (3, 3)
        indicates using character trigrams, which is robust against minor
        spelling variations.
    use_precomputed : bool, default=False
        A flag indicating whether to bypass computation and use an existing
        `canonical_org_id` column if present and trusted.
    """
    # Minimum cosine similarity for a match
    similarity_threshold: float = 0.90

    # Range for character n-grams (min, max)
    ngram_range: Tuple[int, int] = (3, 3)

    # Flag to use existing mapping if available
    use_precomputed: bool = False
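

# Illustrative sketch (an addition, not part of the original pipeline): how the
# character-trigram TF-IDF representation configured above scores name variants.
# The helper name and example names are hypothetical. A similarity_threshold of 0.90
# is strict: an added prefix such as "the " can pull two variants of the same name
# below it, while names sharing no trigrams score exactly zero.
from typing import List

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


def sketch_trigram_similarity(names: List[str], ngram_range=(3, 3)) -> np.ndarray:
    # Same vectoriser settings as compute_fuzzy_clusters: raw character n-grams
    vectorizer = TfidfVectorizer(analyzer='char', ngram_range=ngram_range, min_df=1)
    tfidf = vectorizer.fit_transform(names)
    # Dense (n, n) matrix of pairwise cosine similarities between L2-normalised rows
    return cosine_similarity(tfidf)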

@dataclass
class CanonicalMappingArtifact:
    """
    Container for the final canonical mapping table and metadata.

    This dataclass serves as the primary output artifact of the entity resolution
    task. It contains the resolved mapping table that links raw organisation
    references to their canonical identities, along with metadata describing
    the resolution process. This artifact is essential for auditing the
    consolidation of the network nodes and ensuring traceability from raw data
    to the final graph topology.

    Attributes
    ----------
    mapping_table : pd.DataFrame
        A DataFrame containing the mapping between `org_ref` (raw) and
        `canonical_org_id` (resolved).
    resolution_method : str
        A description of the method used to generate the mapping (e.g.,
        "Precomputed" or "TF-IDF/Cosine Clustering").
    cluster_count : int
        The total number of unique canonical entities identified after resolution.
    """
    # The mapping DataFrame
    mapping_table: pd.DataFrame

    # Description of the resolution method
    resolution_method: str

    # Count of unique canonical entities
    cluster_count: int

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the mapping artifact.

        Returns
        -------
        str
            A human-readable summary including entry count, unique entities,
            and the method used.
        """
        # Format summary string
        return (f"Canonical Mapping: {len(self.mapping_table)} entries, "
                f"{self.cluster_count} unique entities. Method: {self.resolution_method}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 1: Determine if canonical mapping is pre-provided
# -------------------------------------------------------------------------------------------------------------------------------

def check_precomputed_mapping(
    df_organisations: pd.DataFrame
) -> bool:
    """
    Checks if a trusted canonical mapping already exists in the input data.

    Parameters
    ----------
    df_organisations : pd.DataFrame
        The cleansed organisations DataFrame.

    Returns
    -------
    bool
        True if 'canonical_org_id' exists and fewer than 1% of its values are
        missing (treated as fully populated).
    """
    if 'canonical_org_id' in df_organisations.columns:
        null_rate = df_organisations['canonical_org_id'].isna().mean()
        if null_rate < 0.01: # Allow <1% missing, treat as fully populated
            logger.info("Entity Resolution: Precomputed 'canonical_org_id' found and trusted.")
            return True

    logger.info("Entity Resolution: No precomputed mapping found. Proceeding to compute.")
    return False


# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 2: Implement fuzzy matching framework
# -------------------------------------------------------------------------------------------------------------------------------

def compute_fuzzy_clusters(
    df_organisations: pd.DataFrame,
    config: EntityResolutionConfig
) -> Dict[str, str]:
    """
    Computes fuzzy clusters of organisations using TF-IDF and Cosine Similarity.
    Returns a mapping from org_ref to a canonical cluster ID.

    Parameters
    ----------
    df_organisations : pd.DataFrame
        The cleansed organisations DataFrame. Must contain 'org_name_normalized'.
    config : EntityResolutionConfig
        Configuration parameters.

    Returns
    -------
    Dict[str, str]
        A dictionary mapping 'org_ref' -> 'canonical_org_id'.
    """
    # Filter for valid names
    df_active = df_organisations.dropna(subset=['org_name_normalized', 'org_ref']).copy()

    if len(df_active) == 0:
        return {}

    names = df_active['org_name_normalized'].tolist()
    refs = df_active['org_ref'].tolist()

    # Vectorize (Blocking strategy via TF-IDF on n-grams)
    vectorizer = TfidfVectorizer(
        analyzer='char',
        ngram_range=config.ngram_range,
        min_df=1  # Keep rare n-grams
    )
    tfidf_matrix = vectorizer.fit_transform(names)

    # Compute Similarity (Candidate generation)
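    # Dense cosine similarity needs O(N^2) memory; for N=2500 that is 6.25M entries,
    # which is manageable. For much larger N (>10k), a sparse top-n product
    # (e.g. sparse_dot_topn) or an approximate nearest-neighbour index (e.g. Annoy)
    # would be preferable.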
    sim_matrix = cosine_similarity(tfidf_matrix)

    # Build Graph of Matches
    G = nx.Graph()
    # Add all nodes
    for ref in refs:
        G.add_node(ref)

    # Add edges for pairs exceeding threshold
    # We iterate upper triangle
    rows, cols = np.where(np.triu(sim_matrix, k=1) >= config.similarity_threshold)

    for r, c in zip(rows, cols):
        G.add_edge(refs[r], refs[c])

    # Extract Connected Components (Clusters)
    mapping = {}
    for component in nx.connected_components(G):
        # Deterministic canonical ID selection: sort members lexicographically and
        # pick the first as the cluster representative (stable across runs)
        members = sorted(component)
        canonical_id = members[0]

        for member in members:
            mapping[member] = canonical_id

    # Count clusters without re-materialising every component
    n_clusters = nx.number_connected_components(G)
    logger.info(f"Fuzzy Matching: Resolved {len(refs)} refs into {n_clusters} clusters.")

    return mapping
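

# Illustrative sketch (an addition, not part of the original pipeline): the same
# "threshold, then take connected components" clustering as compute_fuzzy_clusters,
# written with scipy.sparse.csgraph instead of networkx. The helper name is hypothetical.
# Connected components behave like single-linkage clustering, so matches chain
# transitively: if A~B and B~C both pass the threshold, A and C share a canonical ID
# even when sim(A, C) is below it. Refs are assumed to be unique.
from typing import Dict, List

import numpy as np
from scipy import sparse
from scipy.sparse.csgraph import connected_components


def sketch_threshold_components(sim: np.ndarray, refs: List[str], threshold: float) -> Dict[str, str]:
    # Keep only upper-triangle pairs (i < j) whose similarity meets the threshold
    adjacency = sparse.csr_matrix(np.triu(sim, k=1) >= threshold)
    n_components, labels = connected_components(adjacency, directed=False)
    mapping: Dict[str, str] = {}
    for label in range(n_components):
        # Lexicographically smallest ref is the representative, as in the pipeline
        members = sorted(ref for ref, lab in zip(refs, labels) if lab == label)
        for member in members:
            mapping[member] = members[0]
    return mapping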


# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 3: Apply canonicalization to all organisation identities
# -------------------------------------------------------------------------------------------------------------------------------

def apply_canonical_mapping(
    df_organisations: pd.DataFrame,
    computed_mapping: Dict[str, str],
    use_precomputed: bool
) -> Tuple[pd.DataFrame, CanonicalMappingArtifact]:
    """
    Applies the canonical mapping to the organisation master table and generates
    the final artifact.

    Parameters
    ----------
    df_organisations : pd.DataFrame
        The cleansed organisations DataFrame.
    computed_mapping : Dict[str, str]
        The mapping dictionary from Step 2.
    use_precomputed : bool
        Whether to use the existing column.

    Returns
    -------
    Tuple[pd.DataFrame, CanonicalMappingArtifact]
        1. The updated organisations DataFrame with 'canonical_org_id'.
        2. The mapping artifact.
    """
    df_out = df_organisations.copy()

    if use_precomputed:
        # Ensure column exists and fill gaps if any (self-map)
        if 'canonical_org_id' not in df_out.columns:
            df_out['canonical_org_id'] = df_out['org_ref']  # Fallback: self-map

        df_out['canonical_org_id'] = df_out['canonical_org_id'].fillna(df_out['org_ref'])
        method = "Precomputed"
    else:
        # Apply computed mapping
        # Map org_ref -> canonical. If not in mapping (e.g. no name), map to self.
        df_out['canonical_org_id'] = df_out['org_ref'].map(computed_mapping).fillna(df_out['org_ref'])
        method = "TF-IDF/Cosine Clustering"

    # Create artifact table
    mapping_table = df_out[['org_ref', 'org_name', 'canonical_org_id']].copy()
    unique_entities = df_out['canonical_org_id'].nunique()

    artifact = CanonicalMappingArtifact(
        mapping_table=mapping_table,
        resolution_method=method,
        cluster_count=unique_entities
    )

    logger.info(f"Canonicalization Applied. {unique_entities} unique entities.")

    return df_out, artifact
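

# Illustrative sketch (an addition, not part of the original pipeline): the
# "map, then self-map on miss" pattern used in apply_canonical_mapping, on toy refs.
# The helper name and refs are hypothetical. Refs absent from the mapping (e.g. rows
# dropped for lacking a normalised name) become their own canonical entity, so the
# number of canonical entities can never exceed the number of distinct refs.
from typing import Dict

import pandas as pd


def sketch_map_with_self_fallback(refs: pd.Series, mapping: Dict[str, str]) -> pd.Series:
    # Series.map yields NaN for unmapped refs; fillna(refs) aligns on index and self-maps them
    return refs.map(mapping).fillna(refs)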


# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def construct_canonical_mapping(
    df_organisations: pd.DataFrame,
    config_dict: Dict[str, Any]
) -> Tuple[pd.DataFrame, CanonicalMappingArtifact]:
    """
    Orchestrator for Task 10. Performs entity resolution to create a canonical
    organisation mapping.

    Parameters
    ----------
    df_organisations : pd.DataFrame
        The cleansed organisations DataFrame.
    config_dict : Dict[str, Any]
        The master config dictionary (for ER parameters).

    Returns
    -------
    Tuple[pd.DataFrame, CanonicalMappingArtifact]
        1. The organisations DataFrame enriched with 'canonical_org_id'.
        2. The mapping artifact.
    """
    logger.info("Starting Task 10: Entity Resolution...")

    # Extract config
    er_params = config_dict.get("ENTITY_RESOLUTION", {}).get("fuzzy_matching", {})
    # Default to 0.90 if unspecified
    threshold = er_params.get("threshold", 0.90)
    if threshold == "UNSPECIFIED_IN_PAPER":
        threshold = 0.90

    config = EntityResolutionConfig(similarity_threshold=float(threshold))

    # Step 1: Check Precomputed
    use_precomputed = check_precomputed_mapping(df_organisations)

    # Step 2: Compute Clusters (if needed)
    mapping_dict = {}
    if not use_precomputed:
        mapping_dict = compute_fuzzy_clusters(df_organisations, config)

    # Step 3: Apply Mapping
    df_final, artifact = apply_canonical_mapping(df_organisations, mapping_dict, use_precomputed)

    logger.info("Task 10 Completed Successfully.")

    return df_final, artifact
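

# Illustrative sketch (an addition, not part of the original pipeline): the config shape
# that construct_canonical_mapping reads, and how the threshold falls back to 0.90 when
# it is absent or set to the "UNSPECIFIED_IN_PAPER" sentinel. The helper name is
# hypothetical; only the nested keys mirror the orchestrator above.
from typing import Any, Dict


def sketch_resolve_threshold(config_dict: Dict[str, Any], default: float = 0.90) -> float:
    # config_dict["ENTITY_RESOLUTION"]["fuzzy_matching"]["threshold"], each level optional
    raw = (
        config_dict.get("ENTITY_RESOLUTION", {})
        .get("fuzzy_matching", {})
        .get("threshold", default)
    )
    if raw == "UNSPECIFIED_IN_PAPER":
        return default
    return float(raw)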


# Task 11 — Compute geographic transaction density (Fig. 1-type output)

# ==============================================================================
# Task 11: Compute geographic transaction density (Fig. 1-type output)
# ==============================================================================

@dataclass
class GeoDensityArtifact:
    """
    Container for geographic density results and metadata.

    This dataclass encapsulates the output of the geographic density computation,
    providing both the processed density table (ready for choropleth visualization)
    and the necessary coverage statistics to audit the mapping process. It ensures
    that the "global" nature of the aid map is quantified by tracking exactly
    how many transactions and countries are represented versus excluded.

    Attributes
    ----------
    density_table : pd.DataFrame
        A DataFrame containing transaction counts per country, with columns
        ['recipient_country_code', 'transaction_count', 'log_transaction_count'].
    total_transactions_mapped : int
        The count of unique transactions that were successfully associated with
        at least one recipient country and thus contribute to the density map.
    unique_countries_count : int
        The number of distinct recipient countries identified in the transaction
        corpus. This can be compared against the 230 territories reported in
        the manuscript.
    excluded_transaction_count : int
        The number of transactions excluded from the map because they lacked
        valid recipient country metadata.
    """
    # The computed density table
    density_table: pd.DataFrame

    # Count of mapped transactions
    total_transactions_mapped: int

    # Count of unique countries found
    unique_countries_count: int

    # Count of excluded transactions
    excluded_transaction_count: int

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the geographic density results.

        Returns
        -------
        str
            A human-readable summary of countries found and transaction coverage.
        """
        # Format summary string
        return (f"Geo Density: {self.unique_countries_count} countries, "
                f"{self.total_transactions_mapped} tx mapped, "
                f"{self.excluded_transaction_count} tx excluded.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 1: Aggregate transaction counts by country
# -------------------------------------------------------------------------------------------------------------------------------

def aggregate_country_counts(
    df_tx_countries: pd.DataFrame
) -> pd.DataFrame:
    """
    Aggregates transaction counts per recipient country.
    Implements D(c) = sum(1[c(t) = c]).

    Parameters
    ----------
    df_tx_countries : pd.DataFrame
        Long-form transaction-country mapping (transaction_id, recipient_country_code).

    Returns
    -------
    pd.DataFrame
        Table with columns ['recipient_country_code', 'transaction_count'].
    """
    # Group by country and count unique transactions per country.
    # df_tx_countries should already be unique on (transaction_id, country);
    # nunique() guarantees each transaction is counted at most once per country
    # even if that invariant is violated.
    density = (
        df_tx_countries
        .groupby('recipient_country_code')['transaction_id']
        .nunique()
        .reset_index(name='transaction_count')
    )

    # Sort descending for readability
    density = density.sort_values('transaction_count', ascending=False).reset_index(drop=True)

    logger.info(f"Geo Aggregation: Found {len(density)} unique countries.")

    return density
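

# Illustrative sketch (an addition, not part of the original pipeline): D(c) on toy data.
# The helper name and transaction/country codes are hypothetical. A transaction listing
# several recipient countries contributes once to each of them, so the per-country counts
# can sum to more than the number of distinct transactions, while a repeated
# (transaction, country) row is counted only once thanks to nunique().
import pandas as pd


def sketch_country_density(df_tx_countries: pd.DataFrame) -> pd.Series:
    return (
        df_tx_countries
        .groupby('recipient_country_code')['transaction_id']
        .nunique()
        .sort_values(ascending=False)
    )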


# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 2: Apply log transform for visualization
# -------------------------------------------------------------------------------------------------------------------------------

def apply_log_transform(
    density_table: pd.DataFrame
) -> pd.DataFrame:
    """
    Applies log transformation to transaction counts: D_log = log(1 + D).

    Parameters
    ----------
    density_table : pd.DataFrame
        Table with 'transaction_count'.

    Returns
    -------
    pd.DataFrame
        Table with added 'log_transaction_count'.
    """
    df_out = density_table.copy()

    # log1p(x) = log(1 + x)
    df_out['log_transaction_count'] = np.log1p(df_out['transaction_count'])

    return df_out
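

# Illustrative sketch (an addition, not part of the original pipeline): what
# D_log = log(1 + D) does to counts spanning several orders of magnitude. The helper
# name and counts are hypothetical. Zero maps to zero, and each tenfold increase in a
# large count adds about ln(10) = 2.30 to the colour scale, so a few very high-volume
# recipients no longer saturate the choropleth.
import numpy as np


def sketch_log_density(counts) -> np.ndarray:
    return np.log1p(np.asarray(counts, dtype=float))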


# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 3: Validate coverage and document exclusions
# -------------------------------------------------------------------------------------------------------------------------------

def validate_geo_coverage(
    df_transactions: pd.DataFrame,
    df_tx_countries: pd.DataFrame,
    density_table: pd.DataFrame
) -> GeoDensityArtifact:
    """
    Validates the geographic mapping coverage and packages the final artifact.

    Parameters
    ----------
    df_transactions : pd.DataFrame
        Total cleansed transactions.
    df_tx_countries : pd.DataFrame
        Transactions with country contexts.
    density_table : pd.DataFrame
        Computed density table.

    Returns
    -------
    GeoDensityArtifact
        Final artifact with metadata.
    """
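    # Count distinct IDs on both sides so duplicates cannot skew the exclusion count
    total_tx = df_transactions['transaction_id'].nunique()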
    mapped_tx = df_tx_countries['transaction_id'].nunique()
    excluded_tx = total_tx - mapped_tx
    unique_countries = len(density_table)

    artifact = GeoDensityArtifact(
        density_table=density_table,
        total_transactions_mapped=mapped_tx,
        unique_countries_count=unique_countries,
        excluded_transaction_count=excluded_tx
    )

    logger.info(str(artifact))

    return artifact
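

# Illustrative sketch (an addition, not part of the original pipeline): the coverage
# arithmetic behind validate_geo_coverage on toy IDs. The helper name is hypothetical.
# A transaction counts as "mapped" if it has at least one country, so (restricting
# mapped IDs to those present in the transaction table) mapped + excluded always equals
# the number of distinct transactions, however many countries each one lists.
from typing import Tuple

import pandas as pd


def sketch_geo_coverage(tx_ids: pd.Series, tx_country_ids: pd.Series) -> Tuple[int, int]:
    total = tx_ids.nunique()
    mapped = tx_country_ids[tx_country_ids.isin(tx_ids)].nunique()
    return mapped, total - mapped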


# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_geographic_density(
    df_transactions: pd.DataFrame,
    df_tx_countries: pd.DataFrame
) -> GeoDensityArtifact:
    """
    Orchestrator for Task 11. Computes geographic transaction density and prepares
    data for visualization (Fig. 1).

    Parameters
    ----------
    df_transactions : pd.DataFrame
        Cleansed transactions.
    df_tx_countries : pd.DataFrame
        Transaction-country mapping.

    Returns
    -------
    GeoDensityArtifact
        The density table and coverage metadata.
    """
    logger.info("Starting Task 11: Geographic Density Computation...")

    # Step 1: Aggregate
    density_raw = aggregate_country_counts(df_tx_countries)

    # Step 2: Transform
    density_final = apply_log_transform(density_raw)

    # Step 3: Validate
    artifact = validate_geo_coverage(df_transactions, df_tx_countries, density_final)

    logger.info("Task 11 Completed Successfully.")

    return artifact


# Task 12 — Compute instrument evolution time series (Extended Data Fig. S1-type output)

# =====================================================================================
# Task 12: Compute instrument evolution time series (Extended Data Fig. S1-type output)
# =====================================================================================

@dataclass
class InstrumentEvolutionArtifact:
    """
    Container for instrument evolution time series and metadata.

    This dataclass encapsulates the longitudinal analysis of financial instruments
    used in global aid. It provides the aggregated time series data required to
    visualize the shifting composition of aid (e.g., the rise of equity vs. grants)
    over the study period. Additionally, it includes validation reports that quantify
    data completeness, ensuring that any trends observed are not artifacts of
    missing metadata.

    Attributes
    ----------
    time_series_table : pd.DataFrame
        A wide-form DataFrame indexed by `transaction_year`, with columns corresponding
        to each instrument type (Grant, Loan, Equity). Values represent the count
        of transactions for that instrument in that year.
    validation_report : pd.DataFrame
        A DataFrame indexed by `transaction_year` containing coverage statistics,
        specifically the total number of transactions, the number mapped to a valid
        instrument, and the share of missing data per year.
    total_transactions_analyzed : int
        The total number of transactions included in the analysis scope.
    """
    # The aggregated time series data
    time_series_table: pd.DataFrame

    # The validation metadata per year
    validation_report: pd.DataFrame

    # Total count of transactions processed
    total_transactions_analyzed: int

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the instrument evolution results.

        Returns
        -------
        str
            A human-readable summary of the temporal coverage and volume.
        """
        # Format summary string
        return (f"Instrument Evolution: {len(self.time_series_table)} years, "
                f"{self.total_transactions_analyzed} tx analyzed.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 1: Aggregate transaction counts by year and instrument
# -------------------------------------------------------------------------------------------------------------------------------

def aggregate_instrument_counts(
    df_tx_instruments: pd.DataFrame
) -> pd.DataFrame:
    """
    Aggregates transaction counts by year and instrument type.
    Implements C(y, i) = sum(1[year=y, inst=i]).

    Parameters
    ----------
    df_tx_instruments : pd.DataFrame
        Transactions enriched with 'transaction_year' and 'instrument_type'.

    Returns
    -------
    pd.DataFrame
        Wide-form DataFrame with index 'transaction_year' and columns for each instrument.
    """
    # Group and count
    # We use size() to count rows
    counts = (
        df_tx_instruments
        .groupby(['transaction_year', 'instrument_type'])
        .size()
        .reset_index(name='count')
    )

    # Pivot to wide format
    # Index: Year, Columns: Instrument Type, Values: Count
    time_series = counts.pivot(
        index='transaction_year',
        columns='instrument_type',
        values='count'
    ).fillna(0).astype(int)

    # Ensure all expected columns exist (Grant, Loan, Equity) even if 0
    expected_cols = ['Grant', 'Loan', 'Equity']
    for col in expected_cols:
        if col not in time_series.columns:
            time_series[col] = 0

    # Sort columns deterministically
    time_series = time_series[sorted(time_series.columns)]

    # Sort index
    time_series = time_series.sort_index()

    logger.info(f"Instrument Aggregation: {len(time_series)} years of data.")

    return time_series
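

# Illustrative sketch (an addition, not part of the original pipeline): an equivalent
# way to build the wide year-by-instrument table with pd.crosstab plus reindex, which
# guarantees the Grant/Loan/Equity columns exist and keeps any other instrument types,
# in alphabetical column order as above. The helper name and toy data are hypothetical.
# Rows with a missing year or instrument are dropped explicitly; they surface later as
# "missing" in the validation step.
import pandas as pd


def sketch_instrument_table(df: pd.DataFrame) -> pd.DataFrame:
    expected = ['Equity', 'Grant', 'Loan']
    clean = df.dropna(subset=['transaction_year', 'instrument_type'])
    table = pd.crosstab(clean['transaction_year'], clean['instrument_type'])
    columns = sorted(set(table.columns) | set(expected))
    return table.reindex(columns=columns, fill_value=0).sort_index()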


# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 2: Validate totals
# -------------------------------------------------------------------------------------------------------------------------------

def validate_instrument_totals(
    df_tx_instruments: pd.DataFrame,
    time_series_table: pd.DataFrame
) -> pd.DataFrame:
    """
    Validates that the sum of instrument counts matches the total transaction count
    per year, accounting for missing instruments.

    Parameters
    ----------
    df_tx_instruments : pd.DataFrame
        Source transactions.
    time_series_table : pd.DataFrame
        Aggregated counts.

    Returns
    -------
    pd.DataFrame
        Validation report with columns ['total_tx', 'mapped_tx', 'missing_tx', 'missing_share'].
    """
    # Calculate total transactions per year from source
    total_per_year = df_tx_instruments.groupby('transaction_year').size()

    # Calculate mapped transactions per year from time series
    mapped_per_year = time_series_table.sum(axis=1)

    # Align indices
    validation = pd.DataFrame({
        'total_tx': total_per_year,
        'mapped_tx': mapped_per_year
    }).fillna(0).astype(int)

    # Calculate missing
    validation['missing_tx'] = validation['total_tx'] - validation['mapped_tx']
    validation['missing_share'] = validation['missing_tx'] / validation['total_tx']

    # Log summary
    avg_missing = validation['missing_share'].mean()
    logger.info(f"Instrument Validation: Average missing share per year = {avg_missing:.2%}")

    return validation
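

# Illustrative sketch (an addition, not part of the original pipeline): the logged
# "average missing share per year" is an unweighted mean of yearly shares, which can
# differ sharply from the pooled share when yearly volumes differ. The helper name and
# figures are hypothetical.
import pandas as pd


def sketch_missing_shares(total_tx: pd.Series, mapped_tx: pd.Series):
    missing = total_tx - mapped_tx
    # Unweighted: every year counts equally, regardless of its volume
    unweighted = (missing / total_tx).mean()
    # Pooled: each transaction counts equally
    pooled = missing.sum() / total_tx.sum()
    return unweighted, pooled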


# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 3: Persist time series artifact
# -------------------------------------------------------------------------------------------------------------------------------

def package_instrument_artifact(
    time_series_table: pd.DataFrame,
    validation_report: pd.DataFrame,
    total_tx: int
) -> InstrumentEvolutionArtifact:
    """
    Packages the instrument evolution results.

    Parameters
    ----------
    time_series_table : pd.DataFrame
        The evolution data.
    validation_report : pd.DataFrame
        The validation metadata.
    total_tx : int
        Total transactions processed.

    Returns
    -------
    InstrumentEvolutionArtifact
        Final artifact.
    """
    artifact = InstrumentEvolutionArtifact(
        time_series_table=time_series_table,
        validation_report=validation_report,
        total_transactions_analyzed=total_tx
    )

    logger.info(str(artifact))

    return artifact


# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_instrument_evolution(
    df_tx_instruments: pd.DataFrame
) -> InstrumentEvolutionArtifact:
    """
    Orchestrator for Task 12. Computes the longitudinal evolution of aid instruments.

    Parameters
    ----------
    df_tx_instruments : pd.DataFrame
        Transactions enriched with instrument types.

    Returns
    -------
    InstrumentEvolutionArtifact
        The time series data and validation report.
    """
    logger.info("Starting Task 12: Instrument Evolution Computation...")

    # Step 1: Aggregate
    time_series = aggregate_instrument_counts(df_tx_instruments)

    # Step 2: Validate
    validation = validate_instrument_totals(df_tx_instruments, time_series)

    # Step 3: Package
    total_tx = len(df_tx_instruments)
    artifact = package_instrument_artifact(time_series, validation, total_tx)

    logger.info("Task 12 Completed Successfully.")

    return artifact


# Task 13 — Construct the bipartite Provider–Receiver transaction graph

# ==============================================================================
# Task 13: Construct the bipartite Provider–Receiver transaction graph
# ==============================================================================

@dataclass
class BipartiteGraphArtifact:
    """
    Container for the bipartite graph structure and metadata.

    This dataclass encapsulates the constructed bipartite network connecting
    Provider organisations to Receiver organisations. It holds the sparse
    incidence matrix representing the transaction flows, the node metadata
    tables for both partitions, and the index mappings required to translate
    between matrix indices and canonical node identifiers. This artifact is
    the foundational data structure for subsequent centrality calculations
    and network projections.

    Attributes
    ----------
    provider_nodes : pd.DataFrame
        A DataFrame containing metadata for all unique provider nodes in the
        graph (partition U). Includes canonical IDs and role suffixes.
    receiver_nodes : pd.DataFrame
        A DataFrame containing metadata for all unique receiver nodes in the
        graph (partition V). Includes canonical IDs and role suffixes.
    incidence_matrix : sparse.csr_matrix
        The sparse bi-adjacency matrix B of shape (|U|, |V|), where B_ij
        represents the frequency of transactions between provider i and
        receiver j.
    provider_index_map : Dict[str, int]
        A dictionary mapping provider node IDs (strings) to their row indices
        (integers) in the incidence matrix.
    receiver_index_map : Dict[str, int]
        A dictionary mapping receiver node IDs (strings) to their column indices
        (integers) in the incidence matrix.
    edge_list : pd.DataFrame
        A DataFrame representation of the non-zero entries in the incidence
        matrix, useful for auditing and visualization.
    """
    # Metadata for provider nodes (U)
    provider_nodes: pd.DataFrame

    # Metadata for receiver nodes (V)
    receiver_nodes: pd.DataFrame

    # Sparse incidence matrix B
    incidence_matrix: sparse.csr_matrix

    # Map from provider node ID to matrix row index
    provider_index_map: Dict[str, int]

    # Map from receiver node ID to matrix column index
    receiver_index_map: Dict[str, int]

    # DataFrame of edges (u, v, weight)
    edge_list: pd.DataFrame

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the bipartite graph.

        Returns
        -------
        str
            A human-readable summary of the graph dimensions (nodes and edges).
        """
        # Extract dimensions from matrix shape
        n_prov = self.incidence_matrix.shape[0]
        n_recv = self.incidence_matrix.shape[1]
        n_edges = self.incidence_matrix.nnz

        return (f"Bipartite Graph: {n_prov} Providers, {n_recv} Receivers, "
                f"{n_edges} Edges.")
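

# Illustrative sketch (an addition, not part of the original pipeline): building the
# bi-adjacency matrix B from a toy edge list with role-split node IDs, plus the index
# maps stored on BipartiteGraphArtifact. The helper name, column names ('provider',
# 'receiver') and organisation IDs are hypothetical. Because of the "::PROVIDER" /
# "::RECEIVER" suffixes, an organisation that both gives and receives appears once in
# each partition, and repeated provider-receiver pairs are summed into B_ij.
from typing import Dict, Tuple

import numpy as np
import pandas as pd
from scipy import sparse


def sketch_incidence_matrix(
    edges: pd.DataFrame,
) -> Tuple[sparse.csr_matrix, Dict[str, int], Dict[str, int]]:
    provider_ids = edges['provider'].astype(str) + "::PROVIDER"
    receiver_ids = edges['receiver'].astype(str) + "::RECEIVER"
    # Sorted factorisation gives deterministic row/column indices
    row_codes, providers = pd.factorize(provider_ids, sort=True)
    col_codes, receivers = pd.factorize(receiver_ids, sort=True)
    data = np.ones(len(edges), dtype=np.int64)
    B = sparse.csr_matrix((data, (row_codes, col_codes)), shape=(len(providers), len(receivers)))
    B.sum_duplicates()  # merge repeated (i, j) pairs into transaction frequencies
    provider_index_map = {node: i for i, node in enumerate(providers)}
    receiver_index_map = {node: j for j, node in enumerate(receivers)}
    return B, provider_index_map, receiver_index_map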


# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 1: Define bipartite node sets with role-splitting
# -------------------------------------------------------------------------------------------------------------------------------

def prepare_bipartite_nodes(
    df_transactions: pd.DataFrame,
    canonical_mapping: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Maps transaction endpoints to canonical IDs and generates role-split node sets.

    Parameters
    ----------
    df_transactions : pd.DataFrame
        Cleansed transactions.
    canonical_mapping : pd.DataFrame
        Mapping table with 'org_ref' and 'canonical_org_id'.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]
        1. df_tx_mapped: Transactions with canonical provider/receiver IDs.
        2. provider_nodes: Unique provider nodes with role suffix.
        3. receiver_nodes: Unique receiver nodes with role suffix.
    """
    # Create a mapping dictionary for fast lookup
    # We map org_ref -> canonical_org_id
    # Guard: if the mapping table lists an org_ref more than once (should not happen),
    # keep only its first occurrence
    mapping_dict = (
        canonical_mapping
        .drop_duplicates('org_ref')
        .set_index('org_ref')['canonical_org_id']
        .to_dict()
    )

    df_tx = df_transactions.copy()

    # Resolve each transaction endpoint to a canonical ID, in order of preference:
    #   1. the canonical ID assigned to the org ref in Task 10;
    #   2. the raw org ref itself, if it is absent from the mapping;
    #   3. the org name, if the ref is missing or blank.
    def resolve_canonical(ref_col: pd.Series, name_col: pd.Series) -> pd.Series:
        # Strip whitespace from non-missing refs and treat blank refs as missing
        refs = ref_col.where(ref_col.isna(), ref_col.astype(str).str.strip())
        refs = refs.mask(refs == '')
        return refs.map(mapping_dict).fillna(refs).fillna(name_col)

    # Provider
    df_tx['provider_canonical_id'] = resolve_canonical(
        df_tx['transaction_provider_org_ref'], df_tx['transaction_provider_org_name']
    )

    # Receiver
    df_tx['receiver_canonical_id'] = resolve_canonical(
        df_tx['transaction_receiver_org_ref'], df_tx['transaction_receiver_org_name']
    )

    # Role Splitting
    df_tx['provider_node_id'] = df_tx['provider_canonical_id'].astype(str) + "::PROVIDER"
    df_tx['receiver_node_id'] = df_tx['receiver_canonical_id'].astype(str) + "::RECEIVER"

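    # Extract unique nodes. Deduplicating (node_id, canonical_id) pairs together keeps
    # each node_id aligned with its own canonical ID. Two independent .unique() calls
    # are not guaranteed to line up (e.g. distinct values that stringify identically).
    provider_nodes = (
        df_tx[['provider_node_id', 'provider_canonical_id']]
        .drop_duplicates('provider_node_id')
        .rename(columns={'provider_node_id': 'node_id', 'provider_canonical_id': 'canonical_id'})
        .assign(role='PROVIDER')
        .reset_index(drop=True)
    )

    receiver_nodes = (
        df_tx[['receiver_node_id', 'receiver_canonical_id']]
        .drop_duplicates('receiver_node_id')
        .rename(columns={'receiver_node_id': 'node_id', 'receiver_canonical_id': 'canonical_id'})
        .assign(role='RECEIVER')
        .reset_index(drop=True)
    )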

    logger.info(f"Bipartite Nodes: {len(provider_nodes)} Providers, {len(receiver_nodes)} Receivers.")

    return df_tx, provider_nodes, receiver_nodes
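
# Illustrative sketch (not part of the original pipeline): this uses hypothetical toy data.
# Role splitting gives an organisation that both provides and receives funds two distinct
# bipartite nodes, so the graph stays strictly two-mode. The `_demo_*` name and the
# organisation IDs are invented for illustration.
import pandas as pd
from typing import List


def _demo_role_splitting() -> List[str]:
    toy_tx = pd.DataFrame({
        'provider_canonical_id': ['ORG-A', 'ORG-A', 'ORG-B'],
        'receiver_canonical_id': ['ORG-B', 'ORG-C', 'ORG-C'],
    })
    # Same suffix convention as prepare_bipartite_nodes
    provider_ids = toy_tx['provider_canonical_id'] + "::PROVIDER"
    receiver_ids = toy_tx['receiver_canonical_id'] + "::RECEIVER"
    return sorted(set(provider_ids) | set(receiver_ids))


# ORG-B appears on both sides and therefore yields two nodes
print(_demo_role_splitting())
# ['ORG-A::PROVIDER', 'ORG-B::PROVIDER', 'ORG-B::RECEIVER', 'ORG-C::RECEIVER']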


# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 2: Define bipartite incidence matrix
# -------------------------------------------------------------------------------------------------------------------------------

def build_incidence_matrix(
    df_tx_mapped: pd.DataFrame,
    provider_nodes: pd.DataFrame,
    receiver_nodes: pd.DataFrame
) -> Tuple[sparse.csr_matrix, Dict[str, int], Dict[str, int], pd.DataFrame]:
    """
    Constructs the frequency-weighted bipartite incidence matrix B.

    Parameters
    ----------
    df_tx_mapped : pd.DataFrame
        Transactions with role-split node IDs.
    provider_nodes : pd.DataFrame
        Unique provider nodes.
    receiver_nodes : pd.DataFrame
        Unique receiver nodes.

    Returns
    -------
    Tuple[sparse.csr_matrix, Dict[str, int], Dict[str, int], pd.DataFrame]
        1. Sparse matrix B (rows=providers, cols=receivers).
        2. Provider index map (node_id -> row_idx).
        3. Receiver index map (node_id -> col_idx).
        4. Edge list DataFrame.
    """
    # Create Index Maps
    # Sort for determinism
    prov_ids = sorted(provider_nodes['node_id'].tolist())
    recv_ids = sorted(receiver_nodes['node_id'].tolist())

    prov_map = {nid: i for i, nid in enumerate(prov_ids)}
    recv_map = {nid: i for i, nid in enumerate(recv_ids)}

    # Aggregate edges (frequency weight)
    edges = (
        df_tx_mapped
        .groupby(['provider_node_id', 'receiver_node_id'])
        .size()
        .reset_index(name='weight')
    )

    # Map to indices
    row_indices = edges['provider_node_id'].map(prov_map).values
    col_indices = edges['receiver_node_id'].map(recv_map).values
    data = edges['weight'].values

    # Build Matrix
    shape = (len(prov_ids), len(recv_ids))
    B = sparse.coo_matrix((data, (row_indices, col_indices)), shape=shape).tocsr()

    logger.info(f"Incidence Matrix: Shape {shape}, NNZ {B.nnz}.")

    return B, prov_map, recv_map, edges
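
# Illustrative sketch (toy data, hypothetical `_demo_*` name): this shows how repeated
# transactions become frequency weights in B. groupby(...).size() counts each
# provider-receiver pair once per transaction, and the COO -> CSR conversion places
# each count at its (row, col) position, using the same sorted index maps as above.
import pandas as pd
from scipy import sparse


def _demo_incidence_matrix() -> sparse.csr_matrix:
    tx = pd.DataFrame({
        'provider_node_id': ['A::PROVIDER', 'A::PROVIDER', 'A::PROVIDER', 'B::PROVIDER'],
        'receiver_node_id': ['X::RECEIVER', 'X::RECEIVER', 'Y::RECEIVER', 'Y::RECEIVER'],
    })
    prov_map = {nid: i for i, nid in enumerate(sorted(tx['provider_node_id'].unique()))}
    recv_map = {nid: i for i, nid in enumerate(sorted(tx['receiver_node_id'].unique()))}
    edges = tx.groupby(['provider_node_id', 'receiver_node_id']).size().reset_index(name='weight')
    rows = edges['provider_node_id'].map(prov_map).values
    cols = edges['receiver_node_id'].map(recv_map).values
    shape = (len(prov_map), len(recv_map))
    return sparse.coo_matrix((edges['weight'].values, (rows, cols)), shape=shape).tocsr()


# A funded X twice and Y once; B funded Y once
print(_demo_incidence_matrix().toarray())
# [[2 1]
#  [0 1]]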


# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 3: Persist bipartite graph artifacts
# -------------------------------------------------------------------------------------------------------------------------------

def package_bipartite_artifact(
    provider_nodes: pd.DataFrame,
    receiver_nodes: pd.DataFrame,
    B: sparse.csr_matrix,
    prov_map: Dict[str, int],
    recv_map: Dict[str, int],
    edge_list: pd.DataFrame
) -> BipartiteGraphArtifact:
    """
    Packages the bipartite graph components.

    Parameters
    ----------
    provider_nodes : pd.DataFrame
        Provider node table.
    receiver_nodes : pd.DataFrame
        Receiver node table.
    B : sparse.csr_matrix
        Incidence matrix.
    prov_map : Dict[str, int]
        Provider index map.
    recv_map : Dict[str, int]
        Receiver index map.
    edge_list : pd.DataFrame
        Edge list.

    Returns
    -------
    BipartiteGraphArtifact
        Final artifact.
    """
    artifact = BipartiteGraphArtifact(
        provider_nodes=provider_nodes,
        receiver_nodes=receiver_nodes,
        incidence_matrix=B,
        provider_index_map=prov_map,
        receiver_index_map=recv_map,
        edge_list=edge_list
    )

    logger.info(str(artifact))

    return artifact


# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def construct_bipartite_graph(
    df_transactions: pd.DataFrame,
    canonical_mapping: pd.DataFrame
) -> BipartiteGraphArtifact:
    """
    Orchestrator for Task 13. Constructs the bipartite provider-receiver graph.

    Parameters
    ----------
    df_transactions : pd.DataFrame
        Cleansed transactions.
    canonical_mapping : pd.DataFrame
        Canonical organisation mapping.

    Returns
    -------
    BipartiteGraphArtifact
        The constructed graph.
    """
    logger.info("Starting Task 13: Bipartite Graph Construction...")

    # Step 1: Prepare Nodes
    df_mapped, prov_nodes, recv_nodes = prepare_bipartite_nodes(df_transactions, canonical_mapping)

    # Step 2: Build Matrix
    B, prov_map, recv_map, edges = build_incidence_matrix(df_mapped, prov_nodes, recv_nodes)

    # Step 3: Package
    artifact = package_bipartite_artifact(prov_nodes, recv_nodes, B, prov_map, recv_map, edges)

    logger.info("Task 13 Completed Successfully.")

    return artifact


# Task 14 — Compute transaction counts per organisation (node size metric)

# ==============================================================================
# Task 14: Compute transaction counts per organisation (node size metric)
# ==============================================================================

@dataclass
class NodeSizeArtifact:
    """
    Container for organisation transaction counts (node sizes).

    This dataclass encapsulates the results of the node sizing computation,
    which is based on the total number of transactions ("deal count") associated
    with each organisation. This metric serves as the primary sizing variable
    for the "Solar System" visualization, contrasting with traditional volume-based
    metrics (USD). The artifact includes both the raw counts per organisation
    and aggregated validation statistics to verify alignment with the manuscript's
    reported distributions.

    Attributes
    ----------
    deal_counts : pd.DataFrame
        A DataFrame indexed by `canonical_org_id` containing the transaction counts
        for each organisation. Columns include `provider_tx_count`, `receiver_tx_count`,
        and the aggregate `total_tx_count`.
    validation_stats : pd.DataFrame
        A summary DataFrame containing descriptive statistics (mean, median, count)
        of deal counts, stratified by organisation type (e.g., Academic, Foundation).
        Used to validate findings such as "Universities are 85% smaller by deal count".
    """
    # The per-organisation deal counts
    deal_counts: pd.DataFrame

    # The validation statistics by org type
    validation_stats: pd.DataFrame

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the node size artifact.

        Returns
        -------
        str
            A human-readable summary indicating the number of organisations sized.
        """
        # Format summary string
        return f"Node Sizes: Computed for {len(self.deal_counts)} organisations."

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 1: Define deal count metric
# -------------------------------------------------------------------------------------------------------------------------------

def calculate_deal_counts(
    df_tx_mapped: pd.DataFrame
) -> pd.DataFrame:
    """
    Computes the total number of transactions each organisation participates in,
    either as a provider or a receiver.

    Parameters
    ----------
    df_tx_mapped : pd.DataFrame
        Transactions with 'provider_canonical_id' and 'receiver_canonical_id'.

    Returns
    -------
    pd.DataFrame
        Table indexed by 'canonical_org_id' with columns:
        ['provider_tx_count', 'receiver_tx_count', 'total_tx_count'].
    """
    # Count as provider
    prov_counts = df_tx_mapped['provider_canonical_id'].value_counts()

    # Count as receiver
    recv_counts = df_tx_mapped['receiver_canonical_id'].value_counts()

    # Align and sum
    # We use a DataFrame to handle the outer join logic of aligning indices
    counts = pd.DataFrame({
        'provider_tx_count': prov_counts,
        'receiver_tx_count': recv_counts
    }).fillna(0).astype(int)

    # Total deals
    counts['total_tx_count'] = counts['provider_tx_count'] + counts['receiver_tx_count']

    # Name the index
    counts.index.name = 'canonical_org_id'

    logger.info(f"Deal Counts: Computed for {len(counts)} unique entities.")

    return counts
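
# Illustrative sketch (toy data, hypothetical `_demo_*` name): building a DataFrame from
# the two value_counts Series aligns them on the union of their indices. An organisation
# seen in only one role gets NaN in the other column, and fillna(0) turns that into a
# zero count before the totals are summed.
import pandas as pd


def _demo_deal_counts() -> pd.DataFrame:
    tx = pd.DataFrame({
        'provider_canonical_id': ['A', 'A', 'B'],
        'receiver_canonical_id': ['B', 'C', 'A'],
    })
    counts = pd.DataFrame({
        'provider_tx_count': tx['provider_canonical_id'].value_counts(),
        'receiver_tx_count': tx['receiver_canonical_id'].value_counts(),
    }).fillna(0).astype(int)
    counts['total_tx_count'] = counts['provider_tx_count'] + counts['receiver_tx_count']
    return counts.sort_index()


# A: 2 as provider + 1 as receiver; B: 1 + 1; C: 0 + 1
print({k: int(v) for k, v in _demo_deal_counts()['total_tx_count'].items()})
# {'A': 3, 'B': 2, 'C': 1}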


# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 2: Validate against manuscript statistics
# -------------------------------------------------------------------------------------------------------------------------------

def validate_deal_statistics(
    deal_counts: pd.DataFrame,
    df_organisations: pd.DataFrame
) -> pd.DataFrame:
    """
    Computes summary statistics for deal counts, stratified by organisation type,
    to compare against manuscript claims.

    Parameters
    ----------
    deal_counts : pd.DataFrame
        The computed deal counts.
    df_organisations : pd.DataFrame
        Organisation master table with 'org_type' and 'canonical_org_id'.

    Returns
    -------
    pd.DataFrame
        Summary statistics per org_type.
    """
    # Join type info
    # We need to map canonical_org_id to org_type.
    # If canonical_org_id is in df_organisations, we use that.
    # Note: df_organisations might have multiple rows per canonical ID if multiple raw refs mapped to one.
    # We need a unique mapping. We take the first type found (assuming consistency within canonical cluster).
    type_map = (
        df_organisations
        .dropna(subset=['canonical_org_id', 'org_type'])
        .drop_duplicates('canonical_org_id')
        .set_index('canonical_org_id')['org_type']
    )

    # Enrich counts
    enriched = deal_counts.copy()
    enriched['org_type'] = enriched.index.map(type_map).fillna('Unknown')

    # Compute stats
    stats = (
        enriched
        .groupby('org_type')['total_tx_count']
        .agg(['count', 'mean', 'median', 'sum'])
        .sort_values('mean', ascending=False)
    )

    # Add global row
    global_stats = pd.DataFrame({
        'count': [len(enriched)],
        'mean': [enriched['total_tx_count'].mean()],
        'median': [enriched['total_tx_count'].median()],
        'sum': [enriched['total_tx_count'].sum()]
    }, index=['Global'])

    final_stats = pd.concat([stats, global_stats])

    logger.info("Deal Stats Validation:\n" + str(final_stats[['count', 'mean', 'median']]))

    return final_stats
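
# Illustrative sketch: a comparison such as "X% smaller by deal count" can be read off
# the validation table as 100 * (1 - stat[org_type] / stat['Global']). That formula,
# the hypothetical `_demo_relative_size` helper, and the toy numbers below are
# assumptions for illustration. The manuscript's exact definition is not given here.
import pandas as pd


def _demo_relative_size(stats: pd.DataFrame, org_type: str, column: str = 'mean') -> float:
    # Percentage by which org_type's statistic falls below the global statistic
    return 100.0 * (1.0 - stats.loc[org_type, column] / stats.loc['Global', column])


# Toy table in the same layout as validate_deal_statistics' output
_demo_stats = pd.DataFrame(
    {'count': [10, 100], 'mean': [50.0, 200.0], 'median': [4.0, 16.0], 'sum': [500, 20000]},
    index=['Academic', 'Global'],
)
print(_demo_relative_size(_demo_stats, 'Academic'))  # 75.0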


# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 3: Persist deal count table
# -------------------------------------------------------------------------------------------------------------------------------

def package_node_size_artifact(
    deal_counts: pd.DataFrame,
    validation_stats: pd.DataFrame
) -> NodeSizeArtifact:
    """
    Packages the node size results.

    Parameters
    ----------
    deal_counts : pd.DataFrame
        The counts.
    validation_stats : pd.DataFrame
        The stats.

    Returns
    -------
    NodeSizeArtifact
        Final artifact.
    """
    artifact = NodeSizeArtifact(
        deal_counts=deal_counts,
        validation_stats=validation_stats
    )

    logger.info(str(artifact))

    return artifact


# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_node_sizes(
    df_tx_mapped: pd.DataFrame,
    df_organisations: pd.DataFrame
) -> NodeSizeArtifact:
    """
    Orchestrator for Task 14. Computes transaction counts for node sizing.

    Parameters
    ----------
    df_tx_mapped : pd.DataFrame
        Transactions with canonical IDs.
    df_organisations : pd.DataFrame
        Organisation master table.

    Returns
    -------
    NodeSizeArtifact
        The deal counts and validation stats.
    """
    logger.info("Starting Task 14: Node Size Computation...")

    # Step 1: Calculate
    counts = calculate_deal_counts(df_tx_mapped)

    # Step 2: Validate
    stats = validate_deal_statistics(counts, df_organisations)

    # Step 3: Package
    artifact = package_node_size_artifact(counts, stats)

    logger.info("Task 14 Completed Successfully.")

    return artifact


# Task 15 — Construct Provider–Provider co-occurrence projection

# ==============================================================================
# Task 15: Construct Provider–Provider co-occurrence projection
# ==============================================================================

@dataclass
class ProjectionArtifact:
    """
    Container for the projected provider-provider graph.

    This dataclass encapsulates the one-mode projection of the bipartite network
    onto the set of Provider organisations. It contains the symmetric adjacency
    matrix P, where P_ij represents the strength of the relationship between
    provider i and provider j based on their shared contexts (co-funding or
    co-implementation in the same country or sector). This artifact is the
    primary input for community detection and topological analysis of the
    donor ecosystem.

    Attributes
    ----------
    adjacency_matrix : sparse.csr_matrix
        The symmetric, weighted adjacency matrix of the provider network.
        P_ij = sum_x M_ix * M_jx: the context frequencies of providers i and j,
        multiplied and summed over every context x they share.
    provider_index_map : Dict[str, int]
        A dictionary mapping canonical provider IDs (strings) to their row/column
        indices (integers) in the adjacency matrix.
    context_index_map : Dict[str, int]
        A dictionary mapping context IDs (strings, e.g., "COUNTRY:US") to their
        column indices in the intermediate incidence matrix M. Included for
        provenance and potential re-projection.
    edge_list : pd.DataFrame
        A DataFrame representation of the upper triangle of the adjacency matrix,
        listing unique edges (source, target, weight). Useful for visualization
        and export.
    """
    # The projected adjacency matrix P
    adjacency_matrix: sparse.csr_matrix

    # Map from provider ID to matrix index
    provider_index_map: Dict[str, int]

    # Map from context ID to incidence matrix index
    context_index_map: Dict[str, int]

    # DataFrame of edges
    edge_list: pd.DataFrame

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the projection graph.

        Returns
        -------
        str
            A human-readable summary of the network size (nodes and edges).
        """
        # Extract dimensions
        n_nodes = self.adjacency_matrix.shape[0]
        # Self-loops are removed, so each undirected edge is stored twice: (i, j) and (j, i)
        n_edges = self.adjacency_matrix.nnz // 2

        return f"Projection: {n_nodes} Providers, {n_edges} Co-occurrence Edges."

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 1: Build provider–context incidence matrix
# -------------------------------------------------------------------------------------------------------------------------------

def build_context_incidence(
    df_contexts: pd.DataFrame,
    df_tx_mapped: pd.DataFrame
) -> Tuple[sparse.csr_matrix, Dict[str, int], Dict[str, int]]:
    """
    Constructs the provider-context incidence matrix M.
    M_ix = frequency of provider i appearing in context x.

    Parameters
    ----------
    df_contexts : pd.DataFrame
        Long-form contexts (transaction_id, context_id).
    df_tx_mapped : pd.DataFrame
        Transactions with 'provider_canonical_id'.

    Returns
    -------
    Tuple[sparse.csr_matrix, Dict[str, int], Dict[str, int]]
        1. Sparse matrix M.
        2. Provider index map.
        3. Context index map.
    """
    # Join to get provider for each context instance
    # We need transaction_id -> provider_canonical_id
    tx_provider_map = df_tx_mapped[['transaction_id', 'provider_canonical_id']]

    # Inner join: only transactions with both context and valid provider
    df_incidence = pd.merge(
        df_contexts,
        tx_provider_map,
        on='transaction_id',
        how='inner'
    )

    # Aggregate frequency
    # Count how many times provider P appears in context C
    counts = (
        df_incidence
        .groupby(['provider_canonical_id', 'context_id'])
        .size()
        .reset_index(name='weight')
    )

    # Create Index Maps
    prov_ids = sorted(counts['provider_canonical_id'].unique())
    ctx_ids = sorted(counts['context_id'].unique())

    prov_map = {pid: i for i, pid in enumerate(prov_ids)}
    ctx_map = {cid: i for i, cid in enumerate(ctx_ids)}

    # Map to indices
    row_indices = counts['provider_canonical_id'].map(prov_map).values
    col_indices = counts['context_id'].map(ctx_map).values
    data = counts['weight'].values

    # Build Matrix M
    shape = (len(prov_ids), len(ctx_ids))
    M = sparse.coo_matrix((data, (row_indices, col_indices)), shape=shape).tocsr()

    logger.info(f"Context Incidence: {len(prov_ids)} Providers x {len(ctx_ids)} Contexts.")

    return M, prov_map, ctx_map
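
# Illustrative sketch (toy data, hypothetical `_demo_*` name): the inner merge attaches
# each context row to its transaction's provider and drops transactions that have no
# context. The groupby size then gives M_ix. For readability, the result is shown as a
# dense table via unstack rather than as the sparse matrix built above.
import pandas as pd


def _demo_context_incidence() -> pd.DataFrame:
    contexts = pd.DataFrame({
        'transaction_id': ['t1', 't1', 't2', 't3'],
        'context_id': ['COUNTRY:KE', 'SECTOR:HEALTH', 'COUNTRY:KE', 'COUNTRY:KE'],
    })
    tx = pd.DataFrame({
        'transaction_id': ['t1', 't2', 't3', 't4'],  # t4 has no context rows
        'provider_canonical_id': ['A', 'B', 'A', 'B'],
    })
    incidence = pd.merge(contexts, tx, on='transaction_id', how='inner')
    counts = incidence.groupby(['provider_canonical_id', 'context_id']).size()
    return counts.unstack(fill_value=0)


# A appears in COUNTRY:KE via t1 and t3, so M_A,KE = 2
print(_demo_context_incidence())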


# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 2: Project to provider–provider adjacency
# -------------------------------------------------------------------------------------------------------------------------------

def project_to_adjacency(
    M: sparse.csr_matrix
) -> sparse.csr_matrix:
    """
    Computes the one-mode projection P = M * M.T and removes self-loops.

    Parameters
    ----------
    M : sparse.csr_matrix
        Provider-context incidence matrix.

    Returns
    -------
    sparse.csr_matrix
        Symmetric provider-provider adjacency matrix P.
    """
    # Matrix multiplication
    # P_ij = sum_k M_ik * M_jk (dot product of context vectors)
    P = M.dot(M.T)

    # Remove self-loops (diagonal). Every provider in M has at least one context, so
    # each diagonal entry of P is already stored and setdiag(0) overwrites values
    # without changing the CSR sparsity structure.
    P.setdiag(0)

    # Eliminate explicit zeros created by setdiag
    P.eliminate_zeros()

    logger.info(f"Projection: Generated adjacency with {P.nnz} non-zero entries.")

    return P
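
# Worked example (toy data, hypothetical `_demo_*` name): for three providers and three
# contexts, P = M M^T sums the products of shared context counts. For example,
# P_AB = 2*1 + 0*3 + 1*0 = 2. Zeroing the diagonal removes self-loops. Because P is
# symmetric, each undirected edge is stored twice, which is why ProjectionArtifact
# reports nnz // 2.
import numpy as np
from scipy import sparse


def _demo_projection() -> sparse.csr_matrix:
    # Rows: providers A, B, C; columns: contexts COUNTRY:KE, COUNTRY:UG, SECTOR:HEALTH
    M = sparse.csr_matrix(np.array([
        [2, 0, 1],
        [1, 3, 0],
        [0, 0, 4],
    ]))
    P = M.dot(M.T).tocsr()
    P.setdiag(0)
    P.eliminate_zeros()
    return P


print(_demo_projection().toarray())
# [[0 2 4]
#  [2 0 0]
#  [4 0 0]]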


# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 3: Persist projected graph
# -------------------------------------------------------------------------------------------------------------------------------

def package_projection_artifact(
    P: sparse.csr_matrix,
    prov_map: Dict[str, int],
    ctx_map: Dict[str, int]
) -> ProjectionArtifact:
    """
    Packages the projection graph.

    Parameters
    ----------
    P : sparse.csr_matrix
        Adjacency matrix.
    prov_map : Dict[str, int]
        Provider index map.
    ctx_map : Dict[str, int]
        Context index map.

    Returns
    -------
    ProjectionArtifact
        Final artifact.
    """
    # Extract edge list for persistence/audit
    # Use upper triangle to avoid duplicates in undirected graph
    P_triu = sparse.triu(P, format='coo')

    # Reverse map for IDs
    idx_to_prov = {v: k for k, v in prov_map.items()}

    rows = [idx_to_prov[i] for i in P_triu.row]
    cols = [idx_to_prov[j] for j in P_triu.col]
    weights = P_triu.data

    edge_list = pd.DataFrame({
        'source_canonical_id': rows,
        'target_canonical_id': cols,
        'weight': weights
    })

    artifact = ProjectionArtifact(
        adjacency_matrix=P,
        provider_index_map=prov_map,
        context_index_map=ctx_map,
        edge_list=edge_list
    )

    logger.info(str(artifact))

    return artifact
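
# Illustrative sketch (toy matrix, hypothetical `_demo_*` name): sparse.triu keeps only
# entries with row <= column, so each undirected edge of the symmetric P appears once
# in the edge list. The diagonal contributes nothing because self-loops were removed.
import numpy as np
import pandas as pd
from scipy import sparse


def _demo_edge_list() -> pd.DataFrame:
    P = sparse.csr_matrix(np.array([[0, 2, 4], [2, 0, 0], [4, 0, 0]]))
    prov_map = {'A': 0, 'B': 1, 'C': 2}
    idx_to_prov = {v: k for k, v in prov_map.items()}
    P_triu = sparse.triu(P, format='coo')
    return pd.DataFrame({
        'source_canonical_id': [idx_to_prov[i] for i in P_triu.row],
        'target_canonical_id': [idx_to_prov[j] for j in P_triu.col],
        'weight': P_triu.data,
    })


# Two undirected edges: A-B (weight 2) and A-C (weight 4)
print(_demo_edge_list())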


# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def construct_co_occurrence_projection(
    df_contexts: pd.DataFrame,
    df_tx_mapped: pd.DataFrame
) -> ProjectionArtifact:
    """
    Orchestrator for Task 15. Constructs the provider-provider co-occurrence network.

    Parameters
    ----------
    df_contexts : pd.DataFrame
        Unified contexts.
    df_tx_mapped : pd.DataFrame
        Transactions with canonical IDs.

    Returns
    -------
    ProjectionArtifact
        The projected graph.
    """
    logger.info("Starting Task 15: Co-occurrence Projection...")

    # Step 1: Incidence
    M, prov_map, ctx_map = build_context_incidence(df_contexts, df_tx_mapped)

    # Step 2: Projection
    P = project_to_adjacency(M)

    # Step 3: Package
    artifact = package_projection_artifact(P, prov_map, ctx_map)

    logger.info("Task 15 Completed Successfully.")

    return artifact


# Task 16 — Learn node embeddings with node2vec

# ==============================================================================
# Task 16: Learn node embeddings with node2vec
# ==============================================================================

@dataclass
class EmbeddingArtifact:
    """
    Container for node embeddings and training metadata.

    This dataclass encapsulates the output of the representation learning phase.
    It holds the dense vector representations (embeddings) for all nodes in the
    network, indexed by their canonical identifiers. These embeddings capture
    the structural equivalence and community membership of organisations, serving
    as the input for the UMAP dimensionality reduction. The artifact also
    preserves the hyperparameters used during the random walk generation and
    Skip-gram training to ensure reproducibility.

    Attributes
    ----------
    embeddings : pd.DataFrame
        A DataFrame where the index corresponds to canonical node IDs (strings)
        and columns represent the embedding dimensions (e.g., dim_0, ..., dim_99).
    hyperparameters : Dict[str, Any]
        A dictionary recording the configuration used for Node2Vec, including
        p, q, walk_length, num_walks, window_size, and random_seed.
    """
    embeddings: pd.DataFrame
    hyperparameters: Dict[str, Any]

    def __str__(self) -> str:
        return (f"Embeddings: {len(self.embeddings)} nodes, "
                f"{self.embeddings.shape[1]} dimensions.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 1: Specify graph input and parameters
# -------------------------------------------------------------------------------------------------------------------------------

def prepare_symmetric_adjacency(
    bipartite_artifact: BipartiteGraphArtifact
) -> Tuple[sparse.csr_matrix, Dict[int, str]]:
    """
    Constructs the symmetric adjacency matrix A for the bipartite graph and a unified index map.

    A = [[0, B], [B.T, 0]] where B is the incidence matrix (Providers x Receivers).

    Parameters
    ----------
    bipartite_artifact : BipartiteGraphArtifact
        The constructed bipartite graph containing incidence matrix B and separate index maps.

    Returns
    -------
    Tuple[sparse.csr_matrix, Dict[int, str]]
        1. The symmetric adjacency matrix A in CSR format.
        2. A unified mapping from integer index (0 to |U|+|V|-1) to role-split node ID
           (e.g. '<canonical_id>::PROVIDER').
    """
    B = bipartite_artifact.incidence_matrix
    n_u, n_v = B.shape

    # Construct symmetric adjacency A
    # Row indices: 0..n_u-1 (Providers), n_u..n_u+n_v-1 (Receivers)
    # We use bmat to create the block matrix efficiently
    A = sparse.bmat([[None, B], [B.T, None]], format='csr')

    # Create unified index map
    # 0..n_u-1 -> Provider IDs
    # n_u..end -> Receiver IDs
    idx_to_id = {}

    # Reverse provider map (0 to n_u-1)
    # provider_index_map maps ID -> Index
    for pid, idx in bipartite_artifact.provider_index_map.items():
        idx_to_id[idx] = pid

    # Reverse receiver map (offset by n_u)
    for rid, idx in bipartite_artifact.receiver_index_map.items():
        idx_to_id[idx + n_u] = rid

    # A is symmetric with an empty diagonal, so nnz counts every undirected edge twice
    logger.info(f"Graph Preparation: Constructed symmetric adjacency A with shape {A.shape} and {A.nnz // 2} undirected edges.")

    return A, idx_to_id
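
# Illustrative sketch (toy data, hypothetical `_demo_*` names): with B of shape
# (providers x receivers), the block matrix [[0, B], [B^T, 0]] is symmetric. Receivers
# are indexed after all providers (offset n_u), and A.nnz counts every edge twice.
import numpy as np
from scipy import sparse
from typing import Dict, Tuple


def _demo_symmetric_adjacency() -> Tuple[sparse.csr_matrix, Dict[int, str]]:
    B = sparse.csr_matrix(np.array([[2, 1], [0, 1]]))  # 2 providers x 2 receivers
    n_u = B.shape[0]
    A = sparse.bmat([[None, B], [B.T, None]], format='csr')
    prov_map = {'A::PROVIDER': 0, 'B::PROVIDER': 1}
    recv_map = {'X::RECEIVER': 0, 'Y::RECEIVER': 1}
    idx_to_id = {idx: pid for pid, idx in prov_map.items()}
    idx_to_id.update({idx + n_u: rid for rid, idx in recv_map.items()})
    return A, idx_to_id


_demo_A, _demo_ids = _demo_symmetric_adjacency()
print(_demo_A.toarray())
# [[0 0 2 1]
#  [0 0 0 1]
#  [2 0 0 0]
#  [1 1 0 0]]
print(_demo_ids)  # {0: 'A::PROVIDER', 1: 'B::PROVIDER', 2: 'X::RECEIVER', 3: 'Y::RECEIVER'}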


# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 2: Define biased random walk model and generate walks (Numba Optimized)
# -------------------------------------------------------------------------------------------------------------------------------

@njit
def numba_random_walk(
    indptr: np.ndarray,
    indices: np.ndarray,
    data: np.ndarray,
    start_nodes: np.ndarray,
    walk_length: int,
    p: float,
    q: float,
    seed: int
) -> np.ndarray:
    """
    JIT-compiled function to generate biased random walks for Node2Vec.

    This implementation handles the 2nd-order Markov chain logic efficiently.
    For a bipartite graph, the distance d(t, x) from previous node t to next node x
    is always 2 (unless returning to t, where d=0).

    Parameters
    ----------
    indptr : np.ndarray
        CSR index pointer array.
    indices : np.ndarray
        CSR column indices array.
    data : np.ndarray
        CSR data array (edge weights).
    start_nodes : np.ndarray
        Array of starting node indices for this batch of walks.
    walk_length : int
        Length of each walk.
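    p : float
        Return parameter: the unnormalised weight of stepping back to the
        previous node is multiplied by 1/p.
    q : float
        In-out parameter: the unnormalised weight of moving to a node at
        distance 2 from the previous node is multiplied by 1/q.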
    seed : int
        Random seed for reproducibility.

    Returns
    -------
    np.ndarray
        2D int32 array of walks with shape (len(start_nodes), walk_length).
    """
    np.random.seed(seed)
    n_walks = len(start_nodes)
    walks = np.empty((n_walks, walk_length), dtype=np.int32)

    for i in range(n_walks):
        curr = start_nodes[i]
        walks[i, 0] = curr
        prev = -1

        for step in range(1, walk_length):
            # Get neighbors of curr
            start_idx = indptr[curr]
            end_idx = indptr[curr + 1]

            if start_idx == end_idx:
                # Isolated node or dead end, fill rest with current
                walks[i, step:] = curr
                break

            nbrs = indices[start_idx:end_idx]
            weights = data[start_idx:end_idx].astype(np.float64) # Copy to float for manipulation

            # Calculate unnormalized transition probabilities
            if prev == -1:
                # First step: proportional to edge weight only
                pass # weights are already edge weights
            else:
                # 2nd order bias
                # In bipartite:
                # - Neighbor is 'prev' (dist=0) -> weight * 1/p
                # - Neighbor is not 'prev' (dist=2) -> weight * 1/q
                # (dist=1 impossible in bipartite)

                for j in range(len(nbrs)):
                    nbr = nbrs[j]
                    if nbr == prev:
                        weights[j] *= (1.0 / p)
                    else:
                        weights[j] *= (1.0 / q)

            # Sampling
            # Compute CDF for weighted sampling
            cdf = np.cumsum(weights)
            cdf /= cdf[-1]

            r = np.random.random()
            # Find index where cdf >= r
            # Manual search is fast for small degrees
            next_idx = 0
            for j in range(len(cdf)):
                if r <= cdf[j]:
                    next_idx = j
                    break

            next_node = nbrs[next_idx]
            walks[i, step] = next_node

            prev = curr
            curr = next_node

    return walks
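
# Plain-Python reference (illustrative, hypothetical `_demo_*` names; not the JIT kernel
# itself). On a bipartite graph, every candidate x of the current node is either the
# previous node t (d = 0, weight scaled by 1/p) or two hops from t (d = 2, scaled by
# 1/q). Normalising the biased weights gives the transition distribution. Picking the
# first index j with r <= cdf[j] is equivalent to np.searchsorted(cdf, r, side='left').
import numpy as np
from typing import Dict, List


def _demo_transition_probs(
    neighbours: List[int], weights: List[float], prev: int, p: float, q: float
) -> Dict[int, float]:
    biased = [w / p if nbr == prev else w / q for nbr, w in zip(neighbours, weights)]
    total = sum(biased)
    return {nbr: b / total for nbr, b in zip(neighbours, biased)}


def _demo_cdf_pick(weights: List[float], r: float) -> int:
    # Same normalised CDF as the kernel; searchsorted replaces the linear scan
    cdf = np.cumsum(np.asarray(weights, dtype=np.float64))
    cdf /= cdf[-1]
    return int(np.searchsorted(cdf, r, side='left'))


# The walk arrived from node 0. With p = 0.5 the return edge's weight doubles,
# giving biased weights [2, 2, 1]
print(_demo_transition_probs([0, 1, 2], [1.0, 2.0, 1.0], prev=0, p=0.5, q=1.0))
# {0: 0.4, 1: 0.4, 2: 0.2}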

def generate_walks_production(
    A: sparse.csr_matrix,
    num_walks: int,
    walk_length: int,
    p: float,
    q: float,
    seed: int
) -> List[List[int]]:
    """
    Wrapper for Numba-optimized walk generation. Prepares the sorted CSR arrays and a
    seeded, shuffled list of start nodes, then generates all walks in a single call.

    Parameters
    ----------
    A : sparse.csr_matrix
        Symmetric adjacency matrix.
    num_walks : int
        Walks per node.
    walk_length : int
        Length of walk.
    p : float
        Return parameter.
    q : float
        In-out parameter.
    seed : int
        Random seed.

    Returns
    -------
    List[List[int]]
        List of walks (as lists of integers).
    """
    # Sort column indices (returns a copy) so neighbour order, and hence sampling, is deterministic
    A_sorted = A.sorted_indices()
    indptr = A_sorted.indptr
    indices = A_sorted.indices
    data = A_sorted.data

    n_nodes = A.shape[0]
    all_nodes = np.arange(n_nodes, dtype=np.int32)

    # Repeat nodes for num_walks
    # We shuffle the order of walk generation to avoid artifacts,
    # but we seed the shuffle for reproducibility.
    rng = np.random.default_rng(seed)

    # All walks are generated in a single batch. For 2500 nodes * 10 walks = 25k walks,
    # this fits easily in memory.
    start_nodes = np.repeat(all_nodes, num_walks)
    rng.shuffle(start_nodes)

    # Call Numba function
    # Numba cannot take a scipy sparse matrix, so the CSR components are passed as plain NumPy arrays
    raw_walks = numba_random_walk(
        indptr, indices, data,
        start_nodes, walk_length,
        p, q, seed
    )

    # Convert to list of lists
    walks_list = raw_walks.tolist()

    logger.info(f"Random Walks: Generated {len(walks_list)} walks using Numba optimization.")
    return walks_list
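

# Sketch (illustrative): the seeded start-node schedule used above.
# np.repeat gives every node exactly `num_walks` start slots; shuffling with a
# seeded numpy Generator randomises the order while keeping it reproducible.
# `build_start_nodes_sketch` is a hypothetical helper that mirrors those lines.
import numpy as np


def build_start_nodes_sketch(n_nodes, num_walks, seed):
    all_nodes = np.arange(n_nodes, dtype=np.int32)
    start_nodes = np.repeat(all_nodes, num_walks)  # each node num_walks times
    rng = np.random.default_rng(seed)
    rng.shuffle(start_nodes)  # in-place, deterministic for a fixed seed
    return start_nodes

# Usage: build_start_nodes_sketch(2500, 10, 42) yields 25,000 start slots, and
# calling it again with the same seed returns the identical order.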


# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 3: Train skip-gram embedding model
# -------------------------------------------------------------------------------------------------------------------------------

def train_skipgram_model(
    walks: List[List[int]],
    idx_to_id: Dict[int, str],
    dimensions: int,
    window_size: int,
    seed: int
) -> pd.DataFrame:
    """
    Trains the Skip-gram model (Word2Vec) on the generated walks.

    Parameters
    ----------
    walks : List[List[int]]
        The random walks (integer indices).
    idx_to_id : Dict[int, str]
        Mapping from integer index to canonical node ID.
    dimensions : int
        Embedding vector size.
    window_size : int
        Context window size.
    seed : int
        Random seed.

    Returns
    -------
    pd.DataFrame
        Embeddings indexed by canonical node ID.
    """
    # Convert integer walks to canonical string IDs
    # This ensures the model learns vectors for the actual entity IDs
    # and handles the mapping transparently.
    str_walks = []
    for walk in walks:
        str_walk = [idx_to_id[idx] for idx in walk]
        str_walks.append(str_walk)

    logger.info("Word2Vec: Training Skip-gram model...")

    # Train Word2Vec
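    # sg=1 selects Skip-gram; hs is left at its default of 0, so negative sampling is used.
    # workers=1 removes thread-scheduling jitter; gensim's documentation notes that
    # reproducibility across interpreter launches also requires a fixed PYTHONHASHSEED.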
    model = Word2Vec(
        sentences=str_walks,
        vector_size=dimensions,
        window=window_size,
        min_count=1, # Keep all nodes that appear in walks
        sg=1,
        workers=1,
        seed=seed,
        epochs=5,
        negative=5 # Default negative sampling
    )

    # Extract vectors
    # The model vocabulary contains the canonical IDs
    vocab_keys = list(model.wv.index_to_key)
    vectors = model.wv[vocab_keys]

    df_embeddings = pd.DataFrame(
        vectors,
        index=vocab_keys,
        columns=[f"dim_{i}" for i in range(dimensions)]
    )

    # Sort index for deterministic output
    df_embeddings = df_embeddings.sort_index()

    logger.info(f"Word2Vec: Learned embeddings for {len(df_embeddings)} nodes.")

    return df_embeddings
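

# Sketch (illustrative): the (centre, context) pairs that skip-gram learns from.
# With sg=1 and window=w, Word2Vec trains each node to predict the nodes within
# w positions of it in a walk. This simplified, hypothetical helper uses a fixed
# window; gensim by default also shrinks the window at random per centre token
# and downsamples frequent tokens, so its actual training pairs are a subset of these.
def skipgram_pairs_sketch(walk, window):
    pairs = []
    for i, centre in enumerate(walk):
        lo = max(0, i - window)
        hi = min(len(walk), i + window + 1)
        for j in range(lo, hi):
            if j != i:  # a token is never its own context
                pairs.append((centre, walk[j]))
    return pairs

# Usage: skipgram_pairs_sketch(["a", "b", "c"], 1) gives
# [("a", "b"), ("b", "a"), ("b", "c"), ("c", "b")].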


# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def learn_node_embeddings(
    bipartite_artifact: Any, # BipartiteGraphArtifact
    config: Dict[str, Any]
) -> EmbeddingArtifact:
    """
    Orchestrator for Task 16. Generates node embeddings using Node2Vec with Numba acceleration.

    Parameters
    ----------
    bipartite_artifact : BipartiteGraphArtifact
        The constructed bipartite graph.
    config : Dict[str, Any]
        Configuration containing hyperparameters.

    Returns
    -------
    EmbeddingArtifact
        The learned embeddings and metadata.
    """
    logger.info("Starting Task 16: Node2Vec Embedding (Production Grade)...")

    # Extract params with defaults
    n2v_config = config.get("NODE2VEC", {})
    dim = int(n2v_config.get("dimensions", 100))
    seed = int(n2v_config.get("random_seed", 42))

    # Assumed defaults (not taken from the manuscript); all are recorded in `params` below
    p = float(n2v_config.get("p", 1.0))
    q = float(n2v_config.get("q", 1.0))
    num_walks = int(n2v_config.get("num_walks", 10))
    walk_length = int(n2v_config.get("walk_length", 80))
    window_size = int(n2v_config.get("window_size", 10))

    # Step 1: Prepare Graph
    A, idx_to_id = prepare_symmetric_adjacency(bipartite_artifact)

    # Step 2: Generate Walks (Numba)
    walks = generate_walks_production(
        A, num_walks, walk_length, p, q, seed
    )

    # Step 3: Train Model
    embeddings = train_skipgram_model(
        walks, idx_to_id, dim, window_size, seed
    )

    # Package
    params = {
        "p": p, "q": q, "num_walks": num_walks,
        "walk_length": walk_length, "window_size": window_size,
        "seed": seed, "dimensions": dim,
        "implementation": "numba_accelerated"
    }

    artifact = EmbeddingArtifact(embeddings, params)

    logger.info("Task 16 Completed Successfully.")

    return artifact
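

# Sketch (illustrative): the shape of the NODE2VEC configuration section.
# The hypothetical helper below reproduces how `learn_node_embeddings` resolves
# its keys and in-code defaults; the values are those defaults, not tuned settings.
# With p = q = 1 every node2vec bias equals 1, so the walk reduces to a
# first-order random walk with transition probabilities proportional to edge weights.
def resolve_node2vec_params_sketch(config):
    n2v = config.get("NODE2VEC", {})
    return {
        "dimensions": int(n2v.get("dimensions", 100)),
        "random_seed": int(n2v.get("random_seed", 42)),
        "p": float(n2v.get("p", 1.0)),
        "q": float(n2v.get("q", 1.0)),
        "num_walks": int(n2v.get("num_walks", 10)),
        "walk_length": int(n2v.get("walk_length", 80)),
        "window_size": int(n2v.get("window_size", 10)),
    }

# Usage: resolve_node2vec_params_sketch({"NODE2VEC": {"q": 0.5}}) keeps every
# default except q, which becomes 0.5.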


# Task 17 — Project embeddings to 2D with UMAP

# ==============================================================================
# Task 17: Project embeddings to 2D with UMAP
# ==============================================================================

@dataclass
class UMAPArtifact:
    """
    Container for UMAP projection results and metadata.

    This dataclass encapsulates the output of the dimensionality reduction phase,
    where the high-dimensional node embeddings are projected onto a 2D manifold.
    It holds the resulting coordinate table, which serves as the basis for the
    "Hidden Geometry" visualization (Figure 2). Additionally, it preserves the
    exact hyperparameters used for the UMAP algorithm and the qualitative
    interpretation of the resulting axes, ensuring that the visual map is both
    reproducible and semantically meaningful.

    Attributes
    ----------
    coordinates : pd.DataFrame
        A DataFrame indexed by `canonical_org_id` containing the 2D coordinates
        for each organisation. Columns are `umap_x` and `umap_y`.
    parameters : Dict[str, Any]
        A dictionary recording the UMAP hyperparameters used, including
        `n_neighbors`, `min_dist`, `metric`, and `random_state`.
    axis_interpretation : Dict[str, str]
        A dictionary mapping axis names (e.g., "horizontal_axis") to their
        qualitative functional interpretations (e.g., "Humanitarian vs. Development"),
        derived from the positions of known anchor organisations.
    """
    # The 2D coordinates
    coordinates: pd.DataFrame

    # The UMAP parameters
    parameters: Dict[str, Any]

    # The semantic interpretation of axes
    axis_interpretation: Dict[str, str]

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the UMAP projection.

        Returns
        -------
        str
            A human-readable summary of the projection size and key parameters.
        """
        # Format summary string
        return (f"UMAP Projection: {len(self.coordinates)} points. "
                f"Params: {self.parameters}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 1 & 2: Fix parameters and compute 2D coordinates
# -------------------------------------------------------------------------------------------------------------------------------

def compute_umap_projection(
    embeddings: pd.DataFrame,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Projects high-dimensional embeddings to 2D using UMAP with fixed parameters.

    Parameters
    ----------
    embeddings : pd.DataFrame
        Node embeddings (index=canonical_id).
    config : Dict[str, Any]
        Configuration dictionary.

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, Any]]
        1. DataFrame with columns ['umap_x', 'umap_y'], indexed by canonical_id.
        2. Dictionary of parameters used.
    """
    # Extract params
    umap_config = config.get("UMAP", {})

    # Manuscript-stated defaults
    params = {
        "n_neighbors": int(umap_config.get("n_neighbors", 15)),
        "min_dist": float(umap_config.get("min_dist", 0.1)),
        "metric": umap_config.get("metric", "cosine"),
        "n_components": int(umap_config.get("n_components", 2)),
        "random_state": int(umap_config.get("random_state", 42))
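    }

    # The output columns below are fixed to ['umap_x', 'umap_y'], so only a
    # 2D projection is supported.
    if params["n_components"] != 2:
        raise ValueError(
            f"UMAP: n_components must be 2 for the 2D map, got {params['n_components']}."
        )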

    logger.info(f"UMAP: Fitting with params {params}...")

    # Initialize UMAP
    reducer = umap.UMAP(
        n_neighbors=params["n_neighbors"],
        min_dist=params["min_dist"],
        metric=params["metric"],
        n_components=params["n_components"],
        random_state=params["random_state"],
        transform_seed=params["random_state"] # Ensure transform is also seeded
    )

    # Fit Transform
    # Ensure deterministic row order by sorting index if not already sorted
    # (embeddings should be sorted from previous task, but safety first)
    embeddings_sorted = embeddings.sort_index()

    # Extract numpy array
    X = embeddings_sorted.values

    # Run UMAP
    embedding_2d = reducer.fit_transform(X)

    # Create DataFrame
    df_coords = pd.DataFrame(
        embedding_2d,
        index=embeddings_sorted.index,
        columns=['umap_x', 'umap_y']
    )

    logger.info(f"UMAP: Projected {len(df_coords)} nodes to 2D.")

    return df_coords, params
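

# Sketch (illustrative): the distance behind metric="cosine".
# UMAP builds its k-nearest-neighbour graph (k = n_neighbors) from pairwise
# distances. Under the cosine metric, two embeddings are close when they point in
# the same direction, regardless of vector length. The helper name is hypothetical;
# it computes the textbook cosine distance 1 - u.v / (|u| |v|).
import numpy as np


def cosine_distance_sketch(u, v):
    u = np.asarray(u, dtype=float)
    v = np.asarray(v, dtype=float)
    return 1.0 - float(u @ v / (np.linalg.norm(u) * np.linalg.norm(v)))

# Usage: parallel vectors such as [1, 2] and [2, 4] are at distance ~0,
# orthogonal ones at 1, and opposite ones at 2.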


# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 3: Document axis interpretation (qualitative)
# -------------------------------------------------------------------------------------------------------------------------------

def define_axis_interpretation() -> Dict[str, str]:
    """
    Defines the qualitative interpretation of the UMAP axes based on the manuscript.

    Returns
    -------
    Dict[str, str]
        Dictionary describing the axes.
    """
    return {
        "horizontal_axis": "Humanitarian (Crisis Response) <-> Development (Long-term Resilience)",
        "vertical_axis": "Funders (Capital Supply) <-> Implementers (Operational Delivery)",
        "anchors": "OCHA (Humanitarian Funder), Save the Children (Humanitarian Implementer), "
                   "UNDP (Development Funder), Chemonics (Development Implementer)"
    }
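

# Sketch (illustrative, hypothetical helper): auditing the axis reading.
# The interpretation above is qualitative. One way to check it is to report on
# which side of the median each anchor organisation falls. UMAP's axis
# orientation is arbitrary (the map can come out reflected), so sides must be read
# relative to the anchors rather than as absolute directions. Pass the canonical
# IDs that correspond to OCHA, Save the Children, UNDP and Chemonics; those IDs
# are not given here.
import pandas as pd


def anchor_sides_sketch(coords, anchor_ids):
    med_x = coords["umap_x"].median()
    med_y = coords["umap_y"].median()
    out = {}
    for org_id in anchor_ids:
        x, y = coords.loc[org_id, ["umap_x", "umap_y"]]
        out[org_id] = ("left" if x < med_x else "right",
                       "below" if y < med_y else "above")
    return out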


# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def project_embeddings_umap(
    embedding_artifact: Any, # EmbeddingArtifact
    config: Dict[str, Any]
) -> UMAPArtifact:
    """
    Orchestrator for Task 17. Projects embeddings to 2D manifold.

    Parameters
    ----------
    embedding_artifact : EmbeddingArtifact
        The learned embeddings.
    config : Dict[str, Any]
        Configuration.

    Returns
    -------
    UMAPArtifact
        The projection results.
    """
    logger.info("Starting Task 17: UMAP Projection...")

    # Step 1 & 2: Compute
    coords, params = compute_umap_projection(embedding_artifact.embeddings, config)

    # Step 3: Interpret
    interpretation = define_axis_interpretation()

    # Package
    artifact = UMAPArtifact(
        coordinates=coords,
        parameters=params,
        axis_interpretation=interpretation
    )

    logger.info("Task 17 Completed Successfully.")

    return artifact


# Task 18 — Compute Hub Score (HITS) centrality

# ==============================================================================
# Task 18: Compute Hub Score (HITS) centrality
# ==============================================================================

@dataclass
class HITSArtifact:
    """
    Container for HITS centrality scores and metadata.

    This dataclass encapsulates the results of the Hyperlink-Induced Topic Search (HITS)
    algorithm applied to the bipartite transaction network. It provides the computed
    Hub scores (for Provider organisations) and Authority scores (for Receiver
    organisations); the Hub scores serve as the primary metric for ranking systemic
    influence in the "Solar System" visualization. The artifact also includes
    convergence metadata to verify the numerical stability of the power iteration.

    Attributes
    ----------
    hub_scores : pd.DataFrame
        A DataFrame indexed by `canonical_org_id` containing the Hub scores for
        provider nodes. High scores indicate providers that fund many authoritative
        receivers.
    authority_scores : pd.DataFrame
        A DataFrame indexed by `canonical_org_id` containing the Authority scores
        for receiver nodes. High scores indicate receivers funded by many
        influential hubs.
    convergence_meta : Dict[str, Any]
        A dictionary recording the convergence status of the algorithm, including
        the number of iterations performed, the final residual error, and the
        tolerance threshold used.
    """
    # DataFrame of Hub scores (Providers)
    hub_scores: pd.DataFrame

    # DataFrame of Authority scores (Receivers)
    authority_scores: pd.DataFrame

    # Metadata regarding algorithm convergence
    convergence_meta: Dict[str, Any]

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the HITS results.

        Returns
        -------
        str
            A human-readable summary of the number of scored entities and
            convergence status.
        """
        # Format summary string
        return (f"HITS Centrality: {len(self.hub_scores)} Hubs, "
                f"{len(self.authority_scores)} Authorities. "
                f"Converged: {self.convergence_meta['converged']}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 1 & 2: Iterate HITS algorithm
# -------------------------------------------------------------------------------------------------------------------------------

def run_hits_algorithm(
    B: sparse.csr_matrix,
    tol: float = 1e-6,
    max_iter: int = 100
) -> Tuple[np.ndarray, np.ndarray, Dict[str, Any]]:
    """
    Computes HITS hub and authority scores using power iteration on the bipartite matrix.

    h = B * a
    a = B.T * h

    Parameters
    ----------
    B : sparse.csr_matrix
        Incidence matrix (Providers x Receivers).
    tol : float
        Convergence tolerance on the sum of the L2 norms of the changes in h and a
        between consecutive iterations.
    max_iter : int
        Maximum iterations.

    Returns
    -------
    Tuple[np.ndarray, np.ndarray, Dict[str, Any]]
        1. Hub scores (size |U|).
        2. Authority scores (size |V|).
        3. Convergence metadata.
    """
    n_u, n_v = B.shape

    # Initialize
    # Start with uniform positive values
    h = np.ones(n_u) / np.sqrt(n_u)
    a = np.ones(n_v) / np.sqrt(n_v)

    converged = False
    iterations = 0
    final_diff = 0.0

    # Precompute transpose for efficiency
    BT = B.transpose()

    for i in range(max_iter):
        h_prev = h.copy()
        a_prev = a.copy()

        # Update Authorities: a = B.T * h
        a = BT.dot(h)

        # Normalize a
        norm_a = np.linalg.norm(a, 2)
        if norm_a > 0:
            a = a / norm_a

        # Update Hubs: h = B * a
        h = B.dot(a)

        # Normalize h
        norm_h = np.linalg.norm(h, 2)
        if norm_h > 0:
            h = h / norm_h

        # Check convergence
        diff_h = np.linalg.norm(h - h_prev, 2)
        diff_a = np.linalg.norm(a - a_prev, 2)
        total_diff = diff_h + diff_a

        if total_diff < tol:
            converged = True
            iterations = i + 1
            final_diff = total_diff
            break

        iterations = i + 1
        final_diff = total_diff

    meta = {
        "converged": converged,
        "iterations": iterations,
        "final_diff": final_diff,
        "tolerance": tol
    }

    logger.info(f"HITS: Converged={converged} in {iterations} iters (Diff={final_diff:.2e}).")

    return h, a, meta
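

# Sketch (illustrative, hypothetical test helper): cross-checking HITS with an SVD.
# Each round of the power iteration above applies h <- B B^T h (up to scaling).
# When the leading singular value of B is simple, h therefore converges to the
# leading left singular vector of B and a to the leading right singular vector.
# This uses a dense SVD, so it is practical only for small matrices. Singular
# vectors have arbitrary sign; for a non-negative B the leading ones can be taken
# non-negative, hence np.abs.
import numpy as np


def hits_via_svd_sketch(B):
    U, s, Vt = np.linalg.svd(np.asarray(B, dtype=float))
    h = np.abs(U[:, 0])
    a = np.abs(Vt[0, :])
    return h / np.linalg.norm(h), a / np.linalg.norm(a)

# Usage: on a small incidence matrix, compare these vectors with the output of
# run_hits_algorithm using np.allclose.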


# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 3: Persist hub scores
# -------------------------------------------------------------------------------------------------------------------------------

def package_hits_artifact(
    h: np.ndarray,
    a: np.ndarray,
    meta: Dict[str, Any],
    prov_map: Dict[str, int],
    recv_map: Dict[str, int]
) -> HITSArtifact:
    """
    Packages HITS scores into DataFrames indexed by canonical IDs.

    Parameters
    ----------
    h : np.ndarray
        Hub vector.
    a : np.ndarray
        Authority vector.
    meta : Dict[str, Any]
        Metadata.
    prov_map : Dict[str, int]
        Provider index map.
    recv_map : Dict[str, int]
        Receiver index map.

    Returns
    -------
    HITSArtifact
        Final artifact.
    """
    # Reverse maps
    idx_to_prov = {v: k for k, v in prov_map.items()}
    idx_to_recv = {v: k for k, v in recv_map.items()}

    # Create DataFrames
    # Providers (Hubs)
    # Note: node keys carry a role suffix, e.g. "ID::PROVIDER". We strip it to
    # recover the canonical ID, splitting once from the right so that any "::"
    # inside the canonical ID itself is preserved.

    prov_ids = [idx_to_prov[i] for i in range(len(h))]
    # Strip the role suffix (e.g. "::PROVIDER")
    canonical_provs = [pid.rsplit("::", 1)[0] for pid in prov_ids]

    df_hubs = pd.DataFrame({
        'provider_node_id': prov_ids,
        'hub_score_hits': h
    }, index=canonical_provs)
    df_hubs.index.name = 'canonical_org_id'

    # Receivers (Authorities)
    recv_ids = [idx_to_recv[i] for i in range(len(a))]
    canonical_recvs = [rid.rsplit("::", 1)[0] for rid in recv_ids]

    df_auths = pd.DataFrame({
        'receiver_node_id': recv_ids,
        'authority_score_hits': a
    }, index=canonical_recvs)
    df_auths.index.name = 'canonical_org_id'

    artifact = HITSArtifact(
        hub_scores=df_hubs,
        authority_scores=df_auths,
        convergence_meta=meta
    )

    logger.info(str(artifact))

    return artifact
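

# Sketch (illustrative, hypothetical helper): recovering canonical IDs from
# role-suffixed node IDs. The code above assumes keys of the form
# "<canonical_id>::PROVIDER". Splitting once from the right removes only the final
# role suffix and keeps any "::" that might occur inside the canonical ID itself.
def strip_role_suffix_sketch(node_id, sep="::"):
    if sep not in node_id:
        return node_id  # no role suffix present
    return node_id.rsplit(sep, 1)[0]

# Usage: strip_role_suffix_sketch("XM-DAC-41114::PROVIDER") -> "XM-DAC-41114".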


# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_hits_centrality(
    bipartite_artifact: Any # BipartiteGraphArtifact
) -> HITSArtifact:
    """
    Orchestrator for Task 18. Computes HITS centrality on the bipartite graph.

    Parameters
    ----------
    bipartite_artifact : BipartiteGraphArtifact
        The bipartite graph.

    Returns
    -------
    HITSArtifact
        The computed scores.
    """
    logger.info("Starting Task 18: HITS Centrality...")

    # Extract matrix and maps
    B = bipartite_artifact.incidence_matrix
    prov_map = bipartite_artifact.provider_index_map
    recv_map = bipartite_artifact.receiver_index_map

    # Step 1 & 2: Run Algorithm
    h, a, meta = run_hits_algorithm(B)

    # Step 3: Package
    artifact = package_hits_artifact(h, a, meta, prov_map, recv_map)

    logger.info("Task 18 Completed Successfully.")

    return artifact


# Task 19 — Compute degree and betweenness centrality

# ==============================================================================
# Task 19: Compute degree and betweenness centrality
# ==============================================================================

@dataclass
class CentralityArtifact:
    """
    Container for all computed network centrality metrics.

    This dataclass serves as the unified repository for the topological influence
    metrics calculated across the different graph representations. It merges
    metrics derived from the bipartite graph (Hub Scores) with those derived
    from the one-mode projection (Degree, Strength, Betweenness). This consolidated
    view allows for the multi-dimensional ranking of actors required to construct
    the "Solar System" visualization and identify key brokers.

    Attributes
    ----------
    centrality_table : pd.DataFrame
        A master DataFrame indexed by `canonical_org_id` containing all computed
        centrality measures. Columns include `hub_score_hits`, `degree`,
        `strength`, and `betweenness`.
    metadata : Dict[str, Any]
        A dictionary recording the methodological choices made during centrality
        computation, such as the specific algorithm variants (e.g., unweighted
        betweenness) and source graphs used.
    """
    # The unified centrality DataFrame
    centrality_table: pd.DataFrame

    # Methodological metadata
    metadata: Dict[str, Any]

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the centrality artifact.

        Returns
        -------
        str
            A human-readable summary of the number of nodes scored.
        """
        # Format summary string
        return (f"Centrality: Computed Degree, Strength, Betweenness, Hubs "
                f"for {len(self.centrality_table)} nodes.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 1: Compute degree centrality
# -------------------------------------------------------------------------------------------------------------------------------

def compute_degree_metrics(
    adjacency_matrix: sparse.csr_matrix,
    index_map: Dict[str, int]
) -> pd.DataFrame:
    """
    Computes unweighted degree and weighted strength for each node.

    Parameters
    ----------
    adjacency_matrix : sparse.csr_matrix
        Symmetric provider-provider adjacency P.
    index_map : Dict[str, int]
        Map from canonical ID to matrix index.

    Returns
    -------
    pd.DataFrame
        Table with columns ['degree', 'strength'], indexed by canonical_org_id.
    """
    # Reverse map
    idx_to_id = {v: k for k, v in index_map.items()}

    # Weighted Strength: Sum of row weights
    # axis=1 sums rows
    strength = np.array(adjacency_matrix.sum(axis=1)).flatten()

    # Unweighted Degree: count of stored entries per row (explicitly stored zeros and
    # diagonal self-loops, if present, are counted too)
    degree = adjacency_matrix.getnnz(axis=1)

    # Create DataFrame
    # Ensure ordering matches index 0..N-1
    ids = [idx_to_id[i] for i in range(len(strength))]

    df_degree = pd.DataFrame({
        'degree': degree,
        'strength': strength
    }, index=ids)

    df_degree.index.name = 'canonical_org_id'

    logger.info(f"Degree Centrality: Computed for {len(df_degree)} nodes.")

    return df_degree
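

# Sketch (illustrative, hypothetical helper): degree vs. strength on a toy projection.
# Degree counts a node's distinct co-funding partners, while strength sums their
# edge weights. Because getnnz counts *stored* entries, explicitly stored zeros
# would be counted as edges. Calling eliminate_zeros() on a copy first guards
# against that; this is an extra precaution, not something the pipeline code does.
import numpy as np
from scipy import sparse


def degree_and_strength_sketch(P):
    P = sparse.csr_matrix(P, copy=True)
    P.eliminate_zeros()  # drop explicit zeros so they do not count as ties
    degree = P.getnnz(axis=1)
    strength = np.asarray(P.sum(axis=1)).ravel()
    return degree, strength

# Usage: for the path-like matrix [[0, 2, 0], [2, 0, 3], [0, 3, 0]] the degrees
# are [1, 2, 1] and the strengths are [2, 5, 3].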


# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 2: Compute betweenness centrality
# -------------------------------------------------------------------------------------------------------------------------------

def compute_betweenness_metrics(
    adjacency_matrix: sparse.csr_matrix,
    index_map: Dict[str, int]
) -> pd.DataFrame:
    """
    Computes Betweenness Centrality using Brandes' algorithm on the unweighted graph.

    Note: We use unweighted betweenness because edge weights in P are similarities,
    not distances. Shortest paths on similarity weights would require inverting
    weights (1/w), which is an assumption not stated in the manuscript.
    Topological brokerage is typically defined on the existence of ties.

    Parameters
    ----------
    adjacency_matrix : sparse.csr_matrix
        Symmetric adjacency P.
    index_map : Dict[str, int]
        Map from canonical ID to matrix index.

    Returns
    -------
    pd.DataFrame
        Table with column ['betweenness'], indexed by canonical_org_id.
    """
    # Convert to NetworkX graph
    # from_scipy_sparse_array is efficient
    G = nx.from_scipy_sparse_array(adjacency_matrix)

    # Compute Betweenness
    # normalized=True rescales by 2/((N-1)(N-2)) for undirected graphs
    # (1/((N-1)(N-2)) for directed ones), so scores lie in [0, 1]
    # weight=None implies unweighted BFS for shortest paths
    logger.info("Betweenness: Running Brandes' algorithm (unweighted)...")
    bc_dict = nx.betweenness_centrality(G, weight=None, normalized=True)

    # Map indices back to IDs
    idx_to_id = {v: k for k, v in index_map.items()}

    # Convert to DataFrame
    # bc_dict keys are integers (node indices)
    ids = []
    scores = []
    for idx, score in bc_dict.items():
        if idx in idx_to_id:
            ids.append(idx_to_id[idx])
            scores.append(score)

    df_bc = pd.DataFrame({
        'betweenness': scores
    }, index=ids)
    df_bc.index.name = 'canonical_org_id'

    logger.info(f"Betweenness: Computed for {len(df_bc)} nodes.")

    return df_bc
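

# Sketch (illustrative, not used by the pipeline): the inverse-weight alternative
# mentioned in the docstring above. It converts similarity weights w into
# distances 1/w and passes them to Brandes' algorithm via NetworkX. On the toy
# triangle in the usage note it changes which node acts as a broker, which is why
# the unweighted choice is recorded in the metadata. The helper name is hypothetical.
import networkx as nx


def inverse_weight_betweenness_sketch(G):
    H = G.copy()  # edge attribute dicts are copied, so G is left untouched
    for _, _, data in H.edges(data=True):
        data["distance"] = 1.0 / data["weight"]
    return nx.betweenness_centrality(H, weight="distance", normalized=True)

# Usage: with edges 0-1 and 1-2 of weight 10 and a direct edge 0-2 of weight 1,
# node 1 has unweighted betweenness 0 but inverse-weight betweenness 1.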


# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 3: Persist centrality tables
# -------------------------------------------------------------------------------------------------------------------------------

def unify_centrality_metrics(
    df_degree: pd.DataFrame,
    df_betweenness: pd.DataFrame,
    hits_artifact: Any # HITSArtifact
) -> CentralityArtifact:
    """
    Joins all centrality metrics into a single master table.

    Parameters
    ----------
    df_degree : pd.DataFrame
        Degree/Strength.
    df_betweenness : pd.DataFrame
        Betweenness.
    hits_artifact : HITSArtifact
        Contains Hub Scores.

    Returns
    -------
    CentralityArtifact
        Unified results.
    """
    # Get Hub Scores
    # Note: Hub scores are for Providers.
    df_hubs = hits_artifact.hub_scores[['hub_score_hits']]

    # Join
    # We start with the projection nodes (Degree/Betweenness are defined on Projection)
    # Hub scores should exist for all providers in the projection (since projection comes from bipartite)
    # Use outer join to be safe, but expect full overlap on providers
    df_master = df_degree.join(df_betweenness, how='outer')
    df_master = df_master.join(df_hubs, how='outer')

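    # Fill NaNs with 0.0:
    # - a provider in the bipartite graph but absent from the projection (e.g. one
    #   with no co-funding ties) gets degree, strength and betweenness of 0;
    # - a projection node without a hub score (not expected, since the projection
    #   is derived from the bipartite graph) would get a hub score of 0.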
    df_master = df_master.fillna(0.0)

    meta = {
        "betweenness_type": "unweighted",
        "degree_type": "unweighted_and_weighted",
        "hub_score_source": "bipartite_hits"
    }

    artifact = CentralityArtifact(
        centrality_table=df_master,
        metadata=meta
    )

    logger.info(str(artifact))

    return artifact
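

# Sketch (illustrative, hypothetical helper and toy IDs): how the outer join and
# fillna(0.0) treat coverage gaps. A provider present only in the HITS table ends
# up with degree, strength and betweenness of 0. When the outer join introduces
# missing values, integer columns such as 'degree' are upcast to float.
import pandas as pd


def unify_centrality_sketch(df_degree, df_betweenness, df_hubs):
    df = df_degree.join(df_betweenness, how="outer").join(df_hubs, how="outer")
    return df.fillna(0.0)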


# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_network_centrality(
    projection_artifact: Any, # ProjectionArtifact
    hits_artifact: Any # HITSArtifact
) -> CentralityArtifact:
    """
    Orchestrator for Task 19. Computes and unifies network centrality metrics.

    Parameters
    ----------
    projection_artifact : ProjectionArtifact
        The provider-provider graph.
    hits_artifact : HITSArtifact
        The bipartite hub scores.

    Returns
    -------
    CentralityArtifact
        The unified centrality table.
    """
    logger.info("Starting Task 19: Centrality Computation...")

    P = projection_artifact.adjacency_matrix
    idx_map = projection_artifact.provider_index_map

    # Step 1: Degree
    df_deg = compute_degree_metrics(P, idx_map)

    # Step 2: Betweenness
    df_bet = compute_betweenness_metrics(P, idx_map)

    # Step 3: Unify
    artifact = unify_centrality_metrics(df_deg, df_bet, hits_artifact)

    logger.info("Task 19 Completed Successfully.")

    return artifact


# Task 20 — Construct solar system ranking and ring assignment

# ==============================================================================
# Task 20: Construct solar system ranking and ring assignment
# ==============================================================================

@dataclass
class SolarSystemArtifact:
    """
    Container for the Solar System visualization data.

    This dataclass encapsulates the final output required to render the "Solar System"
    visualization of the global aid network. It combines the centrality-based ranking
    of organisations (determining their orbital position) with transaction-based
    sizing metrics (determining node radius). The artifact explicitly separates the
    full ranked list from the "Top 100" display set and includes the definitions
    used to assign organisations to specific rings (Inner, Middle, Outer), ensuring
    that the visual hierarchy is traceable to quantitative thresholds.

    Attributes
    ----------
    ranked_nodes : pd.DataFrame
        A DataFrame containing all ranked organisations, indexed by `canonical_org_id`.
        Columns include `rank`, `hub_score_hits`, `node_size`, and `ring`.
    top_100_table : pd.DataFrame
        A subset of `ranked_nodes` containing only the top 100 organisations,
        optimized for visualization rendering.
    ring_definitions : Dict[str, str]
        A dictionary mapping ring labels (e.g., "Inner") to their rank-based
        definitions (e.g., "Rank 1-25"), documenting the segmentation logic.
    """
    # The full table of ranked nodes
    ranked_nodes: pd.DataFrame

    # The subset of top 100 nodes for display
    top_100_table: pd.DataFrame

    # Metadata defining the ring boundaries
    ring_definitions: Dict[str, str]

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the Solar System artifact.

        Returns
        -------
        str
            A human-readable summary of the ranking scope and core size.
        """
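        # Format summary string (count the Inner ring instead of assuming 25 nodes)
        n_inner = int((self.ranked_nodes['ring'] == 'Inner').sum())
        return (f"Solar System: {len(self.ranked_nodes)} nodes ranked. "
                f"Inner Ring: {n_inner} nodes.")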

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 1: Rank organisations by Hub Score
# -------------------------------------------------------------------------------------------------------------------------------

def rank_organisations(
    centrality_artifact: Any # CentralityArtifact
) -> pd.DataFrame:
    """
    Ranks organisations by Hub Score with deterministic tie-breaking.

    Parameters
    ----------
    centrality_artifact : CentralityArtifact
        Unified centrality table.

    Returns
    -------
    pd.DataFrame
        Table sorted by rank, with 'rank' column.
    """
    df = centrality_artifact.centrality_table.copy()

    # Ensure index is a column for sorting
    df = df.reset_index()

    # Sort: Hub Score Descending, then ID Ascending (Tie-break)
    df = df.sort_values(
        by=['hub_score_hits', 'canonical_org_id'],
        ascending=[False, True]
    )

    # Assign Rank (1-based)
    df['rank'] = range(1, len(df) + 1)

    # Restore index
    df = df.set_index('canonical_org_id')

    logger.info(f"Ranking: Ranked {len(df)} organisations.")

    return df
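

# Sketch (illustrative, hypothetical helper on toy data): deterministic ranking
# with an ID tie-break. Sorting on (hub score descending, ID ascending) means that
# organisations with identical hub scores always receive ranks in ascending ID
# order, so the ranking does not depend on the input row order.
import pandas as pd


def rank_by_hub_sketch(scores):
    df = scores.rename_axis("canonical_org_id").reset_index()
    df = df.sort_values(by=["hub_score_hits", "canonical_org_id"], ascending=[False, True])
    df["rank"] = range(1, len(df) + 1)  # 1-based ranks
    return df.set_index("canonical_org_id")

# Usage: scores {C: 0.5, A: 0.9, B: 0.5} rank as A=1, B=2, C=3 whatever the row order.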


# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 2: Assign node sizes
# -------------------------------------------------------------------------------------------------------------------------------

def assign_node_sizes(
    ranked_df: pd.DataFrame,
    node_size_artifact: Any # NodeSizeArtifact
) -> pd.DataFrame:
    """
    Attaches transaction counts (node sizes) to the ranking table.

    Parameters
    ----------
    ranked_df : pd.DataFrame
        Ranked organisations.
    node_size_artifact : NodeSizeArtifact
        Deal counts.

    Returns
    -------
    pd.DataFrame
        Ranked table with 'node_size' column.
    """
    # Join deal counts
    # We use 'total_tx_count' as the size metric
    sizes = node_size_artifact.deal_counts[['total_tx_count']]

    # Left join to keep ranked order (though indices should match)
    df_sized = ranked_df.join(sizes, how='left')

    # Rename for clarity
    df_sized = df_sized.rename(columns={'total_tx_count': 'node_size'})

    # Fill missing sizes with 0 (should not happen if populations align)
    df_sized['node_size'] = df_sized['node_size'].fillna(0).astype(int)

    return df_sized
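

# Sketch (illustrative only): why assign_node_sizes uses a left join followed
# by fillna(0). A left join keeps every ranked organisation, including one with
# no row in the deal-count table. That organisation's size becomes NaN and is
# then set to 0. The helper `_example_attach_sizes` and the toy frames in the
# usage note are hypothetical.
import pandas as pd


def _example_attach_sizes(ranked: pd.DataFrame, counts: pd.DataFrame) -> pd.DataFrame:
    """Left-join counts onto ranked (both indexed by org id); fill gaps with 0."""
    out = ranked.join(counts[["total_tx_count"]], how="left")
    out = out.rename(columns={"total_tx_count": "node_size"})
    # A NaN forces a float column; cast back to int once the gaps are filled
    out["node_size"] = out["node_size"].fillna(0).astype(int)
    return out


# Usage:
# ranked = pd.DataFrame({"rank": [1, 2]}, index=["org_a", "org_b"])
# counts = pd.DataFrame({"total_tx_count": [7]}, index=["org_a"])
# _example_attach_sizes(ranked, counts)["node_size"].tolist()  # [7, 0]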


# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 3: Derive ring assignments from hub score distribution
# -------------------------------------------------------------------------------------------------------------------------------

def assign_rings(
    df_sized: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, str]]:
    """
    Assigns organisations to rings based on rank.

    Rule:
    - Inner: Rank 1-25
    - Middle: Rank 26-100
    - Outer: Rank > 100

    Parameters
    ----------
    df_sized : pd.DataFrame
        Ranked and sized table.

    Returns
    -------
    Tuple[pd.DataFrame, Dict[str, str]]
        1. Table with 'ring' column.
        2. Ring definition metadata.
    """
    df_out = df_sized.copy()

    conditions = [
        (df_out['rank'] <= 25),
        (df_out['rank'] <= 100)
    ]
    choices = ['Inner', 'Middle']

    df_out['ring'] = np.select(conditions, choices, default='Outer')

    definitions = {
        "Inner": "Rank 1-25 (Core)",
        "Middle": "Rank 26-100",
        "Outer": "Rank > 100"
    }

    logger.info("Ring Assignment: Applied rank-based rings.")

    return df_out, definitions
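

# Sketch (illustrative only): np.select evaluates its conditions in order and
# uses the first one that is True. Rank 10 satisfies both `rank <= 25` and
# `rank <= 100`, but it is labelled "Inner". The helper `_example_rings` is
# hypothetical; its thresholds (25 and 100) match assign_rings above.
import numpy as np


def _example_rings(ranks) -> list:
    """Map 1-based ranks to ring labels using the same rule as assign_rings."""
    r = np.asarray(ranks)
    labels = np.select([r <= 25, r <= 100], ["Inner", "Middle"], default="Outer")
    return labels.tolist()


# Usage: _example_rings([1, 25, 26, 100, 101])
# -> ['Inner', 'Inner', 'Middle', 'Middle', 'Outer']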


# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def construct_solar_system(
    centrality_artifact: Any, # CentralityArtifact
    node_size_artifact: Any # NodeSizeArtifact
) -> SolarSystemArtifact:
    """
    Orchestrator for Task 20. Constructs the Solar System visualization data.

    Parameters
    ----------
    centrality_artifact : CentralityArtifact
        Centrality scores.
    node_size_artifact : NodeSizeArtifact
        Node sizes.

    Returns
    -------
    SolarSystemArtifact
        The solar system data.
    """
    logger.info("Starting Task 20: Solar System Construction...")

    # Step 1: Rank
    ranked = rank_organisations(centrality_artifact)

    # Step 2: Size
    sized = assign_node_sizes(ranked, node_size_artifact)

    # Step 3: Rings
    final_df, defs = assign_rings(sized)

    # Extract Top 100 for display
    top_100 = final_df[final_df['rank'] <= 100].copy()

    artifact = SolarSystemArtifact(
        ranked_nodes=final_df,
        top_100_table=top_100,
        ring_definitions=defs
    )

    logger.info("Task 20 Completed Successfully.")

    return artifact


# Task 21 — Reproduce subgroup statistics (universities and foundations)

# ==============================================================================
# Task 21: Reproduce subgroup statistics (universities and foundations)
# ==============================================================================

@dataclass
class SubgroupStatsArtifact:
    """
    Container for subgroup analysis results.

    This dataclass encapsulates the comparative statistics derived for specific
    functional subgroups of the aid network, namely Universities (Academic/Research)
    and Foundations. It provides the quantitative evidence required to support
    manuscript claims regarding the "paradox of scale vs. influence," where
    smaller actors (by deal count) occupy disproportionately central positions.
    The artifact includes both the aggregated summary table and a lookup table
    identifying the specific broker organisations (e.g., J-PAL, Hewlett) used
    as case studies.

    Attributes
    ----------
    summary_table : pd.DataFrame
        A DataFrame indexed by group name ("Global", "Universities", "Foundations")
        containing summary statistics: count, mean/median deal size, median rank,
        and representation in the top 100/1000.
    broker_lookup : pd.DataFrame
        A DataFrame containing the canonical identities and metadata for the
        specific broker organisations highlighted in the text, facilitating
        downstream ego-network extraction.
    """
    # The summary statistics table
    summary_table: pd.DataFrame

    # The broker identification table
    broker_lookup: pd.DataFrame

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the subgroup statistics.

        Returns
        -------
        str
            A human-readable summary indicating the subgroups analyzed.
        """
        # Format summary string
        return "Subgroup Stats: Computed for Academic/Research and Foundation."

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 1: Partition organisations by type
# -------------------------------------------------------------------------------------------------------------------------------

def partition_subgroups(
    df_organisations: pd.DataFrame,
    ranked_nodes: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Partitions the ranked organisations into Universities and Foundations based on
    the controlled taxonomy.

    Parameters
    ----------
    df_organisations : pd.DataFrame
        Master organisation table with 'org_type'.
    ranked_nodes : pd.DataFrame
        Ranked organisations (providers).

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame]
        1. Universities DataFrame (subset of ranked_nodes).
        2. Foundations DataFrame (subset of ranked_nodes).
    """
    # Map types to ranked nodes
    # We need to join org_type from master to ranked nodes
    # ranked_nodes is indexed by canonical_org_id

    # Prepare type map
    type_map = (
        df_organisations
        .dropna(subset=['canonical_org_id', 'org_type'])
        .drop_duplicates('canonical_org_id')
        .set_index('canonical_org_id')['org_type']
    )

    # Enrich ranked nodes
    df_enriched = ranked_nodes.copy()
    df_enriched['org_type'] = df_enriched.index.map(type_map)

    # Filter
    # Taxonomy from Task 8: "Academic/Research", "Foundation"
    universities = df_enriched[df_enriched['org_type'] == "Academic/Research"].copy()
    foundations = df_enriched[df_enriched['org_type'] == "Foundation"].copy()

    logger.info(f"Subgroups: Found {len(universities)} Universities, {len(foundations)} Foundations in ranked set.")

    return universities, foundations
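

# Sketch (illustrative only): partition_subgroups enriches the ranked table with
# Index.map using a Series mapper. Each ID is looked up in the Series index, and
# IDs with no recorded type become NaN. NaN fails both equality filters, so
# untyped organisations fall into neither subgroup. The helper
# `_example_split_by_type` and the sample IDs and types are hypothetical.
import pandas as pd


def _example_split_by_type(ranked_ids: list, type_map: dict) -> tuple:
    """Return (academic_ids, foundation_ids) for the given ranked IDs."""
    ids = pd.Index(ranked_ids)
    # Same lookup style as partition_subgroups: Index.map with a Series mapper
    org_type = ids.map(pd.Series(type_map))
    unis = ids[org_type == "Academic/Research"].tolist()
    founds = ids[org_type == "Foundation"].tolist()
    return unis, founds


# Usage: "c" has no type and is excluded from both lists.
# _example_split_by_type(["a", "b", "c"], {"a": "Foundation", "b": "Academic/Research"})
# -> (['b'], ['a'])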


# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 2: Compute comparative statistics
# -------------------------------------------------------------------------------------------------------------------------------

def calculate_comparative_stats(
    universities: pd.DataFrame,
    foundations: pd.DataFrame,
    all_ranked: pd.DataFrame
) -> pd.DataFrame:
    """
    Computes comparative statistics (deal count, rank) for subgroups vs global.

    Parameters
    ----------
    universities : pd.DataFrame
        University subgroup.
    foundations : pd.DataFrame
        Foundation subgroup.
    all_ranked : pd.DataFrame
        All ranked nodes.

    Returns
    -------
    pd.DataFrame
        Summary table.
    """
    def get_stats(df: pd.DataFrame, label: str) -> Dict[str, Any]:
        return {
            "Group": label,
            "Count": len(df),
            "Mean_Deals": df['node_size'].mean(),
            "Median_Deals": df['node_size'].median(),
            "Median_Rank": df['rank'].median(),
            "In_Top_100": (df['rank'] <= 100).sum(),
            "In_Top_1000": (df['rank'] <= 1000).sum()
        }

    stats_list = [
        get_stats(all_ranked, "Global"),
        get_stats(universities, "Universities"),
        get_stats(foundations, "Foundations")
    ]

    summary = pd.DataFrame(stats_list).set_index("Group")

    # Log comparison to manuscript claims
    # Manuscript: Universities median rank 434 vs global 571
    logger.info("Subgroup Stats:\n" + str(summary))

    return summary


# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 3: Persist subgroup summary table
# -------------------------------------------------------------------------------------------------------------------------------

def identify_brokers(
    df_organisations: pd.DataFrame
) -> pd.DataFrame:
    """
    Identifies specific brokers (J-PAL, Hewlett) for downstream analysis.

    Parameters
    ----------
    df_organisations : pd.DataFrame
        Master table.

    Returns
    -------
    pd.DataFrame
        Lookup table for brokers.
    """
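    # Target brokers: J-PAL ("Abdul Latif Jameel Poverty Action Lab") and the
    # "William and Flora Hewlett Foundation". Matching uses lowercase substrings
    # of the normalized name, so the bare acronym "J-PAL" is not matched. This
    # relies on the full canonical names (or aliases containing them) being present.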

    # Simple search in normalized names
    mask_jpal = df_organisations['org_name_normalized'].str.contains("poverty action lab", na=False)
    mask_hewlett = df_organisations['org_name_normalized'].str.contains("hewlett foundation", na=False)

    brokers = df_organisations[mask_jpal | mask_hewlett].copy()

    # Keep relevant columns
    brokers = brokers[['canonical_org_id', 'org_name', 'org_type']].drop_duplicates('canonical_org_id')

    logger.info(f"Broker Identification: Found {len(brokers)} candidates.")

    return brokers
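

# Sketch (illustrative only): Series.str.contains is case-sensitive and treats
# its pattern as a regular expression by default. Passing na=False makes missing
# names count as non-matches instead of putting NaN into the boolean mask. This
# is why identify_brokers matches lowercase phrases against the *normalized*
# name column. The helper `_example_broker_mask` and the sample names are
# hypothetical.
import pandas as pd


def _example_broker_mask(names: pd.Series, phrases: list) -> pd.Series:
    """True where a name contains any of the given literal phrases."""
    mask = pd.Series(False, index=names.index)
    for phrase in phrases:
        # regex=False: match the phrase literally, with no special characters
        mask |= names.str.contains(phrase, regex=False, na=False)
    return mask


# Usage: the un-normalized "Hewlett Foundation" is missed because of its case.
# _example_broker_mask(pd.Series(["hewlett foundation", "Hewlett Foundation", None]),
#                      ["hewlett foundation"]).tolist()
# -> [True, False, False]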

def package_subgroup_artifact(
    summary: pd.DataFrame,
    brokers: pd.DataFrame
) -> SubgroupStatsArtifact:
    """
    Packages the subgroup stats.

    Parameters
    ----------
    summary : pd.DataFrame
        Stats table.
    brokers : pd.DataFrame
        Broker lookup.

    Returns
    -------
    SubgroupStatsArtifact
        Final artifact.
    """
    artifact = SubgroupStatsArtifact(
        summary_table=summary,
        broker_lookup=brokers
    )

    logger.info(str(artifact))

    return artifact


# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_subgroup_statistics(
    df_organisations: pd.DataFrame,
    solar_system_artifact: Any # SolarSystemArtifact
) -> SubgroupStatsArtifact:
    """
    Orchestrator for Task 21. Computes subgroup statistics.

    Parameters
    ----------
    df_organisations : pd.DataFrame
        Master table.
    solar_system_artifact : SolarSystemArtifact
        Ranked nodes.

    Returns
    -------
    SubgroupStatsArtifact
        The stats.
    """
    logger.info("Starting Task 21: Subgroup Statistics...")

    ranked = solar_system_artifact.ranked_nodes

    # Step 1: Partition
    unis, founds = partition_subgroups(df_organisations, ranked)

    # Step 2: Stats
    summary = calculate_comparative_stats(unis, founds, ranked)

    # Step 3: Brokers
    brokers = identify_brokers(df_organisations)

    # Package
    artifact = package_subgroup_artifact(summary, brokers)

    logger.info("Task 21 Completed Successfully.")

    return artifact


# Task 22 — Reproduce broker betweenness comparison (Extended Data Fig. S3-type)

# ==============================================================================
# Task 22: Reproduce broker betweenness comparison (Extended Data Fig. S3-type)
# ==============================================================================

@dataclass
class BrokerComparisonArtifact:
    """
    Container for broker betweenness comparison data.

    This dataclass encapsulates the quantitative evidence supporting the manuscript's
    central finding: that specific "knowledge brokers" (e.g., J-PAL, Hewlett Foundation)
    possess disproportionately high betweenness centrality relative to their financial
    volume. It provides a direct comparison between the betweenness scores of these
    brokers and the median betweenness of the "Inner Ring" (Top 25) actors. This
    comparison validates the "hidden geometry" hypothesis by demonstrating that
    structural influence is distinct from, and often uncorrelated with, aggregate
    transaction volume.

    Attributes
    ----------
    comparison_table : pd.DataFrame
        A DataFrame indexed by `canonical_org_id` containing the betweenness scores
        for the identified brokers, the baseline median of the Top 25, and the
        calculated ratio (`broker_score / median_score`).
    median_top25_betweenness : float
        The scalar median betweenness centrality of the 25 most central organisations
        (by Hub Score). This serves as the baseline for "high influence" in the network.
    """
    # The comparison table with scores and ratios
    comparison_table: pd.DataFrame

    # The baseline median value
    median_top25_betweenness: float

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the broker comparison.

        Returns
        -------
        str
            A human-readable summary of the baseline median and number of brokers compared.
        """
        # Format summary string
        return (f"Broker Comparison: Median Top 25 = {self.median_top25_betweenness:.4f}. "
                f"Brokers analyzed: {len(self.comparison_table)}.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 1: Extract betweenness for highlighted brokers
# -------------------------------------------------------------------------------------------------------------------------------

def extract_broker_scores(
    centrality_artifact: Any, # CentralityArtifact
    subgroup_artifact: Any # SubgroupStatsArtifact
) -> pd.DataFrame:
    """
    Retrieves betweenness centrality scores for the identified broker organisations.

    Parameters
    ----------
    centrality_artifact : CentralityArtifact
        Unified centrality table.
    subgroup_artifact : SubgroupStatsArtifact
        Contains broker lookup table.

    Returns
    -------
    pd.DataFrame
        Table with columns ['org_name', 'org_type', 'betweenness'], indexed by
        canonical_org_id. Brokers absent from the centrality table are dropped.
    """
    # Get broker IDs
    brokers = subgroup_artifact.broker_lookup.copy()

    # Get centrality
    centrality = centrality_artifact.centrality_table[['betweenness']]

    # Join
    # Inner join to ensure we only get brokers that exist in the network
    broker_scores = brokers.join(centrality, on='canonical_org_id', how='inner')

    # Set index
    broker_scores = broker_scores.set_index('canonical_org_id')

    logger.info(f"Broker Scores: Retrieved for {len(broker_scores)} brokers.")

    return broker_scores


# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 2: Compute Top 25 median betweenness
# -------------------------------------------------------------------------------------------------------------------------------

def compute_top25_median(
    centrality_artifact: Any, # CentralityArtifact
    solar_system_artifact: Any # SolarSystemArtifact
) -> float:
    """
    Computes the median betweenness centrality of the Top 25 organisations (Inner Ring).

    Parameters
    ----------
    centrality_artifact : CentralityArtifact
        Centrality scores.
    solar_system_artifact : SolarSystemArtifact
        Ranked nodes.

    Returns
    -------
    float
        Median betweenness.
    """
    # Identify Top 25 IDs
    ranked = solar_system_artifact.ranked_nodes
    top_25_ids = ranked[ranked['rank'] <= 25].index

    # Get scores
    centrality = centrality_artifact.centrality_table
    top_25_scores = centrality.loc[centrality.index.intersection(top_25_ids), 'betweenness']

    # Compute median
    median_val = top_25_scores.median()

    # Handle empty case (unlikely)
    if pd.isna(median_val):
        median_val = 0.0

    logger.info(f"Top 25 Median Betweenness: {median_val:.6f}")

    return float(median_val)
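

# Sketch (illustrative only): the baseline in compute_top25_median is an
# ordinary median over the Top-k IDs that also appear in the centrality table.
# With an even number of scores, the median averages the two middle values. IDs
# missing from the table are silently skipped by the index intersection. The
# helper `_example_topk_median` and the toy inputs are hypothetical.
import pandas as pd


def _example_topk_median(ranks: pd.Series, betweenness: pd.Series, k: int) -> float:
    """Median betweenness of IDs with rank <= k (0.0 if none are found)."""
    top_ids = ranks[ranks <= k].index
    scores = betweenness.loc[betweenness.index.intersection(top_ids)]
    med = scores.median()
    return 0.0 if pd.isna(med) else float(med)


# Usage: with k=2 the Top-2 scores are 0.4 and 0.1, so the median is 0.25.
# ranks = pd.Series([1, 2, 3], index=["a", "b", "c"])
# btw = pd.Series([0.4, 0.1, 0.3], index=["a", "b", "c"])
# _example_topk_median(ranks, btw, 2)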


# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 3: Persist comparison artifact
# -------------------------------------------------------------------------------------------------------------------------------

def package_broker_comparison(
    broker_scores: pd.DataFrame,
    median_val: float
) -> BrokerComparisonArtifact:
    """
    Packages the comparison results.

    Parameters
    ----------
    broker_scores : pd.DataFrame
        Broker scores.
    median_val : float
        Baseline median.

    Returns
    -------
    BrokerComparisonArtifact
        Final artifact.
    """
    df_out = broker_scores.copy()
    df_out['median_top25'] = median_val

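    # Compute the ratio to the Top 25 median.
    # A zero median makes the ratio undefined: report +inf for brokers with
    # positive betweenness and NaN for brokers whose own score is also 0 (0/0).
    if median_val > 0:
        df_out['ratio_to_median'] = df_out['betweenness'] / median_val
    else:
        df_out['ratio_to_median'] = np.where(df_out['betweenness'] > 0, np.inf, np.nan)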

    artifact = BrokerComparisonArtifact(
        comparison_table=df_out,
        median_top25_betweenness=median_val
    )

    logger.info(str(artifact))

    return artifact
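

# Sketch (illustrative only): the broker ratio is a plain division by the
# Top-25 median. When that median is 0, the ratio is undefined. The convention
# assumed here is +inf for a positive broker score and NaN for a zero score
# (0/0). The helper `_example_ratio_to_median` is hypothetical.
import numpy as np
import pandas as pd


def _example_ratio_to_median(scores: pd.Series, median_val: float) -> pd.Series:
    """Broker betweenness divided by the baseline median, with a zero guard."""
    if median_val > 0:
        return scores / median_val
    return pd.Series(np.where(scores > 0, np.inf, np.nan), index=scores.index)


# Usage: _example_ratio_to_median(pd.Series([0.2, 0.05]), 0.1)  # ratios 2.0 and 0.5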


# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compare_broker_betweenness(
    centrality_artifact: Any, # CentralityArtifact
    subgroup_artifact: Any, # SubgroupStatsArtifact
    solar_system_artifact: Any # SolarSystemArtifact
) -> BrokerComparisonArtifact:
    """
    Orchestrator for Task 22. Compares broker betweenness to the core median.

    Parameters
    ----------
    centrality_artifact : CentralityArtifact
        Centrality scores.
    subgroup_artifact : SubgroupStatsArtifact
        Broker info.
    solar_system_artifact : SolarSystemArtifact
        Rankings.

    Returns
    -------
    BrokerComparisonArtifact
        The comparison data.
    """
    logger.info("Starting Task 22: Broker Comparison...")

    # Step 1: Extract
    scores = extract_broker_scores(centrality_artifact, subgroup_artifact)

    # Step 2: Baseline
    median = compute_top25_median(centrality_artifact, solar_system_artifact)

    # Step 3: Package
    artifact = package_broker_comparison(scores, median)

    logger.info("Task 22 Completed Successfully.")

    return artifact


# Task 23 — Reproduce Hewlett subnetwork characterization

# ==============================================================================
# Task 23: Reproduce Hewlett subnetwork characterization
# ==============================================================================

@dataclass
class HewlettNetworkArtifact:
    """
    Container for the Hewlett Foundation ego network analysis.

    This dataclass encapsulates the extracted subnetwork centered on the William
    and Flora Hewlett Foundation, a key "knowledge broker" identified in the study.
    It contains the edge list representing the Foundation's direct funding
    relationships (downstream partners) and a set of portfolio metrics that
    quantify its structural reach. This artifact supports the visualization of
    how a single central actor bridges disparate clusters (e.g., connecting
    research institutes to implementation NGOs) and validates the claim that
    influence flows through connectivity.

    Attributes
    ----------
    ego_edges : pd.DataFrame
        A DataFrame containing the edge list of the Hewlett Foundation's ego network.
        Columns include `receiver_canonical_id` (the partner organisation) and
        `tx_count` (the number of transactions).
    portfolio_metrics : Dict[str, Any]
        A dictionary of portfolio metrics: `unique_partners` (number of distinct
        receivers), `total_transactions` (transaction count across all edges),
        `partners_in_top_100` (number of partners that rank in the global Top 100),
        and `top_100_partner_ids` (the IDs of those partners).
    """
    # The edge list of the ego network
    ego_edges: pd.DataFrame

    # Summary metrics of the portfolio
    portfolio_metrics: Dict[str, Any]

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the Hewlett network.

        Returns
        -------
        str
            A human-readable summary of the network size and transaction count.
        """
        # Format summary string
        return (f"Hewlett Network: {self.portfolio_metrics['unique_partners']} partners, "
                f"{self.portfolio_metrics['total_transactions']} transactions.")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 1: Define Hewlett ego network extraction rule
# -------------------------------------------------------------------------------------------------------------------------------

def extract_hewlett_ego(
    df_transactions: pd.DataFrame,
    subgroup_artifact: Any, # SubgroupStatsArtifact
    canonical_mapping: pd.DataFrame
) -> pd.DataFrame:
    """
    Extracts the ego network for the William and Flora Hewlett Foundation.

    Parameters
    ----------
    df_transactions : pd.DataFrame
        Cleansed transactions.
    subgroup_artifact : SubgroupStatsArtifact
        Contains broker lookup to find Hewlett's ID.
    canonical_mapping : pd.DataFrame
        Mapping table with 'org_ref' and 'canonical_org_id' columns, used to
        resolve transaction endpoints to canonical IDs. Endpoints are re-mapped
        here rather than reusing the mapped transaction table from Task 13,
        because the orchestrator may not keep that large table in memory.

    Returns
    -------
    pd.DataFrame
        Edge list of Hewlett's downstream partners (receiver, tx_count).
    """
    # Find Hewlett ID
    brokers = subgroup_artifact.broker_lookup

    # Task 21 already identified the broker rows. Select the one whose name
    # contains "Hewlett" (case-insensitive) and take its canonical ID.
    hewlett_row = brokers[brokers['org_name'].str.contains("Hewlett", case=False, na=False)]

    if len(hewlett_row) == 0:
        logger.warning("Hewlett Foundation not found in broker lookup.")
        return pd.DataFrame(columns=['receiver_canonical_id', 'tx_count'])

    hewlett_id = hewlett_row.iloc[0]['canonical_org_id']
    logger.info(f"Identified Hewlett ID: {hewlett_id}")

    # Build a ref -> canonical ID lookup from the mapping table
    mapping_dict = canonical_mapping.set_index('org_ref')['canonical_org_id'].to_dict()

    # Collect every org_ref that resolves to Hewlett's canonical ID
    hewlett_refs = [k for k, v in mapping_dict.items() if v == hewlett_id]

    # Keep transactions whose provider ref is one of Hewlett's refs.
    # Matching on provider name would be looser; the strict ref match is used here.
    mask_provider = df_transactions['transaction_provider_org_ref'].isin(hewlett_refs)

    df_ego = df_transactions[mask_provider].copy()

    # Map receivers
    df_ego['receiver_canonical_id'] = df_ego['transaction_receiver_org_ref'].map(mapping_dict)
    # Fallback to name if needed (as in Task 13)
    df_ego['receiver_canonical_id'] = df_ego['receiver_canonical_id'].fillna(df_ego['transaction_receiver_org_name'])

    # Aggregate
    edges = (
        df_ego
        .groupby('receiver_canonical_id')
        .size()
        .reset_index(name='tx_count')
        .sort_values('tx_count', ascending=False)
    )

    logger.info(f"Hewlett Ego: Found {len(edges)} downstream partners.")

    return edges
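

# Sketch (illustrative only): the ego extraction above in miniature. Provider
# refs that map to the ego's canonical ID select its outgoing transactions.
# Receivers are mapped to canonical IDs, falling back to the raw receiver name
# when the ref is unmapped, and edges are counted per receiver. Column names
# follow extract_hewlett_ego; the helper `_example_ego_edges` and the toy data
# are hypothetical.
import pandas as pd


def _example_ego_edges(tx: pd.DataFrame, ref_to_id: dict, ego_id: str) -> pd.DataFrame:
    """Count transactions from `ego_id` to each canonical receiver."""
    ego_refs = {ref for ref, cid in ref_to_id.items() if cid == ego_id}
    ego_tx = tx[tx["transaction_provider_org_ref"].isin(ego_refs)].copy()
    receivers = ego_tx["transaction_receiver_org_ref"].map(ref_to_id)
    # Unmapped receiver refs fall back to the receiver's name
    ego_tx["receiver_canonical_id"] = receivers.fillna(ego_tx["transaction_receiver_org_name"])
    return (
        ego_tx.groupby("receiver_canonical_id")
        .size()
        .reset_index(name="tx_count")
        .sort_values(["tx_count", "receiver_canonical_id"], ascending=[False, True])
        .reset_index(drop=True)
    )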


# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 2: Compute Hewlett portfolio metrics
# -------------------------------------------------------------------------------------------------------------------------------

def compute_portfolio_metrics(
    ego_edges: pd.DataFrame,
    solar_system_artifact: Any # SolarSystemArtifact
) -> Dict[str, Any]:
    """
    Computes metrics for Hewlett's portfolio.

    Parameters
    ----------
    ego_edges : pd.DataFrame
        Hewlett's edges.
    solar_system_artifact : SolarSystemArtifact
        Rankings.

    Returns
    -------
    Dict[str, Any]
        Metrics dictionary.
    """
    # Basic stats
    unique_partners = len(ego_edges)
    total_tx = ego_edges['tx_count'].sum()

    # Top 100 overlap
    top_100_ids = set(solar_system_artifact.top_100_table.index)
    partners = set(ego_edges['receiver_canonical_id'])

    overlap = partners.intersection(top_100_ids)
    count_top_100 = len(overlap)

    metrics = {
        "unique_partners": unique_partners,
        "total_transactions": int(total_tx),
        "partners_in_top_100": count_top_100,
        # Sorted so the ID list is deterministic across runs (set order is not)
        "top_100_partner_ids": sorted(overlap, key=str)
    }

    logger.info(f"Hewlett Metrics: {unique_partners} partners, {total_tx} txs, {count_top_100} in Top 100.")

    return metrics


# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 3: Persist Hewlett subnetwork artifact
# -------------------------------------------------------------------------------------------------------------------------------

def package_hewlett_artifact(
    ego_edges: pd.DataFrame,
    metrics: Dict[str, Any]
) -> HewlettNetworkArtifact:
    """
    Packages the Hewlett network analysis.

    Parameters
    ----------
    ego_edges : pd.DataFrame
        Edges.
    metrics : Dict[str, Any]
        Metrics.

    Returns
    -------
    HewlettNetworkArtifact
        Final artifact.
    """
    artifact = HewlettNetworkArtifact(
        ego_edges=ego_edges,
        portfolio_metrics=metrics
    )

    logger.info(str(artifact))

    return artifact


# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def characterize_hewlett_network(
    df_transactions: pd.DataFrame,
    subgroup_artifact: Any, # SubgroupStatsArtifact
    canonical_mapping_artifact: Any, # CanonicalMappingArtifact
    solar_system_artifact: Any # SolarSystemArtifact
) -> HewlettNetworkArtifact:
    """
    Orchestrator for Task 23. Characterizes the Hewlett Foundation's network.

    Parameters
    ----------
    df_transactions : pd.DataFrame
        Cleansed transactions.
    subgroup_artifact : SubgroupStatsArtifact
        Broker info.
    canonical_mapping_artifact : CanonicalMappingArtifact
        Mapping table.
    solar_system_artifact : SolarSystemArtifact
        Rankings.

    Returns
    -------
    HewlettNetworkArtifact
        The analysis results.
    """
    logger.info("Starting Task 23: Hewlett Network Characterization...")

    mapping = canonical_mapping_artifact.mapping_table

    # Step 1: Extract
    edges = extract_hewlett_ego(df_transactions, subgroup_artifact, mapping)

    # Step 2: Metrics
    metrics = compute_portfolio_metrics(edges, solar_system_artifact)

    # Step 3: Package
    artifact = package_hewlett_artifact(edges, metrics)

    logger.info("Task 23 Completed Successfully.")

    return artifact


# Task 24 — Construct website hyperlink graph for external validation

# ==============================================================================
# Task 24: Construct website hyperlink graph for external validation
# ==============================================================================

@dataclass
class WebGraphArtifact:
    """
    Container for the web hyperlink graph.

    This dataclass encapsulates the constructed external validation network,
    derived from website hyperlink data. It holds the sparse adjacency matrix
    representing the directed web graph, the mapping between domain names and
    matrix indices, and coverage statistics quantifying the overlap between
    the aid organisation population and the web crawl. This artifact is the
    input for the PageRank computation used to validate the "offline" centrality
    metrics.

    Attributes
    ----------
    adjacency_matrix : sparse.csr_matrix
        The sparse adjacency matrix of the web graph, where rows represent source
        domains and columns represent target domains. Entries are binary (1 for link).
    domain_index_map : Dict[str, int]
        A dictionary mapping normalized domain strings (e.g., "example.org") to
        their integer indices in the adjacency matrix.
    node_list : List[str]
        A list of all unique domains in the graph, ordered by their index.
    coverage_stats : Dict[str, Any]
        A dictionary containing metrics on the intersection between the organisation
        master list and the web graph nodes (e.g., match rate).
    """
    # The sparse adjacency matrix
    adjacency_matrix: sparse.csr_matrix

    # Map from domain string to matrix index
    domain_index_map: Dict[str, int]

    # List of unique domains
    node_list: List[str]

    # Coverage statistics
    coverage_stats: Dict[str, Any]

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the web graph.

        Returns
        -------
        str
            A human-readable summary of the graph size and organisation coverage.
        """
        # Format summary string
        return (f"Web Graph: {len(self.node_list)} nodes, {self.adjacency_matrix.nnz} edges. "
                f"Org Coverage: {self.coverage_stats['match_rate']:.2%}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 1: Build domain-level directed graph
# -------------------------------------------------------------------------------------------------------------------------------

def build_web_adjacency(
    df_web_links: pd.DataFrame
) -> Tuple[sparse.csr_matrix, Dict[str, int], List[str]]:
    """
    Constructs the directed adjacency matrix of the web graph.

    Parameters
    ----------
    df_web_links : pd.DataFrame
        Raw web links with 'source_domain', 'target_domain'.

    Returns
    -------
    Tuple[sparse.csr_matrix, Dict[str, int], List[str]]
        1. Sparse adjacency matrix (rows=source, cols=target).
        2. Domain index map.
        3. List of unique domains (nodes).
    """
    # Normalize domains with the same rules as Task 8 (lowercase, strip the
    # scheme, the "www." prefix and any path). Re-implemented here so this task
    # is self-contained. The nullable "string" dtype keeps missing values as
    # <NA> instead of turning them into the literal text "nan".
    def norm(s_series: pd.Series) -> pd.Series:
        return (
            s_series.astype("string")
            .str.lower().str.strip()
            .str.replace(r'^https?://', '', regex=True)
            .str.replace(r'^www\.', '', regex=True)
            .str.split('/').str[0]
        )

    df_links = df_web_links.copy()
    df_links['source'] = norm(df_links['source_domain'])
    df_links['target'] = norm(df_links['target_domain'])

    # Drop missing or empty endpoints, then self-loops
    df_links = df_links.dropna(subset=['source', 'target'])
    df_links = df_links[(df_links['source'] != '') & (df_links['target'] != '')]
    df_links = df_links[df_links['source'] != df_links['target']]

    # Get unique nodes
    unique_domains = sorted(list(set(df_links['source']) | set(df_links['target'])))
    domain_map = {d: i for i, d in enumerate(unique_domains)}

    # Map to indices
    src_idx = df_links['source'].map(domain_map).values
    tgt_idx = df_links['target'].map(domain_map).values

    # Build Adjacency (Unweighted)
    # Use ones for existence
    data = np.ones(len(src_idx))

    shape = (len(unique_domains), len(unique_domains))
    adj = sparse.coo_matrix((data, (src_idx, tgt_idx)), shape=shape).tocsr()

    # Binarize (remove duplicate links)
    adj.data = np.ones_like(adj.data)

    logger.info(f"Web Graph: Built with {len(unique_domains)} nodes and {adj.nnz} edges.")

    return adj, domain_map, unique_domains
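
# Illustrative sketch (hypothetical helper, not part of the pipeline): why the
# adjacency is binarized after `tocsr()`. SciPy sums duplicate (row, col) entries
# when a COO matrix is converted to CSR, so two identical hyperlinks a -> b would
# otherwise be stored as a single edge of weight 2. Overwriting `.data` with ones,
# as `build_web_adjacency` does, keeps one unweighted edge per linked domain pair.
import numpy as np
from scipy import sparse


def demo_binarized_adjacency():
    """Builds a 3-node toy graph in which the link 0 -> 1 appears twice."""
    rows = np.array([0, 0, 1, 2])
    cols = np.array([1, 1, 2, 0])
    data = np.ones(len(rows))

    adj = sparse.coo_matrix((data, (rows, cols)), shape=(3, 3)).tocsr()

    # Before binarizing, the duplicated link has been summed into a weight of 2.0
    summed = adj.copy()

    # Binarize: every stored edge now has weight 1.0
    adj.data = np.ones_like(adj.data)
    return summed, adj

# Usage: summed, binary = demo_binarized_adjacency()
#        summed[0, 1] is 2.0, binary[0, 1] is 1.0, and binary.nnz is 3.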


# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 2: Validate domain coverage
# -------------------------------------------------------------------------------------------------------------------------------

def validate_web_coverage(
    unique_domains: List[str],
    df_organisations: pd.DataFrame
) -> Dict[str, Any]:
    """
    Computes the overlap between organisation websites and the web graph.

    Parameters
    ----------
    unique_domains : List[str]
        Nodes in web graph.
    df_organisations : pd.DataFrame
        Master organisation table with 'website_domain_normalized'.

    Returns
    -------
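    Dict[str, Any]
        Domain-level coverage statistics: 'total_org_domains' (unique non-null
        organisation domains), 'matched_domains' (those present as web graph
        nodes) and 'match_rate' (their ratio, 0.0 if there are no domains).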
    """
    web_nodes = set(unique_domains)

    # Get org domains
    # Filter for non-null
    org_domains = df_organisations['website_domain_normalized'].dropna().unique()

    # Compute intersection
    matched = [d for d in org_domains if d in web_nodes]

    match_count = len(matched)
    total_orgs_with_web = len(org_domains)

    rate = match_count / total_orgs_with_web if total_orgs_with_web > 0 else 0.0

    stats = {
        "total_org_domains": total_orgs_with_web,
        "matched_domains": match_count,
        "match_rate": rate
    }

    logger.info(f"Web Coverage: {match_count}/{total_orgs_with_web} ({rate:.2%}) matched.")

    return stats
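
# Illustrative sketch (hypothetical helper, not part of the pipeline):
# `validate_web_coverage` reports a *domain-level* rate, i.e. each unique
# organisation domain counts once however many organisations share it. The
# helper below, assuming the same 'website_domain_normalized' column, computes
# that rate alongside an organisation-level rate (one count per organisation row)
# so the two can be compared when several organisations share a website.
import pandas as pd


def domain_and_org_coverage(df_orgs: pd.DataFrame, web_nodes: set) -> tuple:
    """Returns (domain-level match rate, organisation-level match rate)."""
    domains = df_orgs['website_domain_normalized'].dropna()
    unique_domains = pd.Series(domains.unique())

    # Domain-level: same definition as the pipeline's 'match_rate'
    domain_rate = float(unique_domains.isin(web_nodes).mean()) if len(unique_domains) else 0.0

    # Organisation-level: every organisation row with a domain counts once
    org_rate = float(domains.isin(web_nodes).mean()) if len(domains) else 0.0
    return domain_rate, org_rate

# Usage: for domains ['a.org', 'a.org', 'b.org'] and web nodes {'a.org'}, the
# domain-level rate is 0.5 while the organisation-level rate is 2/3.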


# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 3: Persist web graph artifact
# -------------------------------------------------------------------------------------------------------------------------------

def package_web_artifact(
    adj: sparse.csr_matrix,
    domain_map: Dict[str, int],
    nodes: List[str],
    stats: Dict[str, Any]
) -> WebGraphArtifact:
    """
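    Bundles the adjacency matrix, domain index map, node list and coverage
    statistics into a WebGraphArtifact and logs its summary.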

    Parameters
    ----------
    adj : sparse.csr_matrix
        Adjacency.
    domain_map : Dict[str, int]
        Index map.
    nodes : List[str]
        Node list.
    stats : Dict[str, Any]
        Stats.

    Returns
    -------
    WebGraphArtifact
        Final artifact.
    """
    artifact = WebGraphArtifact(
        adjacency_matrix=adj,
        domain_index_map=domain_map,
        node_list=nodes,
        coverage_stats=stats
    )

    logger.info(str(artifact))

    return artifact


# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def construct_web_graph(
    df_web_links_raw: pd.DataFrame,
    df_organisations: pd.DataFrame
) -> WebGraphArtifact:
    """
    Orchestrator for Task 24. Constructs the web graph for validation.

    Parameters
    ----------
    df_web_links_raw : pd.DataFrame
        Raw links.
    df_organisations : pd.DataFrame
        Master orgs.

    Returns
    -------
    WebGraphArtifact
        The web graph.
    """
    logger.info("Starting Task 24: Web Graph Construction...")

    # Step 1: Build Graph
    adj, dmap, nodes = build_web_adjacency(df_web_links_raw)

    # Step 2: Validate
    stats = validate_web_coverage(nodes, df_organisations)

    # Step 3: Package
    artifact = package_web_artifact(adj, dmap, nodes, stats)

    logger.info("Task 24 Completed Successfully.")

    return artifact


# Task 25 — Compute website PageRank

# ==============================================================================
# Task 25: Compute website PageRank
# ==============================================================================

@dataclass
class PageRankArtifact:
    """
    Container for PageRank scores and metadata.

    This dataclass encapsulates the results of the PageRank computation on the
    external website hyperlink graph. It provides the "online" authority scores
    for each domain, which serve as an independent external benchmark for
    validating the "offline" centrality metrics derived from the transaction
    network. The artifact also includes convergence metadata so that the
    convergence of the power iteration can be verified after the fact.

    Attributes
    ----------
    pagerank_scores : pd.DataFrame
        A DataFrame indexed by `website_domain_normalized` containing the computed
        PageRank scores. These scores represent the stationary distribution of a
        random walker on the web graph.
    convergence_meta : Dict[str, Any]
        A dictionary recording the convergence status of the algorithm, including
        the number of iterations performed, the final L1 residual error, and the
        damping factor (`alpha`) used.
    """
    # The computed PageRank scores
    pagerank_scores: pd.DataFrame

    # Metadata regarding algorithm convergence
    convergence_meta: Dict[str, Any]

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the PageRank results.

        Returns
        -------
        str
            A human-readable summary of the number of scored domains and
            convergence status.
        """
        # Format summary string
        return (f"PageRank: Computed for {len(self.pagerank_scores)} domains. "
                f"Converged: {self.convergence_meta['converged']}")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 25, Step 1: Define PageRank iteration
# -------------------------------------------------------------------------------------------------------------------------------

def prepare_pagerank_matrix(
    adjacency: sparse.csr_matrix
) -> Tuple[sparse.csr_matrix, np.ndarray]:
    """
    Prepares the transposed transition matrix M = P.T and identifies dangling nodes.

    With A_ij = 1 if i -> j, the row-stochastic transition matrix is
    P_ij = A_ij / outdeg(i) for every node with outdeg(i) > 0; rows of dangling
    nodes are left as zeros. The power iteration updates x_new = P.T * x_old, so
    this function returns M = P.T, i.e. M_ij = 1 / outdeg(j) if j -> i exists,
    else 0. Column j of M sums to 1 when j has out-links and to 0 when j is dangling.

    Parameters
    ----------
    adjacency : sparse.csr_matrix
        Adjacency matrix A where A_ij = 1 if i -> j.

    Returns
    -------
    Tuple[sparse.csr_matrix, np.ndarray]
        1. Transposed transition matrix M = P.T.
        2. Boolean mask of dangling nodes (outdegree == 0).
    """

    # Calculate outdegrees (row sums)
    outdegrees = np.array(adjacency.sum(axis=1)).flatten()

    # Identify dangling nodes
    dangling_mask = (outdegrees == 0)

    # Normalize rows to create P
    # Avoid division by zero for dangling nodes
    # We multiply A by diag(1/outdeg)

    # Inverse outdegrees
    inv_outdegrees = np.zeros_like(outdegrees, dtype=float)
    inv_outdegrees[~dangling_mask] = 1.0 / outdegrees[~dangling_mask]

    # Create diagonal matrix D_inv
    D_inv = sparse.diags(inv_outdegrees)

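    # P = D_inv * A (row-stochastic for non-dangling rows)
    P = D_inv.dot(adjacency)

    # M = P.T for the update x_new = M * x; transposing a CSR matrix yields CSC,
    # so convert back to CSR to match the declared return type
    M = P.transpose().tocsr()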

    logger.info(f"PageRank Prep: {dangling_mask.sum()} dangling nodes identified.")

    return M, dangling_mask
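
# Illustrative sketch (hypothetical helper, not part of the pipeline): a quick
# sanity check of the matrix returned by `prepare_pagerank_matrix`. Because
# M = P.T, column j of M equals row j of P, so its sum should be 1 for every
# node with out-links and 0 for every dangling node; those zero columns are why
# the power iteration has to redistribute the dangling mass explicitly.
import numpy as np
from scipy import sparse


def transition_column_sums(adjacency: sparse.csr_matrix):
    """Rebuilds M = (D_inv A).T and returns (column sums of M, dangling mask)."""
    outdeg = np.asarray(adjacency.sum(axis=1)).ravel()
    has_out = outdeg > 0

    inv_outdeg = np.zeros(outdeg.shape[0])
    inv_outdeg[has_out] = 1.0 / outdeg[has_out]

    M = (sparse.diags(inv_outdeg) @ adjacency).T.tocsr()
    col_sums = np.asarray(M.sum(axis=0)).ravel()
    return col_sums, ~has_out

# Usage: for edges 0 -> 1, 0 -> 2, 1 -> 2 (node 2 dangling), the column sums
# are [1.0, 1.0, 0.0] and the dangling mask is [False, False, True].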


# -------------------------------------------------------------------------------------------------------------------------------
# Task 25, Step 2: Iterate until convergence
# -------------------------------------------------------------------------------------------------------------------------------

def run_pagerank_power_iteration(
    M: sparse.csr_matrix,
    dangling_mask: np.ndarray,
    alpha: float = 0.85,
    tol: float = 1e-6,
    max_iter: int = 100
) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Executes the PageRank power iteration.

    x_{k+1} = alpha * M * x_k + (alpha * (sum(x_k[dangling]) / N) + (1-alpha) / N) * 1

    Parameters
    ----------
    M : sparse.csr_matrix
        Transposed transition matrix.
    dangling_mask : np.ndarray
        Mask of dangling nodes.
    alpha : float
        Damping factor.
    tol : float
        Convergence tolerance (L1 norm).
    max_iter : int
        Max iterations.

    Returns
    -------
    Tuple[np.ndarray, Dict[str, Any]]
        1. PageRank vector.
        2. Metadata.
    """
    n = M.shape[0]

    # Initialize uniform
    x = np.ones(n) / n

    converged = False
    iterations = 0
    final_diff = 0.0

    # Constant teleportation term (1-alpha)/N
    teleport = (1.0 - alpha) / n

    for i in range(max_iter):
        x_prev = x

        # Probability mass currently sitting on dangling nodes
        dangling_sum = x_prev[dangling_mask].sum()

        # Uniform redistribution term added to every node:
        # dangling mass spread evenly plus teleportation
        redist = (alpha * dangling_sum / n) + teleport

        # Update: x_new = alpha * M * x_prev + redist (allocates a new array)
        x = alpha * M.dot(x_prev) + redist

        # Record progress, then check convergence (L1 norm)
        iterations = i + 1
        final_diff = float(np.linalg.norm(x - x_prev, 1))

        if final_diff < tol:
            converged = True
            break

    meta = {
        "converged": converged,
        "iterations": iterations,
        "final_diff": final_diff,
        "alpha": alpha
    }

    logger.info(f"PageRank: Converged={converged} in {iterations} iters (Diff={final_diff:.2e}).")

    return x, meta
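
# Illustrative sketch (hypothetical helpers, not part of the pipeline): a
# cross-check for `run_pagerank_power_iteration` on small graphs. Rearranging
# the fixed point of the update x = alpha * M x + (alpha * d.x + 1 - alpha) / n * 1,
# where d is the 0/1 dangling indicator, gives the linear system
#     (I - alpha * M - (alpha / n) * 1 d^T) x = (1 - alpha) / n * 1,
# whose solution sums to 1. The dense solve is O(n^3) and only suitable for toy
# graphs; `pagerank_power_sketch` mirrors the update used above.
import numpy as np
from scipy import sparse


def pagerank_power_sketch(adjacency, alpha=0.85, tol=1e-12, max_iter=1000):
    """Power iteration with uniform teleportation and dangling redistribution."""
    A = sparse.csr_matrix(adjacency, dtype=float)
    n = A.shape[0]
    outdeg = np.asarray(A.sum(axis=1)).ravel()
    dangling = outdeg == 0

    inv_outdeg = np.zeros(n)
    inv_outdeg[~dangling] = 1.0 / outdeg[~dangling]
    M = (sparse.diags(inv_outdeg) @ A).T.tocsr()

    x = np.full(n, 1.0 / n)
    for _ in range(max_iter):
        x_new = alpha * (M @ x) + (alpha * x[dangling].sum() + 1.0 - alpha) / n
        if np.abs(x_new - x).sum() < tol:
            return x_new
        x = x_new
    return x


def pagerank_dense_reference(adjacency, alpha=0.85):
    """Solves the PageRank fixed point directly with a dense linear solve."""
    A = adjacency.toarray() if sparse.issparse(adjacency) else np.asarray(adjacency)
    A = A.astype(float)
    n = A.shape[0]
    outdeg = A.sum(axis=1)
    dangling = outdeg == 0

    # Row-stochastic P (dangling rows stay zero), then M = P.T
    P = np.zeros_like(A)
    P[~dangling] = A[~dangling] / outdeg[~dangling, None]
    M = P.T

    system = np.eye(n) - alpha * M - (alpha / n) * np.outer(np.ones(n), dangling.astype(float))
    rhs = np.full(n, (1.0 - alpha) / n)
    return np.linalg.solve(system, rhs)

# Usage: on a small graph with a dangling node, both functions should agree to
# within about 1e-9, and both vectors should sum to 1.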


# -------------------------------------------------------------------------------------------------------------------------------
# Task 25, Step 3: Persist PageRank vector
# -------------------------------------------------------------------------------------------------------------------------------

def package_pagerank_artifact(
    scores: np.ndarray,
    domain_map: Dict[str, int],
    meta: Dict[str, Any]
) -> PageRankArtifact:
    """
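    Maps the PageRank vector back to domain names (via the inverted index map)
    and bundles it with the convergence metadata into a PageRankArtifact.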

    Parameters
    ----------
    scores : np.ndarray
        Score vector.
    domain_map : Dict[str, int]
        Index map.
    meta : Dict[str, Any]
        Metadata.

    Returns
    -------
    PageRankArtifact
        Final artifact.
    """
    # Reverse map
    idx_to_domain = {v: k for k, v in domain_map.items()}

    # Create DataFrame
    domains = [idx_to_domain[i] for i in range(len(scores))]

    df_scores = pd.DataFrame({
        'pagerank': scores
    }, index=domains)
    df_scores.index.name = 'website_domain_normalized'

    artifact = PageRankArtifact(
        pagerank_scores=df_scores,
        convergence_meta=meta
    )

    logger.info(str(artifact))

    return artifact


# -------------------------------------------------------------------------------------------------------------------------------
# Task 25, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_website_pagerank(
    web_graph_artifact: Any, # WebGraphArtifact
    config: Dict[str, Any]
) -> PageRankArtifact:
    """
    Orchestrator for Task 25. Computes PageRank on the web graph.

    Parameters
    ----------
    web_graph_artifact : WebGraphArtifact
        The web graph.
    config : Dict[str, Any]
        Configuration dictionary. Currently unused: the damping factor is fixed
        at alpha = 0.85 below.

    Returns
    -------
    PageRankArtifact
        The scores.
    """
    logger.info("Starting Task 25: PageRank Computation...")

    # Extract
    adj = web_graph_artifact.adjacency_matrix
    dmap = web_graph_artifact.domain_index_map

    # Assumption: damping factor fixed at the conventional alpha = 0.85 (not read from config)
    alpha = 0.85

    # Step 1: Prepare
    M, dangling = prepare_pagerank_matrix(adj)

    # Step 2: Iterate
    scores, meta = run_pagerank_power_iteration(M, dangling, alpha=alpha)

    # Step 3: Package
    artifact = package_pagerank_artifact(scores, dmap, meta)

    logger.info("Task 25 Completed Successfully.")

    return artifact


# Task 26 — Compute offline-online Pearson correlation

# ==============================================================================
# Task 26: Compute offline-online Pearson correlation
# ==============================================================================

@dataclass
class CorrelationArtifact:
    """
    Container for the offline-online correlation validation.

    This dataclass encapsulates the final validation metric of the study: the
    Pearson correlation coefficient between an organisation's "offline" structural
    influence (Hub Score in the transaction network) and its "online" visibility
    (PageRank in the web hyperlink graph). This metric serves as an external
    validity check for the constructed aid network, testing whether the topological
    centrality derived from financial flows aligns with an independent measure of
    institutional authority. The artifact includes the matched dataset used for
    calculation, the statistical results, and the discrepancy relative to the
    manuscript's reported value (r=0.48).

    Attributes
    ----------
    matched_data : pd.DataFrame
        A DataFrame indexed by `canonical_org_id` containing the aligned
        `hub_score_hits` and `pagerank` values for the intersection of organisations
        present in both the transaction network and the web crawl.
    pearson_r : float
        The computed Pearson correlation coefficient (r).
    p_value : float
        The two-tailed p-value for the hypothesis that the correlation is zero.
    sample_size : int
        The number of organisations (n) included in the correlation calculation.
    discrepancy : float
        The difference between the computed r and the manuscript's reported r=0.48.
    """
    # The aligned dataset used for correlation
    matched_data: pd.DataFrame

    # The Pearson correlation coefficient
    pearson_r: float

    # The p-value of the correlation
    p_value: float

    # The sample size (n)
    sample_size: int

    # The difference from the reported value (r - 0.48)
    discrepancy: float

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the correlation results.

        Returns
        -------
        str
            A human-readable summary of the correlation coefficient, significance,
            sample size, and alignment with the manuscript.
        """
        # Format summary string
        return (f"Validation Correlation: r={self.pearson_r:.4f} (p={self.p_value:.4e}, n={self.sample_size}). "
                f"Discrepancy from 0.48: {self.discrepancy:.4f}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Step 1: Align offline and online vectors
# -------------------------------------------------------------------------------------------------------------------------------

def align_metrics(
    centrality_artifact: Any, # CentralityArtifact
    pagerank_artifact: Any, # PageRankArtifact
    df_organisations: pd.DataFrame
) -> pd.DataFrame:
    """
    Aligns offline Hub Scores with online PageRank scores via organisation domains.

    Parameters
    ----------
    centrality_artifact : CentralityArtifact
        Offline metrics.
    pagerank_artifact : PageRankArtifact
        Online metrics.
    df_organisations : pd.DataFrame
        Master table linking IDs to domains.

    Returns
    -------
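    pd.DataFrame
        Table indexed by canonical_org_id with columns 'hub_score_hits',
        'website_domain_normalized' and 'pagerank', restricted to organisations
        present in all three inputs (inner joins).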
    """
    # 1. Get Hub Scores (Offline)
    # Indexed by canonical_org_id
    hubs = centrality_artifact.centrality_table[['hub_score_hits']]

    # 2. Get PageRank (Online)
    # Indexed by website_domain_normalized
    pr = pagerank_artifact.pagerank_scores[['pagerank']]

    # 3. Get Mapping (ID -> Domain)
    # We need canonical_org_id -> website_domain_normalized
    # Use the cleaned organisation table
    mapping = (
        df_organisations
        .dropna(subset=['canonical_org_id', 'website_domain_normalized'])
        .drop_duplicates('canonical_org_id') # One domain per org
        .set_index('canonical_org_id')[['website_domain_normalized']]
    )

    # 4. Join
    # Start with Hubs (our population of interest)
    # Join Domain
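    # Name the index explicitly so the reset_index()/set_index() round trip below works
    aligned = hubs.rename_axis('canonical_org_id').join(mapping, how='inner')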

    # Join PageRank on Domain
    # We reset index to join on column, then restore
    aligned = aligned.reset_index().merge(
        pr,
        left_on='website_domain_normalized',
        right_index=True,
        how='inner'
    ).set_index('canonical_org_id')

    logger.info(f"Metric Alignment: Matched {len(aligned)} organisations with both Hub Score and PageRank.")

    return aligned
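
# Illustrative sketch (hypothetical helper, not part of the pipeline): both joins
# in `align_metrics` are inner joins, so organisations silently drop out if they
# lack a website domain or if their domain is absent from the web crawl. The
# helper below, assuming the same index and column names as `align_metrics` and
# a unique index on `hubs`, counts the survivors at each stage so the final
# sample size can be explained.
import pandas as pd


def alignment_attrition(hubs: pd.DataFrame, mapping: pd.DataFrame,
                        pagerank: pd.DataFrame) -> dict:
    """Counts organisations with a hub score, with a domain, and with a PageRank."""
    # Stage 1: organisations that have a website domain in the mapping
    with_domain = hubs.index.isin(mapping.index)

    # Stage 2: of those, organisations whose domain is a PageRank-scored node
    domains = mapping.reindex(hubs.index[with_domain])['website_domain_normalized']
    with_pagerank = domains.isin(pagerank.index)

    return {
        "hub_orgs": len(hubs),
        "with_domain": int(with_domain.sum()),
        "with_pagerank": int(with_pagerank.sum()),
    }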


# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Step 2: Compute Pearson correlation coefficient
# -------------------------------------------------------------------------------------------------------------------------------

def calculate_pearson_correlation(
    aligned_data: pd.DataFrame
) -> Tuple[float, float]:
    """
    Computes Pearson r between Hub Score and PageRank.

    Parameters
    ----------
    aligned_data : pd.DataFrame
        Table with 'hub_score_hits' and 'pagerank'.

    Returns
    -------
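    Tuple[float, float]
        Pearson r and two-tailed p-value. Returns (0.0, 1.0) when there are
        fewer than two points or either vector has zero variance.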
    """
    x = aligned_data['hub_score_hits'].values
    y = aligned_data['pagerank'].values

    if len(x) < 2:
        logger.warning("Correlation: Insufficient data (<2 points).")
        return 0.0, 1.0

    # Check variance
    if np.std(x) == 0 or np.std(y) == 0:
        logger.warning("Correlation: Zero variance in one or both vectors.")
        return 0.0, 1.0

    r, p = stats.pearsonr(x, y)

    logger.info(f"Correlation: r={r:.4f}, p={p:.4e}")

    return r, p
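
# Illustrative sketch (hypothetical helper, not part of the pipeline): what
# `stats.pearsonr` computes. r is the dot product of the centred vectors divided
# by the product of their norms, and the two-tailed p-value follows from
# t = r * sqrt((n - 2) / (1 - r^2)) with n - 2 degrees of freedom, a test that
# assumes the two variables are (approximately) normally distributed.
import numpy as np
from scipy import stats as scipy_stats


def pearson_from_definition(x, y):
    """Returns (r, two-tailed p) for non-constant inputs with at least 3 points."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    n = x.size
    if n < 3:
        raise ValueError("at least 3 points are needed for the t-based p-value")

    # Pearson r from the centred vectors
    xc = x - x.mean()
    yc = y - y.mean()
    r = float((xc @ yc) / np.sqrt((xc @ xc) * (yc @ yc)))

    # A perfect linear fit makes t infinite, so its p-value is 0
    if abs(r) >= 1.0:
        return r, 0.0

    t = r * np.sqrt((n - 2) / (1.0 - r ** 2))
    p = float(2.0 * scipy_stats.t.sf(abs(t), df=n - 2))
    return r, p

# Usage: pearson_from_definition([1, 2, 3, 4, 5], [2, 1, 4, 3, 5]) gives r = 0.8,
# and its p-value should agree with stats.pearsonr up to floating-point error.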


# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Step 3: Validate against manuscript
# -------------------------------------------------------------------------------------------------------------------------------

def package_correlation_artifact(
    aligned_data: pd.DataFrame,
    r: float,
    p: float
) -> CorrelationArtifact:
    """
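    Bundles the correlation results into a CorrelationArtifact, recording the
    discrepancy r - 0.48 relative to the manuscript's reported value.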

    Parameters
    ----------
    aligned_data : pd.DataFrame
        Data used.
    r : float
        Correlation.
    p : float
        P-value.

    Returns
    -------
    CorrelationArtifact
        Final artifact.
    """
    reported_r = 0.48
    discrepancy = r - reported_r

    artifact = CorrelationArtifact(
        matched_data=aligned_data,
        pearson_r=r,
        p_value=p,
        sample_size=len(aligned_data),
        discrepancy=discrepancy
    )

    logger.info(str(artifact))

    return artifact


# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_validation_correlation(
    centrality_artifact: Any, # CentralityArtifact
    pagerank_artifact: Any, # PageRankArtifact
    df_organisations: pd.DataFrame
) -> CorrelationArtifact:
    """
    Orchestrator for Task 26. Validates offline centrality against online PageRank.

    Parameters
    ----------
    centrality_artifact : CentralityArtifact
        Offline scores.
    pagerank_artifact : PageRankArtifact
        Online scores.
    df_organisations : pd.DataFrame
        Master table.

    Returns
    -------
    CorrelationArtifact
        The validation results.
    """
    logger.info("Starting Task 26: Correlation Validation...")

    # Step 1: Align
    aligned = align_metrics(centrality_artifact, pagerank_artifact, df_organisations)

    # Step 2: Compute
    r, p = calculate_pearson_correlation(aligned)

    # Step 3: Package
    artifact = package_correlation_artifact(aligned, r, p)

    logger.info("Task 26 Completed Successfully.")

    return artifact


# Task 27 — Create end-to-end orchestrator function

# ==============================================================================
# Task 27: Create end-to-end orchestrator function
# ==============================================================================

@dataclass
class PipelineResults:
    """
    Container for all pipeline artifacts, execution metadata, and persistence paths.

    This dataclass serves as the final output of the orchestrator. It holds references
    to the in-memory artifacts generated by each task, as well as the file paths
    where these artifacts have been persisted. This dual-reference system supports
    both immediate downstream usage (in-memory) and long-term auditability (disk).
    It encapsulates the entire state of a pipeline run, allowing for post-hoc
    inspection of any stage's output.

    Attributes
    ----------
    run_id : str
        A unique identifier for the pipeline execution (e.g., timestamp-based).
    start_time : str
        The timestamp when the pipeline execution began.
    end_time : Optional[str], default=None
        The timestamp when the pipeline execution completed (or failed).
    success : bool, default=False
        A flag indicating whether the pipeline completed all tasks successfully.
    error_message : Optional[str], default=None
        The exception message and stack trace if the pipeline failed.
    artifacts : Dict[str, Any], default=dict
        A dictionary mapping artifact names (e.g., "cleansed_transactions",
        "bipartite_graph") to their in-memory Python objects.
    artifact_paths : Dict[str, Path], default=dict
        A dictionary mapping artifact names to the file paths where they were
        persisted on disk.
    """
    # Execution Metadata
    run_id: str
    start_time: str
    end_time: Optional[str] = None
    success: bool = False
    error_message: Optional[str] = None

    # Artifact Storage (In-Memory)
    artifacts: Dict[str, Any] = field(default_factory=dict)

    # Persistence Paths (On-Disk)
    artifact_paths: Dict[str, Path] = field(default_factory=dict)

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the pipeline results.

        Returns
        -------
        str
            A human-readable summary of the run status and artifact count.
        """
        status = "Success" if self.success else f"Failed: {self.error_message}"
        return f"Pipeline Results (Run {self.run_id}): {status}. {len(self.artifacts)} artifacts generated."

class ArtifactManager:
    """
    Manages the persistence of pipeline artifacts to a structured directory.

    This class handles the I/O operations required to save DataFrames and generic
    Python objects to disk in a reproducible manner. It enforces a directory
    structure keyed by the run ID, ensuring that outputs from different executions
    do not overwrite each other. It abstracts the serialization logic (Parquet for
    tables, Pickle for objects), simplifying the orchestrator's responsibility.

    Attributes
    ----------
    base_path : Path
        The root directory for the current run's artifacts.
    """
    def __init__(self, base_dir: str, run_id: str):
        """
        Initializes the ArtifactManager.

        Parameters
        ----------
        base_dir : str
            The parent directory for all pipeline outputs.
        run_id : str
            The unique identifier for the current run.
        """
        self.base_path = Path(base_dir) / run_id
        self.base_path.mkdir(parents=True, exist_ok=True)

    def save_dataframe(self, df: pd.DataFrame, name: str, subdir: str = "") -> Path:
        """
        Saves a pandas DataFrame to a Parquet file.

        Parameters
        ----------
        df : pd.DataFrame
            The DataFrame to persist.
        name : str
            The base filename (without extension).
        subdir : str, optional
            A subdirectory within the run folder (e.g., "01_cleansing").

        Returns
        -------
        Path
            The full path to the saved file.
        """
        dir_path = self.base_path / subdir
        dir_path.mkdir(parents=True, exist_ok=True)
        file_path = dir_path / f"{name}.parquet"
        df.to_parquet(file_path, index=True)
        return file_path

    def save_object(self, obj: Any, name: str, subdir: str = "") -> Path:
        """
        Saves a generic Python object (dataclass, dict, etc.) to a Pickle file.

        Parameters
        ----------
        obj : Any
            The object to persist.
        name : str
            The base filename (without extension).
        subdir : str, optional
            A subdirectory within the run folder.

        Returns
        -------
        Path
            The full path to the saved file.
        """
        dir_path = self.base_path / subdir
        dir_path.mkdir(parents=True, exist_ok=True)
        file_path = dir_path / f"{name}.pkl"
        with open(file_path, "wb") as f:
            pickle.dump(obj, f)
        return file_path
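
# Illustrative sketch (hypothetical helper, not part of the pipeline): reading a
# pickled artifact back from the `<output_dir>/<run_id>/<subdir>/<name>.pkl`
# layout written by `ArtifactManager.save_object`. Only unpickle files from
# trusted runs, since unpickling can execute arbitrary code; objects whose
# classes were defined in the notebook can only be restored where those
# classes are importable.
import pickle
from pathlib import Path
from typing import Any


def load_pickled_artifact(base_dir: str, run_id: str, name: str, subdir: str = "") -> Any:
    """Loads `<base_dir>/<run_id>/<subdir>/<name>.pkl`."""
    file_path = Path(base_dir) / run_id / subdir / f"{name}.pkl"
    with open(file_path, "rb") as f:
        return pickle.load(f)

# Usage (hypothetical run id):
#     load_pickled_artifact("./pipeline_output", "run_1700000000", "dq_report", "01_validation")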

def run_global_aid_pipeline(
    df_transactions_raw: pd.DataFrame,
    df_activities_raw: pd.DataFrame,
    df_organisations_raw: pd.DataFrame,
    df_web_links_raw: pd.DataFrame,
    config: Dict[str, Any],
    output_dir: str = "./pipeline_output"
) -> PipelineResults:
    """
    Executes the complete end-to-end Global Aid Network analysis pipeline.

    This orchestrator manages the data lifecycle from raw ingestion through
    cleansing, graph construction, topological analysis, and external validation.
    It enforces the strict dependency order required by the research methodology
    and ensures that all intermediate and final artifacts are persisted for
    reproducibility.

    Parameters
    ----------
    df_transactions_raw : pd.DataFrame
        Raw IATI transaction data.
    df_activities_raw : pd.DataFrame
        Raw IATI activity metadata.
    df_organisations_raw : pd.DataFrame
        Raw organisation master list.
    df_web_links_raw : pd.DataFrame
        Raw web crawl data for validation.
    config : Dict[str, Any]
        Master configuration dictionary defining scope, parameters, and assumptions.
    output_dir : str
        Root directory for artifact persistence.

    Returns
    -------
    PipelineResults
        A container object holding all computed artifacts and execution status.
    """
    run_id = f"run_{int(time.time())}"
    results = PipelineResults(run_id=run_id, start_time=time.ctime())
    manager = ArtifactManager(output_dir, run_id)

    logger.info(f"=== Starting Global Aid Pipeline (Run ID: {run_id}) ===")

    try:
        # ---------------------------------------------------------------------
        # Phase 1: Validation & Setup
        # ---------------------------------------------------------------------
        logger.info("--- Phase 1: Validation & Setup ---")
        input_data = {
            "df_transactions_raw": df_transactions_raw,
            "df_activities_raw": df_activities_raw,
            "df_organisations_raw": df_organisations_raw,
            "df_web_links_raw": df_web_links_raw
        }

        # Task 1: Config Validation
        validate_study_configuration(config, input_data)

        # Task 2: Data Quality
        dq_report = validate_data_quality(df_transactions_raw, df_activities_raw, df_organisations_raw)
        results.artifacts["dq_report"] = dq_report
        results.artifact_paths["dq_report"] = manager.save_object(dq_report, "dq_report", "01_validation")

        # Task 3: Reproducibility Context
        repro_context = establish_reproducibility_context(config, input_data)
        results.artifacts["reproducibility_context"] = repro_context
        results.artifact_paths["reproducibility_context"] = manager.save_object(repro_context, "repro_context", "01_validation")

        sort_policies = repro_context.sort_policies

        # ---------------------------------------------------------------------
        # Phase 2: Cleansing
        # ---------------------------------------------------------------------
        logger.info("--- Phase 2: Cleansing ---")

        # Task 4: Transaction Temporal/Value
        tx_temp, tx_excl_1 = cleanse_transactions_temporal_value(df_transactions_raw, config)
        results.artifacts["transaction_exclusions_1"] = tx_excl_1

        # Task 5: Transaction Endpoints/Dedup
        tx_clean, tx_excl_2 = cleanse_transactions_endpoints_dedup(tx_temp, sort_policies["df_transactions_raw"])
        results.artifacts["cleansed_transactions"] = tx_clean
        results.artifacts["transaction_exclusions_2"] = tx_excl_2
        results.artifact_paths["cleansed_transactions"] = manager.save_dataframe(tx_clean, "transactions_clean", "02_cleansing")

        # Task 6: Activity Normalization
        act_backbone, act_ctry, act_sect, act_excl = cleanse_activities_normalization(
            df_activities_raw, sort_policies["df_activities_raw"]
        )
        results.artifacts["activity_exclusions"] = act_excl

        # Task 7: Instrument Normalization
        act_final, inst_excl, act_cov = cleanse_activities_instruments_coverage(act_backbone, act_ctry, act_sect)
        results.artifacts["activity_backbone"] = act_final
        results.artifacts["activity_countries"] = act_ctry
        results.artifacts["activity_sectors"] = act_sect
        results.artifacts["activity_coverage"] = act_cov
        results.artifact_paths["activity_backbone"] = manager.save_dataframe(act_final, "activities_clean", "02_cleansing")

        # Task 8: Organisation Cleansing
        org_clean, org_excl = cleanse_organisations_substrate(df_organisations_raw)
        results.artifacts["cleansed_organisations"] = org_clean
        results.artifacts["organisation_exclusions"] = org_excl
        results.artifact_paths["cleansed_organisations"] = manager.save_dataframe(org_clean, "organisations_clean", "02_cleansing")

        # ---------------------------------------------------------------------
        # Phase 3: Integration & ER
        # ---------------------------------------------------------------------
        logger.info("--- Phase 3: Integration & ER ---")

        # Task 9: Context Joins
        tx_inst, contexts, ctx_cov = join_transactions_to_contexts(
            tx_clean, act_final, act_ctry, act_sect
        )
        results.artifacts["tx_instruments"] = tx_inst
        results.artifacts["unified_contexts"] = contexts
        results.artifacts["context_coverage"] = ctx_cov
        results.artifact_paths["unified_contexts"] = manager.save_dataframe(contexts, "contexts", "03_integration")

        # Task 10: Entity Resolution
        org_mapped, er_artifact = construct_canonical_mapping(org_clean, config)
        results.artifacts["canonical_mapping"] = er_artifact
        results.artifacts["organisations_mapped"] = org_mapped
        results.artifact_paths["canonical_mapping"] = manager.save_object(er_artifact, "er_artifact", "03_integration")

        # ---------------------------------------------------------------------
        # Phase 4: Descriptive Analysis
        # ---------------------------------------------------------------------
        logger.info("--- Phase 4: Descriptive Analysis ---")

        # Task 11: Geo Density
        # Filter contexts for countries and strip the "COUNTRY:" prefix to recover the country code
        tx_countries = contexts[contexts['context_id'].str.startswith('COUNTRY:', na=False)].copy()
        tx_countries['recipient_country_code'] = tx_countries['context_id'].str[len('COUNTRY:'):]

        geo_density = compute_geographic_density(tx_clean, tx_countries)
        results.artifacts["geo_density"] = geo_density
        results.artifact_paths["geo_density"] = manager.save_object(geo_density, "geo_density", "04_descriptive")

        # Task 12: Instrument Evolution
        inst_evolution = compute_instrument_evolution(tx_inst)
        results.artifacts["instrument_evolution"] = inst_evolution
        results.artifact_paths["instrument_evolution"] = manager.save_object(inst_evolution, "instrument_evolution", "04_descriptive")

        # ---------------------------------------------------------------------
        # Phase 5: Network Construction
        # ---------------------------------------------------------------------
        logger.info("--- Phase 5: Network Construction ---")

        # Task 13: Bipartite Graph
        bipartite_graph = construct_bipartite_graph(tx_clean, org_mapped)
        results.artifacts["bipartite_graph"] = bipartite_graph
        results.artifact_paths["bipartite_graph"] = manager.save_object(bipartite_graph, "bipartite_graph", "05_networks")

        # Task 14: Node Sizes
        # Re-derive the mapped transaction table with the Task 13 helper so that
        # node sizes use the same canonical ID mapping as the bipartite graph
        df_tx_mapped, _, _ = prepare_bipartite_nodes(tx_clean, org_mapped)
        node_sizes = compute_node_sizes(df_tx_mapped, org_mapped)
        results.artifacts["node_sizes"] = node_sizes
        results.artifact_paths["node_sizes"] = manager.save_object(node_sizes, "node_sizes", "05_networks")

        # Task 15: Projection
        projection_graph = construct_co_occurrence_projection(contexts, df_tx_mapped)
        results.artifacts["projection_graph"] = projection_graph
        results.artifact_paths["projection_graph"] = manager.save_object(projection_graph, "projection_graph", "05_networks")

        # ---------------------------------------------------------------------
        # Phase 6: Topology & Embeddings
        # ---------------------------------------------------------------------
        logger.info("--- Phase 6: Topology & Embeddings ---")

        # Task 16: Node2Vec
        embeddings = learn_node_embeddings(bipartite_graph, config)
        results.artifacts["embeddings"] = embeddings
        results.artifact_paths["embeddings"] = manager.save_object(embeddings, "embeddings", "06_topology")

        # Task 17: UMAP
        umap_proj = project_embeddings_umap(embeddings, config)
        results.artifacts["umap_projection"] = umap_proj
        results.artifact_paths["umap_projection"] = manager.save_object(umap_proj, "umap_projection", "06_topology")

        # ---------------------------------------------------------------------
        # Phase 7: Centrality & Ranking
        # ---------------------------------------------------------------------
        logger.info("--- Phase 7: Centrality & Ranking ---")

        # Task 18: HITS
        hits_scores = compute_hits_centrality(bipartite_graph)
        results.artifacts["hits_scores"] = hits_scores
        results.artifact_paths["hits_scores"] = manager.save_object(hits_scores, "hits_scores", "07_centrality")

        # Task 19: Centrality
        centrality_metrics = compute_network_centrality(projection_graph, hits_scores)
        results.artifacts["centrality_metrics"] = centrality_metrics
        results.artifact_paths["centrality_metrics"] = manager.save_object(centrality_metrics, "centrality_metrics", "07_centrality")

        # Task 20: Solar System
        solar_system = construct_solar_system(centrality_metrics, node_sizes)
        results.artifacts["solar_system"] = solar_system
        results.artifact_paths["solar_system"] = manager.save_object(solar_system, "solar_system", "07_centrality")

        # ---------------------------------------------------------------------
        # Phase 8: Analysis & Validation
        # ---------------------------------------------------------------------
        logger.info("--- Phase 8: Analysis & Validation ---")

        # Task 21: Subgroups
        subgroup_stats = compute_subgroup_statistics(org_mapped, solar_system)
        results.artifacts["subgroup_stats"] = subgroup_stats
        results.artifact_paths["subgroup_stats"] = manager.save_object(subgroup_stats, "subgroup_stats", "08_analysis")

        # Task 22: Broker Comparison
        broker_comparison = compare_broker_betweenness(
            centrality_metrics, subgroup_stats, solar_system
        )
        results.artifacts["broker_comparison"] = broker_comparison
        results.artifact_paths["broker_comparison"] = manager.save_object(broker_comparison, "broker_comparison", "08_analysis")

        # Task 23: Hewlett Network
        hewlett_network = characterize_hewlett_network(
            tx_clean, subgroup_stats, results.artifacts["canonical_mapping"], solar_system
        )
        results.artifacts["hewlett_network"] = hewlett_network
        results.artifact_paths["hewlett_network"] = manager.save_object(hewlett_network, "hewlett_network", "08_analysis")

        # Task 24: Web Graph
        web_graph = construct_web_graph(df_web_links_raw, org_mapped)
        results.artifacts["web_graph"] = web_graph
        results.artifact_paths["web_graph"] = manager.save_object(web_graph, "web_graph", "09_validation")

        # Task 25: PageRank
        pagerank = compute_website_pagerank(web_graph, config)
        results.artifacts["pagerank"] = pagerank
        results.artifact_paths["pagerank"] = manager.save_object(pagerank, "pagerank", "09_validation")

        # Task 26: Correlation
        correlation = compute_validation_correlation(
            centrality_metrics, pagerank, org_mapped
        )
        results.artifacts["correlation"] = correlation
        results.artifact_paths["correlation"] = manager.save_object(correlation, "correlation", "09_validation")

        results.success = True
        results.end_time = time.ctime()
        logger.info("=== Pipeline Completed Successfully ===")

    except Exception as e:
        results.success = False
        results.end_time = time.ctime()
        results.error_message = str(e)
        logger.error(f"Pipeline Failed: {e}")
        logger.error(traceback.format_exc())

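        # Attempt to save partial results without masking the original error
        try:
            manager.save_object(results, "partial_results_dump", "failures")
        except Exception as dump_error:
            logger.warning(f"Could not save partial results: {dump_error}")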

    return results
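# -----------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): parsing context IDs.
# Task 11 above selects country contexts by their "COUNTRY:" prefix. This sketch
# assumes every context_id has the form "<KIND>:<CODE>" (e.g. "COUNTRY:KE" or
# "SECTOR:11220"; both values are made up); the helper name `split_context_ids`
# is hypothetical. Slicing off the prefix removes only the leading tag, and
# na=False treats missing IDs as non-matching instead of producing a NaN mask.
# -----------------------------------------------------------------------------
import pandas as pd


def split_context_ids(contexts: pd.DataFrame, kind: str = "COUNTRY") -> pd.DataFrame:
    """Return rows whose context_id starts with '<kind>:' plus a parsed code column (hypothetical helper)."""
    prefix = f"{kind}:"
    # Boolean mask; missing context IDs count as non-matching
    mask = contexts["context_id"].str.startswith(prefix, na=False)
    subset = contexts.loc[mask].copy()
    # Keep everything after the prefix, e.g. "COUNTRY:KE" -> "KE"
    subset[f"{kind.lower()}_code"] = subset["context_id"].str[len(prefix):]
    return subset


# Usage: split_context_ids(contexts, "SECTOR") adds a `sector_code` column instead.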


# Task 28 — Conduct robustness analysis

# ==============================================================================
# Task 28: Conduct robustness analysis
# ==============================================================================

@dataclass
class RobustnessArtifact:
    """
    Container for robustness analysis results.

    This dataclass encapsulates the results of the sensitivity analysis, which
    systematically perturbs key methodological parameters (entity resolution
    thresholds, random walk biases, projection contexts) to assess the stability
    of the study's core findings. It includes a detailed table of metrics for
    each simulation run, as well as aggregated summary statistics quantifying
    the variance in rank ordering and external validation correlation.

    Attributes
    ----------
    sensitivity_table : pd.DataFrame
        A DataFrame where each row corresponds to a single successful simulation
        run. Columns are the run identifier (`run_id`), the Spearman rank
        correlation with the baseline ranking, the Pearson validation
        correlation, and the Jaccard overlap of the Top 25 core. The parameter
        settings (tau, p, q, context) of each run are recorded in its
        configuration's `run_tag`.
    rank_stability_summary : Dict[str, float]
        A dictionary of summary statistics (mean, min, std) for the Spearman
        rank correlation across all runs, indicating the overall robustness of
        the centrality rankings.
    correlation_stability_summary : Dict[str, float]
        A dictionary of summary statistics for the Pearson validation correlation,
        indicating the stability of the external validity check.
    """
    # Detailed metrics per run
    sensitivity_table: pd.DataFrame

    # Summary of rank stability
    rank_stability_summary: Dict[str, float]

    # Summary of correlation stability
    correlation_stability_summary: Dict[str, float]

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the robustness analysis.

        Returns
        -------
        str
            A human-readable summary of the number of scenarios and mean stability.
        """
        # Format summary string
        return (f"Robustness: Analyzed {len(self.sensitivity_table)} scenarios. "
                f"Mean Rank Correlation: {self.rank_stability_summary['mean_spearman']:.4f}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 28, Step 1: Define robustness perturbation axes
# -------------------------------------------------------------------------------------------------------------------------------

def generate_perturbation_grid(
    baseline_config: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """
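    Generates the full-factorial robustness grid of pipeline configurations.

    The grid crosses three entity-resolution thresholds (tau), three Node2Vec
    return parameters (p), three in-out parameters (q), and three projection
    context modes (both, country only, sector only), giving 3 x 3 x 3 x 3 = 81
    configurations. Each configuration is tagged with a descriptive `run_tag`.

    Parameters
    ----------
    baseline_config : Dict[str, Any]
        The baseline configuration from which each perturbed copy is derived.

    Returns
    -------
    List[Dict[str, Any]]
        One perturbed configuration per grid point, in grid order.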
    """
    import copy  # deep copies keep nested config sections independent across runs

    # Define axes (full factorial: 3 x 3 x 3 x 3 = 81 configurations)
    er_thresholds = [0.80, 0.85, 0.90]
    p_values = [0.5, 1.0, 2.0]
    q_values = [0.5, 1.0, 2.0]
    context_modes = ["both", "country_only", "sector_only"]

    grid = list(itertools.product(er_thresholds, p_values, q_values, context_modes))

    configs = []
    for i, (tau, p, q, ctx) in enumerate(grid):
        # Deep copy the baseline: dict.copy() is shallow, so nested sections would
        # otherwise be shared (and overwritten) by every configuration
        new_config = copy.deepcopy(baseline_config)

        # Update the fuzzy-matching threshold for entity resolution
        fuzzy = new_config.setdefault("ENTITY_RESOLUTION", {}).setdefault("fuzzy_matching", {})
        fuzzy["threshold"] = tau

        # Update the Node2Vec return (p) and in-out (q) parameters
        node2vec = new_config.setdefault("NODE2VEC", {})
        node2vec["p"] = p
        node2vec["q"] = q

        # Update which contexts define co-occurrence in the projection
        context_def = (
            new_config.setdefault("NETWORKS", {})
            .setdefault("projection", {})
            .setdefault("context_definition", {})
        )
        context_def["use_country"] = ctx in ("both", "country_only")
        context_def["use_sector"] = ctx in ("both", "sector_only")

        # Tag run
        new_config["run_tag"] = f"robustness_{i}_tau{tau}_p{p}_q{q}_{ctx}"
        configs.append(new_config)

    logger.info(f"Robustness Grid: Generated {len(configs)} configurations.")
    return configs
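# -----------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): why the grid must be
# built from deep copies. dict.copy() is shallow, so nested sections such as
# config["NODE2VEC"] would be shared by the baseline and every perturbed
# config, and all runs would silently end up with the last grid values. The
# toy config and the function name `build_node2vec_grid_demo` are hypothetical;
# only the nesting mirrors generate_perturbation_grid.
# -----------------------------------------------------------------------------
import copy
import itertools
from typing import Any, Dict, List


def build_node2vec_grid_demo(baseline: Dict[str, Any], deep: bool = True) -> List[Dict[str, Any]]:
    """Return one config per (p, q) pair; `deep` toggles copy.deepcopy vs dict.copy()."""
    configs = []
    for p, q in itertools.product([0.5, 1.0, 2.0], [0.5, 1.0, 2.0]):
        cfg = copy.deepcopy(baseline) if deep else baseline.copy()
        # setdefault returns the existing nested dict, which a shallow copy shares
        node2vec = cfg.setdefault("NODE2VEC", {})
        node2vec["p"] = p
        node2vec["q"] = q
        configs.append(cfg)
    return configs


# With deep=True the 9 configs hold 9 distinct (p, q) pairs and the baseline is
# untouched; with deep=False every config (and the baseline) reports p=2.0, q=2.0.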


# -------------------------------------------------------------------------------------------------------------------------------
# Task 28, Step 2: Execute orchestrator across perturbation grid
# -------------------------------------------------------------------------------------------------------------------------------

def execute_robustness_batch(
    configs: List[Dict[str, Any]],
    pipeline_func: Any, # run_global_aid_pipeline
    data_inputs: Dict[str, pd.DataFrame]
) -> List[Any]: # List[PipelineResults]
    """
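    Executes the pipeline sequentially for each configuration in the grid.

    Each run writes to its own output directory (`./output/<run_tag>`), and an
    exception in one run is logged without stopping the batch.

    Parameters
    ----------
    configs : List[Dict[str, Any]]
        Perturbed configurations produced by `generate_perturbation_grid`.
    pipeline_func : Callable
        The end-to-end pipeline orchestrator (e.g., `run_global_aid_pipeline`).
    data_inputs : Dict[str, pd.DataFrame]
        Raw input tables keyed by "df_transactions_raw", "df_activities_raw",
        "df_organisations_raw", and "df_web_links_raw".

    Returns
    -------
    List[PipelineResults]
        One entry per configuration, in grid order; `None` where the pipeline
        raised an exception.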
    """
    results_list = []

    for cfg in configs:
        tag = cfg.get("run_tag", "unknown")
        logger.info(f"Robustness Run: Starting {tag}...")

        try:
            # Execute pipeline with a per-run output directory so runs do not overwrite each other
            res = pipeline_func(
                data_inputs["df_transactions_raw"],
                data_inputs["df_activities_raw"],
                data_inputs["df_organisations_raw"],
                data_inputs["df_web_links_raw"],
                cfg,
                output_dir=f"./output/{tag}"
            )
            results_list.append(res)
            logger.info(f"Robustness Run: {tag} Success={res.success}")

        except Exception as e:
            logger.error(f"Robustness Run: {tag} Failed: {e}")
            # Record the failure as None so results stay aligned with configs
            results_list.append(None)

    return results_list


# -------------------------------------------------------------------------------------------------------------------------------
# Task 28, Step 3: Synthesize robustness report
# -------------------------------------------------------------------------------------------------------------------------------

def synthesize_robustness_results(
    baseline_results: Any, # PipelineResults
    robustness_results: List[Any] # List[PipelineResults]
) -> RobustnessArtifact:
    """
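    Synthesizes metrics from the robustness runs by comparing each to the baseline.

    For every successful run, computes the Spearman correlation between its
    node ranking and the baseline ranking (over nodes present in both), its
    Pearson validation correlation, and the Jaccard overlap between its Top 25
    and the baseline Top 25. Runs that are `None` or unsuccessful are skipped.

    Parameters
    ----------
    baseline_results : PipelineResults
        Output of the baseline pipeline run.
    robustness_results : List[PipelineResults]
        Outputs of the perturbed runs, as returned by `execute_robustness_batch`.

    Returns
    -------
    RobustnessArtifact
        Per-run metrics and summary statistics across runs.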
    """
    # Extract the baseline ranking (index: canonical_id) and its Top 25
    base_ranks = baseline_results.artifacts["solar_system"].ranked_nodes[['rank']].copy()
    base_top25 = set(base_ranks[base_ranks['rank'] <= 25].index)

    metrics = []

    for res in robustness_results:
        if res is None or not res.success:
            continue

        # 1. Rank correlation
        run_ranks = res.artifacts["solar_system"].ranked_nodes[['rank']]

        # Align on index (canonical_id); the inner join keeps nodes common to both
        common = base_ranks.join(run_ranks, lsuffix='_base', rsuffix='_run', how='inner')

        # Require more than 10 shared nodes for a meaningful correlation
        if len(common) > 10:
            spearman_rho, _ = stats.spearmanr(common['rank_base'], common['rank_run'])
        else:
            spearman_rho = np.nan

        # 2. Validation correlation
        run_r = res.artifacts["correlation"].pearson_r

        # 3. Top 25 overlap (Jaccard); the suffixes exist only on `common`, so
        # each ranking is filtered on its own 'rank' column
        run_top25 = set(run_ranks[run_ranks['rank'] <= 25].index)
        union = base_top25 | run_top25
        jaccard = len(base_top25 & run_top25) / len(union) if union else np.nan

        metrics.append({
            "run_id": res.run_id,
            "spearman_rank_corr": spearman_rho,
            "pearson_validation_r": run_r,
            "top25_jaccard": jaccard
        })

    # Fix the columns so the summaries below still work if every run failed
    df_metrics = pd.DataFrame(
        metrics,
        columns=["run_id", "spearman_rank_corr", "pearson_validation_r", "top25_jaccard"],
    )

    # Summarize
    rank_stats = {
        "mean_spearman": df_metrics['spearman_rank_corr'].mean(),
        "min_spearman": df_metrics['spearman_rank_corr'].min(),
        "std_spearman": df_metrics['spearman_rank_corr'].std()
    }

    corr_stats = {
        "mean_pearson": df_metrics['pearson_validation_r'].mean(),
        "min_pearson": df_metrics['pearson_validation_r'].min(),
        "std_pearson": df_metrics['pearson_validation_r'].std()
    }

    artifact = RobustnessArtifact(
        sensitivity_table=df_metrics,
        rank_stability_summary=rank_stats,
        correlation_stability_summary=corr_stats
    )

    logger.info(str(artifact))

    return artifact
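# -----------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): the two rank
# stability metrics on toy data. Spearman's rho is computed here as the Pearson
# correlation of average ranks over the nodes shared by both rankings (the same
# quantity scipy.stats.spearmanr returns), and the Top-k overlap is the Jaccard
# index |A & B| / |A | B|. The node IDs, ranks, and the `rank_stability` name
# are hypothetical; k=3 stands in for the Top 25.
# -----------------------------------------------------------------------------
import numpy as np
import pandas as pd


def rank_stability(base: pd.Series, run: pd.Series, k: int):
    """Return (spearman_rho, top_k_jaccard) for two rank Series indexed by node ID."""
    # Align on node ID, keeping only nodes present in both rankings
    common = pd.concat([base.rename("base"), run.rename("run")], axis=1, join="inner")
    # Spearman's rho = Pearson correlation of the (average) ranks
    rho = np.corrcoef(common["base"].rank(), common["run"].rank())[0, 1]
    top_base = set(base[base <= k].index)
    top_run = set(run[run <= k].index)
    union = top_base | top_run
    jaccard = len(top_base & top_run) / len(union) if union else float("nan")
    return rho, jaccard


# Example: base ranks a..e = 1..5 and run ranks (2, 1, 4, 3, 5) give rho = 0.8,
# while the Top-3 sets {a, b, c} and {a, b, d} give a Jaccard of 2/4 = 0.5.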


# -------------------------------------------------------------------------------------------------------------------------------
# Task 28, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def conduct_robustness_analysis(
    baseline_results: Any, # PipelineResults
    baseline_config: Dict[str, Any],
    pipeline_func: Any,
    data_inputs: Dict[str, pd.DataFrame]
) -> RobustnessArtifact:
    """
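    Orchestrator for Task 28: runs the full robustness analysis.

    Builds the perturbation grid from the baseline configuration, re-runs the
    pipeline once per configuration, and compares every successful run with
    the baseline results.

    Parameters
    ----------
    baseline_results : PipelineResults
        Output of the baseline pipeline run.
    baseline_config : Dict[str, Any]
        Configuration used for the baseline run; perturbed copies are derived from it.
    pipeline_func : Callable
        The end-to-end pipeline orchestrator (e.g., `run_global_aid_pipeline`).
    data_inputs : Dict[str, pd.DataFrame]
        Raw input tables keyed by the pipeline's argument names.

    Returns
    -------
    RobustnessArtifact
        Per-run sensitivity metrics and summary statistics.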
    """
    logger.info("Starting Task 28: Robustness Analysis...")

    # Step 1: Grid
    configs = generate_perturbation_grid(baseline_config)

    # Step 2: Execute
    run_results = execute_robustness_batch(configs, pipeline_func, data_inputs)

    # Step 3: Synthesize
    artifact = synthesize_robustness_results(baseline_results, run_results)

    logger.info("Task 28 Completed Successfully.")

    return artifact
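# -----------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): recovering the
# perturbation settings from run tags. generate_perturbation_grid tags each
# config as "robustness_{i}_tau{tau}_p{p}_q{q}_{ctx}", but the sensitivity
# table only stores run_id. This sketch assumes each row can be paired with
# its run tag (for example, if run_id carries the tag; that mapping is an
# assumption). `parse_run_tag` and `summarize_by_axis` are hypothetical names.
# -----------------------------------------------------------------------------
import re
from typing import Any, Dict, Optional

import pandas as pd

_RUN_TAG_PATTERN = re.compile(r"^robustness_(\d+)_tau([\d.]+)_p([\d.]+)_q([\d.]+)_(\w+)$")


def parse_run_tag(tag: str) -> Optional[Dict[str, Any]]:
    """Split a robustness run tag into its parameter values; None if it does not match."""
    match = _RUN_TAG_PATTERN.match(tag)
    if match is None:
        return None
    idx, tau, p, q, ctx = match.groups()
    return {"run_index": int(idx), "tau": float(tau), "p": float(p), "q": float(q), "context": ctx}


def summarize_by_axis(table: pd.DataFrame, tag_col: str, metric: str, axis: str) -> pd.Series:
    """Mean of `metric` for each value of one perturbation axis (tau, p, q, or context)."""
    # Expand each tag into parameter columns aligned with the table's rows
    params = pd.DataFrame([parse_run_tag(t) or {} for t in table[tag_col]], index=table.index)
    joined = table.join(params)
    return joined.groupby(axis)[metric].mean()


# Usage: summarize_by_axis(df, "run_id", "spearman_rank_corr", "context") shows
# how the mean rank stability differs across the three projection context modes.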


# Task 29 — Package reproducible outputs and provenance

# ==============================================================================
# Task 29: Package reproducible outputs and provenance
# ==============================================================================

@dataclass
class ProvenanceArtifact:
    """
    Container for the full reproducibility manifest.

    This dataclass serves as the master record for a single execution of the
    Global Aid Network analysis pipeline. It aggregates all necessary information
    required to audit, validate, and reproduce the study's findings. By linking
    the input configuration, the cryptographic hashes of all generated artifacts,
    and the high-level summary metrics, this object provides a complete lineage
    trace from raw data to final manuscript figures.

    Attributes
    ----------
    run_id : str
        The unique identifier for this pipeline execution (e.g., "run_167889234"),
        used to namespace the output directory and log files.
    timestamp : str
        The ISO 8601 formatted timestamp indicating when the packaging process
        was completed.
    config : Dict[str, Any]
        The complete configuration dictionary used to drive the pipeline, including
        all parameters for entity resolution, graph construction, and dimensionality
        reduction. This ensures that the exact parameter set is preserved.
    artifact_manifest : Dict[str, str]
        A dictionary mapping relative file paths of persisted artifacts to their
        SHA-256 checksums. Recomputing the checksums lets consumers verify that
        no output has been corrupted or modified since it was generated.
    metadata_summary : Dict[str, Any]
        A consolidated dictionary of key metrics extracted from all pipeline stages
        (e.g., data quality rates, HITS convergence stats, validation correlations).
        This provides a high-level overview of the run's analytical performance.
    """
    # Unique execution identifier
    run_id: str

    # Execution timestamp (ISO 8601)
    timestamp: str

    # Input configuration parameters
    config: Dict[str, Any]

    # Map of artifact paths to SHA-256 hashes
    artifact_manifest: Dict[str, str]

    # Aggregated metrics from all pipeline stages
    metadata_summary: Dict[str, Any]

    def __str__(self) -> str:
        """
        Returns a formatted string summary of the provenance artifact.

        Returns
        -------
        str
            A human-readable summary including the run ID and the count of
            tracked files in the manifest.
        """
        # Format summary string for logging
        return f"Provenance: Run {self.run_id}, {len(self.artifact_manifest)} files tracked."


# -------------------------------------------------------------------------------------------------------------------------------
# Task 29, Step 1: Persist all intermediate and final artifacts
# -------------------------------------------------------------------------------------------------------------------------------

def generate_file_hash(file_path: Path) -> str:
    """
    Computes the SHA-256 cryptographic hash of a file in a memory-efficient manner.

    This utility function is essential for generating the artifact manifest,
    ensuring the integrity and reproducibility of the pipeline's outputs. By
    producing a unique checksum for every persisted file, we allow downstream
    consumers to verify that the data has not been corrupted or altered since
    generation. The implementation uses buffered reading to handle potentially
    large artifacts (e.g., the 10-million-row transaction table) without
    exhausting system memory.

    Parameters
    ----------
    file_path : Path
        The filesystem path to the file to be hashed.

    Returns
    -------
    str
        The hexadecimal representation of the SHA-256 hash.

    Raises
    ------
    FileNotFoundError
        If the specified `file_path` does not exist.
    IsADirectoryError
        If `file_path` points to a directory instead of a file.
    OSError
        If the file cannot be read (e.g., insufficient permissions). The
        original exception, such as a PermissionError, is chained as the cause.
    """
    # Input Validation: Ensure the path exists and is a file
    if not file_path.exists():
        raise FileNotFoundError(f"Cannot hash file: Path '{file_path}' does not exist.")
    if not file_path.is_file():
        raise IsADirectoryError(f"Cannot hash file: Path '{file_path}' is a directory.")

    # Initialize the SHA-256 hash object from the hashlib library
    sha256_hash = hashlib.sha256()

    try:
        # Open the file in binary read mode ('rb') to handle all file types (text, parquet, pickle)
        with open(file_path, "rb") as f:
            # Read the file in fixed-size chunks (4096 bytes) to maintain low memory footprint
            # iter() with a sentinel (b"") creates an iterator that stops at EOF
            for byte_block in iter(lambda: f.read(4096), b""):
                # Update the hash object with the current chunk of data
                sha256_hash.update(byte_block)

    except OSError as e:
        # Handle potential I/O errors during the read process
        raise OSError(f"Error reading file '{file_path}' for hashing: {e}") from e

    # Return the final digest as a hexadecimal string
    return sha256_hash.hexdigest()
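# -----------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): streaming a file
# through SHA-256 in fixed-size chunks yields the same digest as hashing the
# whole byte string at once, because hashlib's update() is incremental. That is
# why the chunked reader in generate_file_hash is safe for large artifacts. The
# helper `sha256_of_bytes_via_file` is hypothetical and only checks the property.
# -----------------------------------------------------------------------------
import hashlib
import os
import tempfile


def sha256_of_bytes_via_file(data: bytes, chunk_size: int = 4096) -> str:
    """Write `data` to a temporary file and hash it back in `chunk_size` pieces."""
    fd, tmp_path = tempfile.mkstemp()
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        digest = hashlib.sha256()
        with open(tmp_path, "rb") as f:
            # iter() with the b"" sentinel stops at end of file
            for block in iter(lambda: f.read(chunk_size), b""):
                digest.update(block)
        return digest.hexdigest()
    finally:
        os.remove(tmp_path)


# For 10,000 bytes (not a multiple of 4096) the streamed digest equals
# hashlib.sha256(data).hexdigest(); b"abc" gives the standard test vector
# ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad.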

def create_artifact_manifest(
    pipeline_results: Any, # PipelineResults
    output_dir: Path
) -> Dict[str, str]:
    """
    Generates a manifest of all persisted artifacts with their hashes.

    Parameters
    ----------
    pipeline_results : PipelineResults
        The results object containing paths.
    output_dir : Path
        Root output directory.

    Returns
    -------
    Dict[str, str]
        Dictionary mapping relative file paths to SHA-256 hashes.
    """
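    manifest = {}
    root = Path(output_dir).resolve()

    # Iterate over the paths stored in results (sorted for a stable manifest order)
    for name, path in sorted(pipeline_results.artifact_paths.items()):
        path = Path(path).resolve()
        if not path.is_file():
            logger.warning(f"Manifest: Skipping missing artifact '{name}' at {path}")
            continue
        # Compute hash
        file_hash = generate_file_hash(path)
        # Store the path relative to the output root (absolute if it lies outside it)
        try:
            rel_path = str(path.relative_to(root))
        except ValueError:
            rel_path = str(path)
        manifest[rel_path] = file_hash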

    logger.info(f"Manifest: Tracked {len(manifest)} artifacts.")
    return manifest
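# -----------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): checking a manifest.
# A manifest from create_artifact_manifest maps paths (relative to the output
# root) to SHA-256 digests, so verification amounts to re-hashing each file and
# comparing. `verify_manifest` is a hypothetical helper; it reports entries that
# are missing on disk or whose digest no longer matches.
# -----------------------------------------------------------------------------
import hashlib
from pathlib import Path
from typing import Dict, List


def verify_manifest(manifest: Dict[str, str], output_root: Path) -> List[str]:
    """Return manifest entries that are missing on disk or whose SHA-256 changed."""
    problems = []
    for rel_path, expected in sorted(manifest.items()):
        # Joining with an absolute entry simply yields that absolute path
        path = Path(output_root) / rel_path
        if not path.is_file():
            problems.append(rel_path)
            continue
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            for block in iter(lambda: f.read(65536), b""):
                digest.update(block)
        if digest.hexdigest() != expected:
            problems.append(rel_path)
    return problems


# Usage: an empty list from verify_manifest(provenance.artifact_manifest, Path(output_root))
# means every tracked file still matches its recorded digest.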


# -------------------------------------------------------------------------------------------------------------------------------
# Task 29, Step 2: Persist complete method metadata
# -------------------------------------------------------------------------------------------------------------------------------

def consolidate_metadata(
    pipeline_results: Any, # PipelineResults
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Aggregates metadata from all pipeline stages into a single dictionary.

    Parameters
    ----------
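    pipeline_results : PipelineResults
        Output of the pipeline run whose artifacts are summarized.
    config : Dict[str, Any]
        Configuration used for that run.

    Returns
    -------
    Dict[str, Any]
        Run identifier, timing, status, configuration, random seeds (when a
        reproducibility context was recorded), and key metrics per stage.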
    """
    meta = {
        "run_id": pipeline_results.run_id,
        "start_time": pipeline_results.start_time,
        "end_time": pipeline_results.end_time,
        "status": "Success" if pipeline_results.success else "Failed",
        "config": config,
        "metrics": {}
    }

    # Extract specific metrics from artifacts if available
    artifacts = pipeline_results.artifacts

    if "dq_report" in artifacts:
        meta["metrics"]["data_quality"] = artifacts["dq_report"]

    if "reproducibility_context" in artifacts:
        # Extract seeds
        meta["seeds"] = artifacts["reproducibility_context"].seeds

    if "activity_coverage" in artifacts:
        cov = artifacts["activity_coverage"]
        meta["metrics"]["activity_coverage"] = {
            "total": cov.total_activities,
            "country_rate": cov.country_coverage_rate
        }

    if "context_coverage" in artifacts:
        cov = artifacts["context_coverage"]
        meta["metrics"]["transaction_coverage"] = {
            "total": cov.total_transactions,
            "country_rate": cov.transactions_with_country / cov.total_transactions if cov.total_transactions else 0
        }

    if "canonical_mapping" in artifacts:
        meta["metrics"]["entity_resolution"] = {
            "clusters": artifacts["canonical_mapping"].cluster_count,
            "method": artifacts["canonical_mapping"].resolution_method
        }

    if "hits_scores" in artifacts:
        meta["metrics"]["hits_convergence"] = artifacts["hits_scores"].convergence_meta

    if "pagerank" in artifacts:
        meta["metrics"]["pagerank_convergence"] = artifacts["pagerank"].convergence_meta

    if "correlation" in artifacts:
        corr = artifacts["correlation"]
        meta["metrics"]["validation"] = {
            "r": corr.pearson_r,
            "p": corr.p_value,
            "n": corr.sample_size
        }

    return meta
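# -----------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): JSON-encoding the
# metadata. The provenance JSON is written with default=str, which turns any
# object json cannot encode into its string form. NumPy integer scalars, which
# pandas and NumPy aggregations often return, are one such type, so they would
# be stored as strings like "5". `to_jsonable` is a hypothetical alternative
# `default` hook that converts NumPy scalars and arrays to native Python values.
# -----------------------------------------------------------------------------
import json
from typing import Any

import numpy as np


def to_jsonable(obj: Any) -> Any:
    """Fallback encoder for json.dump(..., default=to_jsonable)."""
    if isinstance(obj, np.generic):
        return obj.item()  # NumPy scalar -> Python int/float/bool
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    return str(obj)  # same fallback as default=str for everything else


# json.dumps({"n": np.int64(5)}, default=str) gives '{"n": "5"}', whereas
# default=to_jsonable gives '{"n": 5}'.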


# -------------------------------------------------------------------------------------------------------------------------------
# Task 29, Step 3: Generate manuscript-facing outputs deterministically
# -------------------------------------------------------------------------------------------------------------------------------

def generate_manuscript_tables(
    pipeline_results: Any, # PipelineResults
    output_dir: Path
) -> None:
    """
    Extracts and saves the specific tables required for manuscript figures.

    Parameters
    ----------
    pipeline_results : PipelineResults
        Output of the pipeline run.
    output_dir : Path
        Run directory; tables are written to its `manuscript_tables/` subdirectory.
    """
    tables_dir = output_dir / "manuscript_tables"
    tables_dir.mkdir(parents=True, exist_ok=True)

    artifacts = pipeline_results.artifacts

    # Fig 1: Geo Density
    if "geo_density" in artifacts:
        df = artifacts["geo_density"].density_table
        df.to_csv(tables_dir / "fig1_geo_density.csv", index=False)

    # Fig S1: Instrument Evolution
    if "instrument_evolution" in artifacts:
        df = artifacts["instrument_evolution"].time_series_table
        df.to_csv(tables_dir / "figS1_instrument_evolution.csv", index=True)

    # Fig 2: UMAP
    if "umap_projection" in artifacts:
        df = artifacts["umap_projection"].coordinates
        # Join with org type for coloring
        if "solar_system" in artifacts:
            ranks = artifacts["solar_system"].ranked_nodes[['org_type', 'node_size']]
            df = df.join(ranks, how='left')
        df.to_csv(tables_dir / "fig2_umap_coords.csv", index=True)

    # Fig 3: Solar System
    if "solar_system" in artifacts:
        df = artifacts["solar_system"].top_100_table
        df.to_csv(tables_dir / "fig3_solar_system_top100.csv", index=True)

    # Fig S3: Broker Comparison
    if "broker_comparison" in artifacts:
        df = artifacts["broker_comparison"].comparison_table
        df.to_csv(tables_dir / "figS3_broker_comparison.csv", index=True)

    logger.info(f"Manuscript Tables: Saved to {tables_dir}")


# -------------------------------------------------------------------------------------------------------------------------------
# Task 29, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def package_reproducible_outputs(
    pipeline_results: Any, # PipelineResults
    config: Dict[str, Any],
    output_root: str
) -> ProvenanceArtifact:
    """
    Orchestrator for Task 29. Packages all outputs and generates provenance.

    Parameters
    ----------
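    pipeline_results : PipelineResults
        Output of the pipeline run to package.
    config : Dict[str, Any]
        Configuration used for that run.
    output_root : str
        Root directory used for the run; provenance files and manuscript tables
        are written to `<output_root>/<run_id>/`.

    Returns
    -------
    ProvenanceArtifact
        Manifest, consolidated metadata, and configuration for the run.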
    """
    logger.info("Starting Task 29: Packaging Outputs...")

    run_dir = Path(output_root) / pipeline_results.run_id
    run_dir.mkdir(parents=True, exist_ok=True)

    # Step 1: Manifest (paths recorded relative to output_root)
    manifest = create_artifact_manifest(pipeline_results, Path(output_root))

    # Step 2: Metadata
    meta = consolidate_metadata(pipeline_results, config)

    # Save metadata as JSON; default=str stringifies values json cannot encode natively
    with open(run_dir / "provenance.json", "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2, default=str)

    # Step 3: Manuscript Tables
    generate_manuscript_tables(pipeline_results, run_dir)

    artifact = ProvenanceArtifact(
        run_id=pipeline_results.run_id,
        timestamp=datetime.datetime.now(datetime.timezone.utc).isoformat(),
        config=config,
        artifact_manifest=manifest,
        metadata_summary=meta
    )

    logger.info("Task 29 Completed Successfully.")

    return artifact


# Top-Level Orchestrator Callable

# ==============================================================================
# Master Orchestrator: Global Aid Network Analysis
# ==============================================================================

@dataclass
class MasterWorkflowResults:
    """
    Container for the outputs of the entire research workflow.

    This dataclass aggregates the results from the three primary phases of the
    study: the baseline analysis pipeline, the robustness/sensitivity analysis,
    and the final provenance packaging. It serves as the single return object
    for the master orchestrator, providing access to every artifact generated
    during the execution.

    Attributes
    ----------
    baseline_results : PipelineResults
        The complete set of artifacts and metadata from the primary (baseline)
        execution of the analysis pipeline (Task 27).
    robustness_results : RobustnessArtifact
        The summary statistics and detailed metrics from the sensitivity analysis,
        quantifying the stability of the findings under parameter perturbation (Task 28).
    provenance_artifact : ProvenanceArtifact
        The final reproducibility manifest, including cryptographic hashes of all
        persisted files and a consolidated metadata summary (Task 29).
    execution_time : float
        The total wall-clock time (in seconds) taken to execute the master workflow.
    success : bool
        Global success flag; True only if all three phases completed without
        raising an exception.
    """
    baseline_results: Any  # PipelineResults
    robustness_results: Any  # RobustnessArtifact
    provenance_artifact: Any  # ProvenanceArtifact
    execution_time: float
    success: bool = False
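The robustness phase (Task 28) is described as quantifying "the stability of rankings and correlations" under parameter perturbation. One common way to measure ranking stability is Spearman's rank correlation between baseline and perturbed scores, paired with the overlap of the top-k nodes. The sketch below is hypothetical: `average_ranks`, `spearman_rho` and `top_k_jaccard` are not part of this codebase, and it assumes each run yields a dict mapping organisation ID to a score (for example, a centrality value). It is written with the standard library only; `scipy.stats.spearmanr` computes the same tie-corrected statistic.

```python
import math
from typing import Dict, List


def average_ranks(values: List[float]) -> List[float]:
    """Return 1-based ranks, giving tied values the mean of the ranks they span."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        # Extend j over the run of values tied with values[order[i]].
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        mean_rank = (i + j) / 2 + 1  # i and j are 0-based positions
        for k in range(i, j + 1):
            ranks[order[k]] = mean_rank
        i = j + 1
    return ranks


def spearman_rho(baseline: Dict[str, float], perturbed: Dict[str, float]) -> float:
    """Spearman correlation (Pearson on average ranks) over nodes shared by both runs."""
    common = sorted(set(baseline) & set(perturbed))
    if len(common) < 2:
        raise ValueError("need at least two nodes shared by both runs")
    rx = average_ranks([baseline[n] for n in common])
    ry = average_ranks([perturbed[n] for n in common])
    mx, my = sum(rx) / len(rx), sum(ry) / len(ry)
    cov = sum((a - mx) * (b - my) for a, b in zip(rx, ry))
    sx = math.sqrt(sum((a - mx) ** 2 for a in rx))
    sy = math.sqrt(sum((b - my) ** 2 for b in ry))
    if sx == 0.0 or sy == 0.0:
        return float("nan")  # undefined when either run has all-tied scores
    return cov / (sx * sy)


def top_k_jaccard(baseline: Dict[str, float], perturbed: Dict[str, float], k: int) -> float:
    """Jaccard overlap of the k highest-scoring nodes in each run."""
    def top(scores: Dict[str, float]) -> set:
        return set(sorted(scores, key=lambda n: scores[n], reverse=True)[:k])

    a, b = top(baseline), top(perturbed)
    union = a | b
    return len(a & b) / len(union) if union else 1.0
```

For `base = {"a": 3.0, "b": 2.0, "c": 1.0}` and `pert = {"a": 1.0, "b": 3.0, "c": 2.0}`, `spearman_rho(base, pert)` is approximately -0.5 and `top_k_jaccard(base, pert, 2)` is 1/3. The rank correlation captures global reordering, while the top-k overlap focuses on whether the most central actors stay the same.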

def execute_master_workflow(
    df_transactions_raw: pd.DataFrame,
    df_activities_raw: pd.DataFrame,
    df_organisations_raw: pd.DataFrame,
    df_web_links_raw: pd.DataFrame,
    config: Dict[str, Any],
    output_root: str = "./study_output"
) -> MasterWorkflowResults:
    """
    Executes the complete Global Aid Network research workflow.

    This master orchestrator coordinates the three high-level components of the
    study:
    1.  **Baseline Analysis (Task 27)**: Runs the end-to-end pipeline using the
        primary configuration to generate the core findings (Solar System,
        Brokerage, etc.).
    2.  **Robustness Analysis (Task 28)**: Performs a sensitivity analysis by
        perturbing key parameters (ER threshold, Node2Vec bias) and re-running
        the pipeline to assess the stability of rankings and correlations.
    3.  **Provenance Packaging (Task 29)**: Consolidates all outputs, generates
        cryptographic manifests, and produces the final manuscript-ready tables.

    Parameters
    ----------
    df_transactions_raw : pd.DataFrame
        Raw IATI transaction data.
    df_activities_raw : pd.DataFrame
        Raw IATI activity metadata.
    df_organisations_raw : pd.DataFrame
        Raw organisation master list.
    df_web_links_raw : pd.DataFrame
        Raw web crawl data for validation.
    config : Dict[str, Any]
        Master configuration dictionary.
    output_root : str
        Root directory for all study outputs.

    Returns
    -------
    MasterWorkflowResults
        A container holding the baseline results, robustness analysis, and
        provenance metadata.
    """
    start_time = time.time()
    logger.info("=== Initiating Master Workflow ===")

    # Initialize the container with placeholders so partial results survive a failure
    results = MasterWorkflowResults(
        baseline_results=None,
        robustness_results=None,
        provenance_artifact=None,
        execution_time=0.0,
        success=False
    )

    try:
        # ---------------------------------------------------------------------
        # Phase 1: Baseline Pipeline Execution (Task 27)
        # ---------------------------------------------------------------------
        logger.info(">>> Phase 1: Executing Baseline Pipeline...")

        # Define baseline output directory
        baseline_dir = f"{output_root}/baseline"

        baseline_res = run_global_aid_pipeline(
            df_transactions_raw,
            df_activities_raw,
            df_organisations_raw,
            df_web_links_raw,
            config,
            output_dir=baseline_dir
        )

        results.baseline_results = baseline_res

        if not baseline_res.success:
            raise RuntimeError(f"Baseline pipeline failed: {baseline_res.error_message}")

        logger.info(">>> Phase 1 Complete.")

        # ---------------------------------------------------------------------
        # Phase 2: Robustness Analysis (Task 28)
        # ---------------------------------------------------------------------
        logger.info(">>> Phase 2: Conducting Robustness Analysis...")

        # Prepare data inputs dictionary for the robustness orchestrator
        data_inputs = {
            "df_transactions_raw": df_transactions_raw,
            "df_activities_raw": df_activities_raw,
            "df_organisations_raw": df_organisations_raw,
            "df_web_links_raw": df_web_links_raw
        }

        # Execute robustness analysis
        # Note: We pass the run_global_aid_pipeline function itself to allow
        # the robustness orchestrator to invoke it for each grid point.
        robustness_res = conduct_robustness_analysis(
            baseline_res,
            config,
            run_global_aid_pipeline,
            data_inputs
        )

        results.robustness_results = robustness_res
        logger.info(">>> Phase 2 Complete.")

        # ---------------------------------------------------------------------
        # Phase 3: Provenance Packaging (Task 29)
        # ---------------------------------------------------------------------
        logger.info(">>> Phase 3: Packaging Provenance...")

        # Package outputs from the baseline run only. The per-grid-point
        # robustness sub-runs are not added to the provenance manifest; their
        # summary is returned separately via results.robustness_results.

        provenance_res = package_reproducible_outputs(
            baseline_res,
            config,
            baseline_dir  # Directory where the baseline artifacts were saved
        )

        results.provenance_artifact = provenance_res
        logger.info(">>> Phase 3 Complete.")

        # Finalize
        results.success = True
        logger.info("=== Master Workflow Completed Successfully ===")

    except Exception as e:
        # logger.exception logs at ERROR level and appends the full traceback.
        logger.exception(f"Master Workflow Failed: {e}")
        results.success = False

    finally:
        results.execution_time = time.time() - start_time

    return results
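`execute_master_workflow` follows a fail-fast pattern: phases run in order, the first exception aborts the remaining phases, results already stored on the container are kept for inspection, and the elapsed time is recorded in `finally` regardless of outcome. The sketch below isolates that pattern in a generic, hypothetical `run_phases` helper (not part of the codebase), assuming each phase is a callable that receives the outputs of the phases before it.

```python
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple

logger = logging.getLogger(__name__)


@dataclass
class PhaseReport:
    """Hypothetical container mirroring MasterWorkflowResults in generic form."""
    outputs: Dict[str, Any] = field(default_factory=dict)
    execution_time: float = 0.0
    success: bool = False


def run_phases(phases: List[Tuple[str, Callable[[Dict[str, Any]], Any]]]) -> PhaseReport:
    """Run named phases in order; each phase receives the outputs of earlier phases."""
    report = PhaseReport()
    start = time.perf_counter()
    try:
        for name, phase in phases:
            logger.info(">>> Phase %s", name)
            report.outputs[name] = phase(report.outputs)
        report.success = True
    except Exception:
        # Stop at the first failing phase but keep earlier outputs for debugging.
        logger.exception("Workflow failed")
    finally:
        # Recorded on both the success and the failure path.
        report.execution_time = time.perf_counter() - start
    return report
```

If the second of three phases raises, `report.success` is `False` and `report.outputs` holds only the first phase's result. The sketch times with `time.perf_counter()`, which is monotonic and so unaffected by system clock adjustments; the orchestrator above uses `time.time()`, which is adequate for coarse wall-clock reporting.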


#