# `README.md`

# Heuristic-Augmented Financial Risk Index (HAFRI): Assessing Financial Statement Risks among MCDM Techniques

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2512.04035v1-b31b1b.svg)](https://arxiv.org/abs/2512.04035v1)
[![Journal](https://img.shields.io/badge/Journal-Economics%20(econ.TH)-003366)](https://arxiv.org/abs/2512.04035v1)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/assessing_financial_statement_risks)
[![Discipline](https://img.shields.io/badge/Discipline-Financial%20Risk%20Management%20%7C%20MCDM-00529B)](https://github.com/chirindaopensource/assessing_financial_statement_risks)
[![Data Sources](https://img.shields.io/badge/Data-Damascus%20Securities%20Exchange-lightgrey)](http://www.dse.sy/)
[![Data Sources](https://img.shields.io/badge/Data-Bloomberg%20Terminal%20(Fundamentals)-lightgrey)](https://www.bloomberg.com/professional/solution/bloomberg-terminal/)
[![Data Sources](https://img.shields.io/badge/Data-Company%20Annual%20Reports-lightgrey)](https://github.com/chirindaopensource/assessing_financial_statement_risks)
[![Core Method](https://img.shields.io/badge/Method-Analytic%20Hierarchy%20Process%20(AHP)-orange)](https://github.com/chirindaopensource/assessing_financial_statement_risks)
[![Analysis](https://img.shields.io/badge/Analysis-Simple%20Additive%20Weighting%20(SAW)-red)](https://github.com/chirindaopensource/assessing_financial_statement_risks)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%230C55A5.svg?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![PyYAML](https://img.shields.io/badge/PyYAML-gray?logo=yaml&logoColor=white)](https://pyyaml.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)

**Repository:** `https://github.com/chirindaopensource/assessing_financial_statement_risks`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Assessing Financial Statement Risks among MCDM Techniques"** by:

*   Marwa Abdullah
*   Revzon Oksana Anatolyevna
*   Duaa Abdullah

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from rigorous financial statement data validation and expert survey processing to hierarchical weight derivation via AHP, risk scoring via SAW, and comprehensive robustness analysis.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `execute_hafri_master_pipeline`](#key-callable-execute_hafri_master_pipeline)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the analytical framework presented in Abdullah et al. (2025). The core of this repository is the IPython notebook `assessing_financial_statement_risks_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings. The pipeline is designed to be a generalizable toolkit for constructing a **Heuristic-Augmented Financial Risk Index (HAFRI)**, enabling the ranking of fiscal periods by their aggregate exposure to financial risk.

The paper addresses the challenge of quantifying latent financial risk by integrating expert heuristics with objective financial ratios. This codebase operationalizes the paper's framework, allowing users to:
-   Rigorously validate and manage the entire experimental configuration via a single `study_configuration.yaml` file.
-   Process raw expert pairwise comparison surveys to derive consensus weights for risk criteria.
-   Implement the **Analytic Hierarchy Process (AHP)** to decompose risk into Capital Structure, Liquidity, Income, and Cash Flow domains.
-   Apply **Simple Additive Weighting (SAW)** to aggregate normalized financial ratios into a composite risk score.
-   Validate findings against the original study's results for Al-Ahliah Vegetable Oil Company (2008–2017).
-   Conduct automated robustness checks to test the sensitivity of risk rankings to methodological assumptions.
-   Automatically generate a comprehensive technical report and reproducibility package.

## Theoretical Background

The implemented methods are grounded in Multi-Criteria Decision Making (MCDM) theory and financial statement analysis.

**1. Analytic Hierarchy Process (AHP):**
AHP is used to elicit and synthesize expert judgments into a coherent set of priority weights. The problem is decomposed into a hierarchy:
-   **Goal:** Aggregate Financial Risk.
-   **Main Criteria:** Capital Structure Risk (CSR), Liquidity Risk (LR), Income Risk (IR), Cash Flow Risk (CFR).
-   **Sub-Criteria:** 34 financial ratios derived from financial statements.

Weights are derived from pairwise comparison matrices $A$ where $A_{ij}$ represents the relative importance of criterion $i$ over $j$. The principal eigenvector $w$ is approximated using the Row Geometric Mean method, and consistency is validated using the Consistency Ratio ($CR < 0.10$).
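
As a minimal sketch of the Row Geometric Mean step (not the notebook's own code), the function below takes a reciprocal matrix and returns normalized weights. The function name and the 3x3 example matrix are illustrative assumptions, not study data.

```python
import numpy as np


def row_geometric_mean_weights(A: np.ndarray) -> np.ndarray:
    """Approximate the principal eigenvector of a positive reciprocal matrix."""
    A = np.asarray(A, dtype=float)
    # Geometric mean of each row, computed through logs for numerical stability.
    row_gm = np.exp(np.log(A).mean(axis=1))
    # Normalize so the weights sum to 1.
    return row_gm / row_gm.sum()


# Hypothetical, perfectly consistent judgments (illustration only).
A = np.array([
    [1.00, 2.0, 4.0],
    [0.50, 1.0, 2.0],
    [0.25, 0.5, 1.0],
])
w = row_geometric_mean_weights(A)
print(w.round(4))  # [0.5714 0.2857 0.1429]
```

For a perfectly consistent matrix such as this one, the geometric-mean weights coincide with the exact principal eigenvector; for inconsistent matrices they are an approximation.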

**2. Simple Additive Weighting (SAW):**
SAW aggregates the performance of alternatives (fiscal years) across multiple criteria.
-   **Normalization:** Raw ratio values $x_{ij}$ are transformed into commensurable scores $r_{ij} \in [0, 1]$ using Min-Max normalization, oriented so that a higher $r_{ij}$ always indicates higher risk.
    -   **Benefit Criteria (Max):** Higher values reduce risk (e.g., Cash Readiness).
        $$ r_{ij} = \frac{x_j^+ - x_{ij}}{x_j^+ - x_j^-} $$
    -   **Cost Criteria (Min):** Higher values increase risk (e.g., Debt/Equity).
        $$ r_{ij} = \frac{x_{ij} - x_j^-}{x_j^+ - x_j^-} $$
-   **Aggregation:** The composite risk score $V_i$ is the weighted sum of normalized scores:
    $$ V_i = \sum_{j=1}^{n} w_j r_{ij} $$
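
The two normalization formulas and the weighted sum above can be sketched as follows. This is an illustrative implementation under stated assumptions: the function name, the two-ratio example, and the weights are hypothetical, and constant columns are mapped to a score of 0 here purely as a simplification.

```python
import numpy as np


def saw_risk_scores(X, weights, is_benefit):
    """Return (R, V): normalized risk scores and composite risk per alternative."""
    X = np.asarray(X, dtype=float)
    w = np.asarray(weights, dtype=float)
    is_benefit = np.asarray(is_benefit, dtype=bool)

    x_min, x_max = X.min(axis=0), X.max(axis=0)
    # Assumption: a constant column carries no ranking information, so avoid 0/0.
    span = np.where(x_max == x_min, 1.0, x_max - x_min)

    benefit_form = (x_max - X) / span   # higher raw value -> lower risk score
    cost_form = (X - x_min) / span      # higher raw value -> higher risk score
    R = np.where(is_benefit, benefit_form, cost_form)
    return R, R @ w


# Hypothetical data: 3 fiscal years x 2 ratios (a benefit ratio, then a cost ratio).
X = [[0.2, 1.5],
     [0.6, 0.5],
     [1.0, 1.0]]
R, V = saw_risk_scores(X, weights=[0.6, 0.4], is_benefit=[True, False])
print(V)  # composite risk score per year
```

In this toy example the first year has both the lowest benefit ratio and the highest cost ratio, so it receives the maximum composite score of 1.0.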


## Features

The provided IPython notebook (`assessing_financial_statement_risks_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Task Architecture:** The entire pipeline is broken down into 28 distinct, modular tasks, each with its own orchestrator function.
-   **Configuration-Driven Design:** All study parameters are managed in an external `study_configuration.yaml` file.
-   **Rigorous Data Validation:** A multi-stage validation process checks the schema, content integrity, and temporal consistency of expert surveys and financial statements.
-   **Advanced MCDM Implementation:** Integrates AHP consistency checking, hierarchical weight composition, and SAW normalization with explicit directionality handling.
-   **Robustness Verification:** Includes automated sensitivity analysis scenarios (e.g., Strict vs. Relaxed Consistency thresholds) to validate the stability of risk rankings.
-   **Reproducible Artifacts:** Generates structured dictionaries and serialized files (CSV, JSON) for every intermediate result, ensuring full auditability.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Validation & Preprocessing (Tasks 1-7):** Ingests raw data, validates schemas, enforces accounting identities (e.g., Assets = Liabilities + Equity), and handles missing values/zero denominators.
2.  **AHP Analysis (Tasks 8-15):** Constructs the risk hierarchy, builds pairwise comparison matrices, computes local weights, filters inconsistent experts ($CR \ge 0.10$), and aggregates global weights.
3.  **Ratio Computation (Tasks 16-18):** Defines computational logic for 34 ratios, computes the raw decision matrix $X$, and validates it for outliers and zero variance.
4.  **SAW Analysis (Tasks 19-23):** Configures criterion directionality (Benefit/Cost), normalizes the decision matrix to risk scores $R$, applies global weights to get $V$, and computes composite risk scores $V_t$.
5.  **Ranking & Validation (Tasks 24-27):** Ranks fiscal years by aggregate risk, identifies extreme years, and cross-checks results against the published study values.
6.  **Packaging (Task 28):** Generates a technical report and serializes all outputs into a reproducibility package.
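
The ranking step can be illustrated with a short sketch. The scores below are made up, and treating the "relative score" as each year's share of the total is an assumption about how the published percentages are formed, not something the notebook is confirmed to do.

```python
import pandas as pd

# Hypothetical composite risk scores V_t (illustration only, not study results).
scores = pd.Series({2008: 0.41, 2009: 0.28, 2010: 0.55}, name="composite_risk")

# Assumed definition: relative score = share of the total across all years.
relative = scores / scores.sum()

# Rank 1 = highest aggregate risk.
rank = scores.rank(ascending=False, method="min").astype(int)

summary = (
    pd.DataFrame({"score": scores, "relative": relative, "rank": rank})
    .sort_values("rank")
)
most_risky_year = summary.index[0]
least_risky_year = summary.index[-1]
print(summary)
```

Sorting by rank puts the riskiest year first, which matches the `ranking.index[0]` access pattern shown in the Usage section.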

## Core Components (Notebook Structure)

The `assessing_financial_statement_risks_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the 28 major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `execute_hafri_master_pipeline`

The project is designed around a single, top-level user-facing interface function:

-   **`execute_hafri_master_pipeline`:** This master orchestrator function, located in the final section of the notebook, runs the entire automated research pipeline end to end. A single call to this function reproduces the entire computational portion of the project, managing data flow between all 28 sub-tasks, including robustness checks and report generation.

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `pyyaml`, `scipy`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/assessing_financial_statement_risks.git
    cd assessing_financial_statement_risks
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy pyyaml scipy
    ```

## Input Data Structure

The pipeline requires two primary DataFrames:
1.  **`raw_expert_survey_df`**: A log of pairwise comparisons with columns: `expert_id`, `hierarchy_level`, `criterion_i`, `criterion_j`, `saaty_scale_value`, `comparison_type`.
2.  **`raw_financial_statement_df`**: Audited financial line items with columns: `fiscal_year`, `total_assets`, `current_assets`, `net_operating_cash_flow`, etc. (27 fields total).

## Usage

The `assessing_financial_statement_risks_draft.ipynb` notebook provides a complete, step-by-step guide. The primary workflow is to execute the final cell of the notebook, which demonstrates how to use the top-level `execute_hafri_master_pipeline` orchestrator:

```python
# Final cell of the notebook
import yaml  # provided by the PyYAML package

# This block serves as the main entry point for the entire project.
if __name__ == '__main__':
    # 1. Load the master configuration from the YAML file.
    with open('study_configuration.yaml', 'r') as f:
        study_config = yaml.safe_load(f)
    
    # 2. Load raw datasets (Example using synthetic generator provided in the notebook)
    # In production, load from CSV/Parquet: pd.read_csv(...)
    raw_expert_survey_df = ...
    raw_financial_statement_df = ...
    
    # 3. Execute the entire replication study.
    results = execute_hafri_master_pipeline(
        raw_expert_survey_df=raw_expert_survey_df,
        raw_financial_statement_df=raw_financial_statement_df,
        study_configuration=study_config
    )
    
    # 4. Access results
    print(f"Most Risky Year: {results['baseline_results']['ranking'].index[0]}")
```

## Output Structure

The pipeline returns a master dictionary containing all analytical artifacts:
-   **`baseline_results`**: Contains `global_weights`, `decision_matrix`, `normalized_matrix`, `weighted_matrix`, `composite_scores`, `ranking`, and `comparison`.
-   **`robustness_results`**: Contains `scenario_details` and `stability_summary` (rank statistics across scenarios).
-   **`validation_results`**: Contains `weight_comparison`, `score_comparison`, and `summary_report` (discrepancy analysis).
-   **`final_package`**: Contains serialized strings for `technical_report.md`, `README.md`, and CSVs of all matrices.

## Project Structure

```
assessing_financial_statement_risks/
│
├── assessing_financial_statement_risks_draft.ipynb  # Main implementation notebook
├── study_configuration.yaml                         # Master configuration file
├── requirements.txt                                 # Python package dependencies
│
├── LICENSE                                          # MIT Project License File
└── README.md                                        # This file
```

## Customization

The pipeline is highly customizable via the `study_configuration.yaml` file. Users can modify study parameters such as:
-   **Time Horizon:** `start_year`, `end_year`.
-   **AHP Settings:** `consistency_threshold`, `saaty_scale_mapping`.
-   **Ratio Definitions:** Numerator/denominator logic in `feature_engineering_logic`.
-   **SAW Settings:** `criteria_directionality` (Benefit/Cost assignment).

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Fuzzy AHP:** Incorporating fuzzy logic to handle uncertainty in expert judgments.
-   **TOPSIS Integration:** Adding Technique for Order of Preference by Similarity to Ideal Solution as an alternative ranking method.
-   **Dynamic Weighting:** Allowing weights to evolve over time based on market conditions.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{abdullah2025assessing,
  title={Assessing Financial Statement Risks among MCDM Techniques},
  author={Abdullah, Marwa and Anatolyevna, Revzon Oksana and Abdullah, Duaa},
  journal={arXiv preprint arXiv:2512.04035v1},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). Heuristic-Augmented Financial Risk Index (HAFRI): An Open Source Implementation.
GitHub repository: https://github.com/chirindaopensource/assessing_financial_statement_risks
```

## Acknowledgments

-   Credit to **Marwa Abdullah et al.** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, and SciPy**.

---

*This README was generated based on the structure and content of the `assessing_financial_statement_risks_draft.ipynb` notebook and follows best practices for research software documentation.*


# Paper

Title: "*Assessing Financial Statement Risks among MCDM Techniques*"

Authors: Marwa Abdullah, Revzon Oksana Anatolyevna, Duaa Abdullah

E-Journal Submission Date: 3 December 2025

Link: https://arxiv.org/abs/2512.04035v1

Revised Abstract:

This paper introduces a Heuristic-Augmented Financial Risk Index (HAFRI) constructed through the systematic integration of two complementary multi-criteria decision-making (MCDM) techniques: the Analytic Hierarchy Process (AHP) for weight elicitation and Simple Additive Weighting (SAW) for multi-attribute utility aggregation. The proposed framework addresses the problem of ranking discrete fiscal periods by their aggregate exposure to financial risk, where risk is conceptualized as a latent construct manifested through observable financial ratios.

**Methodological Architecture.** The risk assessment hierarchy comprises three levels: (1) a singular goal node representing aggregate financial risk; (2) four main criteria corresponding to distinct risk domains—Capital Structure Risk (CSR), Liquidity Risk (LR), Income Risk (IR), and Cash Flow Risk (CFR); and (3) 34 sub-criteria operationalized as financial ratios computed from the Statement of Financial Position, Income Statement, and Statement of Cash Flows. Criterion importance weights are derived from pairwise comparison matrices elicited from a panel of five domain experts using Saaty's 1–9 psychometric scale. Each matrix is validated against the consistency ratio threshold (CR < 0.10), with weights computed via the row geometric mean approximation to the principal eigenvector. Global sub-criterion weights are obtained through hierarchical weight cascade (multiplication of level-specific weights). The SAW phase transforms raw financial ratio values into commensurable utility scores via Min-Max normalization, with explicit encoding of criterion directionality: benefit criteria (higher values reduce risk) and cost criteria (higher values increase risk). Weighted normalized scores are aggregated via linear summation to produce a cardinal risk index for each fiscal period.

**Empirical Application.** The framework is applied to Al-Ahliah Vegetable Oil Company, a publicly listed firm on the Damascus Securities Exchange, over the ten-year period 2008–2017. The ten fiscal years constitute the decision alternatives. Expert consensus assigns the highest aggregate weight to Cash Flow Risk (45.9%), followed by Liquidity Risk (24.4%), Income Risk (15.2%), and Capital Structure Risk (14.6%). At the sub-criterion level, the Cash Readiness Ratio (LR3) emerges as the most critical indicator (13.3% global weight), followed by the Quick Ratio (LR2, 9.4%) and Return on Equity (IR6, 4.5%). The integrated AHP-SAW analysis identifies fiscal year 2016 as exhibiting the highest aggregate risk exposure (relative score: 13.3%), attributable to elevated liquidity ratios signaling increased return volatility. Conversely, fiscal year 2009 demonstrates the lowest risk exposure (relative score: 8.0%).

**Contributions.** The study contributes a replicable, modular, and customizable decision-support methodology for financial risk assessment that (i) synthesizes expert judgment with empirical financial data, (ii) provides full methodological transparency through explicit weight derivation and consistency validation, and (iii) enables comparative risk analysis across temporal or cross-sectional units.


# Summary

### **Bibliographic Context and Research Objective**
The paper proposes a quantitative framework for evaluating financial risk within an industrial firm. Unlike traditional econometric models that might rely on stochastic processes (e.g., Brownian motion for asset pricing) or regression analysis, this study employs **Operations Research** techniques.

*   **Objective:** To quantify the relative importance of various financial risks and rank specific fiscal years based on risk exposure.
*   **Subject:** Al-Ahliah Vegetable Oil Company (Syria).
*   **Time Series:** 2008–2017.
*   **Core Methodology:** A hybrid integration of the **Analytic Hierarchy Process (AHP)** for criteria weighting and **Simple Additive Weighting (SAW)** for alternative ranking.

### **Theoretical Framework and Risk Taxonomy**
The authors depart from standard risk classifications (e.g., systematic vs. unsystematic) and instead propose a taxonomy derived directly from the three primary accounting statements. This creates a hierarchical structure suitable for MCDM decomposition.

The risk model is defined by four main criteria ($C_j$), subdivided into secondary criteria (financial ratios):
1.  **Capital Structure Risk (CSR):** Derived from the Balance Sheet (e.g., Debt-to-Equity, Retained Earnings/Assets).
2.  **Liquidity Risk (LR):** Derived from the Balance Sheet (e.g., Turnover ratio, Fast liquidity ratio).
3.  **Income Risk (IR):** Derived from the Income Statement (e.g., Net Profit/Sales).
4.  **Cash Flow Risk (CFR):** Derived from the Cash Flow Statement (e.g., Operating Cash Flow/Total Debt).

### **Algorithmic Methodology**
The research employs a two-phase computational approach.

#### **Phase I: Weight Determination via AHP**
The Analytic Hierarchy Process (Saaty, 1980) is used to extract expert heuristics and convert them into numerical weights.
1.  **Pairwise Comparison:** A matrix $A$ is constructed where $A_{ij}$ represents the relative importance of criterion $i$ over $j$.
2.  **Eigenvector Calculation:** The principal eigenvector is computed to determine the relative weights ($W$). The maximum eigenvalue ($\lambda_{max}$) is derived using:
    $$ \lambda_{max} = \sum_{i=1}^{m} EV_i \times S_i $$
    *(Where $EV$ is the eigenvector and $S$ is the column sum).*
3.  **Consistency Check:** A Consistency Ratio ($CR$) is calculated ($CR = CI/RI$). The authors ensure $CR < 0.10$ to validate the logical consistency of the experts' judgments.
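
A minimal sketch of the consistency check, using the column-sum estimate of $\lambda_{max}$ given above. The function name and example matrix are hypothetical; the random-index table holds the commonly cited Saaty values for orders 1 to 10, so larger matrices would need an extended table.

```python
import numpy as np

# Commonly cited Saaty random index (RI) values by matrix order.
RANDOM_INDEX = {1: 0.0, 2: 0.0, 3: 0.58, 4: 0.90, 5: 1.12,
                6: 1.24, 7: 1.32, 8: 1.41, 9: 1.45, 10: 1.49}


def consistency_ratio(A, w):
    """Return (lambda_max, CR) for a reciprocal matrix A and weight vector w."""
    A = np.asarray(A, dtype=float)
    w = np.asarray(w, dtype=float)
    n = A.shape[0]
    # lambda_max = sum_i EV_i * S_i, with S_i the i-th column sum.
    lambda_max = float(np.dot(w, A.sum(axis=0)))
    if n < 3:
        return lambda_max, 0.0  # 1x1 and 2x2 reciprocal matrices are always consistent
    ci = (lambda_max - n) / (n - 1)
    return lambda_max, ci / RANDOM_INDEX[n]


# Hypothetical, slightly inconsistent judgments (illustration only).
A = np.array([
    [1.0, 2.0, 5.0],
    [0.5, 1.0, 2.0],
    [0.2, 0.5, 1.0],
])
row_gm = np.exp(np.log(A).mean(axis=1))
w = row_gm / row_gm.sum()
lambda_max, cr = consistency_ratio(A, w)
print(f"lambda_max={lambda_max:.4f}, CR={cr:.4f}, accepted={cr < 0.10}")
```

For a perfectly consistent matrix the column sums equal $1/w_j$, so the estimate returns exactly $n$ and $CR = 0$.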

#### **Phase II: Ranking via SAW**
Simple Additive Weighting is applied to rank the "alternatives," which in this study are the **financial years** (2008–2017).
1.  **Decision Matrix Construction:** A matrix $D$ of size $m \times n$ (10 years $\times$ 34 ratios).
2.  **Normalization (Max-Min):** The data is normalized to a $[0,1]$ scale to handle incommensurable units.
    *   **Benefit Criteria (Max):** $r_{ij} = (x_{ij} - x_j^-) / (x_j^+ - x_j^-)$
    *   **Cost Criteria (Min):** $r_{ij} = (x_j^+ - x_{ij}) / (x_j^+ - x_j^-)$
    *(Note: The paper explicitly identifies ratios like Total Debt/Equity as cost criteria and Cash Flow/Debt as benefit criteria).*
3.  **Weighted Aggregation:** The final score $V_i$ for each year is the dot product of the weight vector and the normalized attribute vector:
    $$ V_i = \sum_{j=1}^{n} w_j r_{ij} $$

### **Empirical Results**

#### **Weighting Results (AHP)**
Based on the input of five financial experts, the algorithm yielded the following hierarchy of risk importance:
1.  **Cash Flow Risk (CFR):** **45.9%** (Highest Weight). The experts view the ability to generate cash as the primary determinant of risk.
2.  **Liquidity Risk (LR):** **24.3%**.
3.  **Income Risk (IR):** **15.1%**.
4.  **Capital Structure Risk (CSR):** **14.5%** (Lowest Weight).

*Observation:* The specific sub-criterion **"Liquidity Readiness Ratio" (LR3)** was deemed the single most critical metric (13.2% global weight), while **"Retained Earnings to Assets"** was the least critical.

#### **Temporal Ranking (SAW)**
The model generated a risk index for each year. In standard SAW, a higher score implies *better* performance and therefore lower risk, so the paper's use of "risk exposure" where one would expect "performance score" requires careful interpretation of the normalization direction.

*   **Highest Risk Year:** **2016**.
    *   *Reasoning:* Despite high liquidity ratios, the company suffered from poor cash flow generation relative to profits.
*   **Lowest Risk Year:** **2009**.
    *   *Reasoning:* High cash generation rates and stable financial ratios.

### **Critical Assessment & Conclusion**

**Strengths:**
*   **Methodological Integration:** Successfully demonstrates the coupling of subjective expert intuition (AHP) with objective historical data (SAW).
*   **Granularity:** The decomposition of risk into 34 distinct financial ratios provides a high-resolution view of the company's financial health.

**Limitations (Academic Critique):**
*   **Deterministic Nature:** The model treats financial ratios as static inputs. It lacks stochastic elements (e.g., Monte Carlo simulations) that would account for market volatility or uncertainty in the ratios themselves.
*   **Sample Size:** The AHP relies on only five experts, which may introduce bias.
*   **Interpretation of "Risk":** The paper conflates "performance" with "risk." In SAW, a higher score usually indicates the best alternative. The authors conclude that 2016 carried the highest financial risk, yet Table 12 assigns it the highest score (Rank 1). This inverts the standard reading of SAW scores (high score = good) and suggests that the authors either modeled risk itself as the benefit criterion or that there is a discrepancy in their interpretation of the SAW output.

**Final Verdict:** The paper provides a functional heuristic framework for internal audit and risk assessment, prioritizing **Cash Flow** analysis over traditional Balance Sheet leverage metrics.

# Import Essential Modules

```python
#!/usr/bin/env python3
# ==============================================================================#
#
#  Heuristic-Augmented Financial Risk Index (HAFRI) Construction
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "Assessing Financial Statement Risks among
#  MCDM Techniques" by Marwa Abdullah, Revzon Oksana Anatolyevna, and Duaa
#  Abdullah (2025). It delivers a computationally tractable system for quantitative
#  financial risk assessment, enabling robust, multi-criteria ranking of fiscal
#  periods through the integration of expert heuristics and objective financial data.
#
#  Core Methodological Components:
#  • Analytic Hierarchy Process (AHP) for subjective weight elicitation
#  • Hierarchical decomposition of financial risk into Capital Structure, Liquidity,
#    Income, and Cash Flow domains
#  • Pairwise comparison matrix construction and consistency validation (CR < 0.10)
#  • Principal eigenvector approximation via Row Average Method for weight derivation
#  • Simple Additive Weighting (SAW) for multi-attribute utility aggregation
#  • Min-Max normalization with explicit Benefit/Cost criterion directionality
#  • Composite risk index calculation via linear weighted summation
#
#  Technical Implementation Features:
#  • Robust data validation pipeline for expert surveys and financial statements
#  • Automated enforcement of accounting identities and derived field computation
#  • Vectorized matrix operations for efficient AHP weight calculation
#  • Dynamic handling of zero-denominator anomalies and missing data policies
#  • Comprehensive robustness analysis framework (sensitivity to weights/methods)
#  • Automated generation of technical reports and reproducibility artifacts
#
#  Paper Reference:
#  Abdullah, M., Anatolyevna, R. O., & Abdullah, D. (2025). Assessing Financial
#  Statement Risks among MCDM Techniques. arXiv preprint arXiv:2512.04035v1.
#  https://arxiv.org/abs/2512.04035v1
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================#

import copy
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set, Tuple

import numpy as np
import pandas as pd
```

# Implementation

## Draft 1

### **Discussion of the Inputs, Processes and Outputs of Key Pipeline Constituents**

### **Task 1: Validate the `study_configuration` dictionary**

**1. `validate_study_configuration` (Orchestrator)**
*   **Inputs:** `study_configuration` (Dict).
*   **Process:** Sequentially invokes sub-validators (`validate_top_level_keys`, `validate_metadata_scope`, `validate_schema_definitions`) to check structural integrity, metadata constraints (e.g., 2008–2017 timeframe), and schema completeness.
*   **Outputs:** `bool` (True if valid, raises Exception otherwise).
*   **Research Role:** Implements the **initialization phase** of the research pipeline. It ensures the computational environment matches the study's parameters: "Applied to AL-Ahliah Vegetable Oil Company... from 2008 to 2017." It enforces the structural prerequisites for all subsequent MCDM calculations.

### **Task 2: Validate the `raw_expert_survey_df` DataFrame schema**

**2. `validate_raw_expert_survey` (Orchestrator)**
*   **Inputs:** `raw_expert_survey_df` (DataFrame).
*   **Process:** Checks for required columns (`expert_id`, `hierarchy_level`, etc.), validates categorical domains (e.g., exactly 5 experts), and enforces the Saaty scale constraint ($x \in \{1, \dots, 9\}$).
*   **Outputs:** `bool` (True if valid).
*   **Research Role:** Implements the **data quality assurance** for the AHP phase. It ensures the input data conforms to the "expert questionnaire" methodology described in Section 5, specifically validating the "scale of the hierarchical analysis process" (Table 4).
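
The checks described for Task 2 might look roughly like the sketch below. It is not the notebook's `validate_raw_expert_survey`; the function name, the returned list of problem strings, and the example values (including the `comparison_type` value) are assumptions made for illustration.

```python
import pandas as pd

REQUIRED_COLUMNS = [
    "expert_id", "hierarchy_level", "criterion_i",
    "criterion_j", "saaty_scale_value", "comparison_type",
]


def check_survey_schema(df: pd.DataFrame, expected_experts: int = 5) -> list:
    """Return a list of human-readable problems; an empty list means the checks pass."""
    missing = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing:
        return [f"missing columns: {missing}"]

    problems = []
    n_experts = df["expert_id"].nunique()
    if n_experts != expected_experts:
        problems.append(f"expected {expected_experts} experts, found {n_experts}")

    # Saaty scale: integer values from 1 to 9 (non-numeric entries become NaN and fail).
    values = pd.to_numeric(df["saaty_scale_value"], errors="coerce")
    on_scale = values.between(1, 9) & (values % 1 == 0)
    if not on_scale.all():
        problems.append(f"{int((~on_scale).sum())} rows off the 1-9 Saaty scale")
    return problems


# Hypothetical survey rows (illustration only).
survey = pd.DataFrame([
    {"expert_id": f"E{k}", "hierarchy_level": "main", "criterion_i": "CSR",
     "criterion_j": "LR", "saaty_scale_value": 3, "comparison_type": "direct"}
    for k in range(1, 6)
])
print(check_survey_schema(survey))  # []
```

Collecting problems into a list rather than raising on the first failure is a design choice of this sketch; it makes it easier to report every issue in one pass.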

### **Task 3: Validate combinatorial completeness**

**3. `validate_combinatorial_completeness` (Orchestrator)**
*   **Inputs:** `raw_expert_survey_df` (DataFrame).
*   **Process:** Retrieves the canonical hierarchy structure. For each expert and hierarchy level, it calculates the required number of pairwise comparisons $N_h = \binom{n_h}{2}$ and verifies that the input data contains exactly this many unique unordered pairs.
*   **Outputs:** `bool` (True if complete).
*   **Research Role:** Enforces the **completeness axiom** of the AHP methodology. It ensures that for every matrix $A^{(e)}_{(h)}$, all off-diagonal entries $A_{ij}$ ($i < j$) are provided, allowing for the construction of fully defined reciprocal matrices as required by Saaty's method.
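
A hedged sketch of the completeness check: for each expert and hierarchy level, count unique unordered pairs and compare against $\binom{n_h}{2}$. The function name, the `level_sizes` mapping (level name to number of criteria), and the example rows are hypothetical.

```python
from math import comb

import pandas as pd


def find_incomplete_groups(survey: pd.DataFrame, level_sizes: dict) -> dict:
    """Map (expert, level) -> (found, expected) for every group that is incomplete."""
    # Ignore self-comparisons; they carry no pairwise information.
    df = survey[survey["criterion_i"] != survey["criterion_j"]]
    # Order-independent key so (A, B) and (B, A) count as the same pair.
    pair_key = [
        " | ".join(sorted((str(a), str(b))))
        for a, b in zip(df["criterion_i"], df["criterion_j"])
    ]
    unique_pairs = (
        df[["expert_id", "hierarchy_level"]].assign(pair=pair_key).drop_duplicates()
    )
    observed = unique_pairs.groupby(["expert_id", "hierarchy_level"]).size().to_dict()

    incomplete = {}
    for expert in survey["expert_id"].unique():
        for level, n_h in level_sizes.items():
            expected = comb(n_h, 2)
            found = observed.get((expert, level), 0)
            if found != expected:
                incomplete[(expert, level)] = (found, expected)
    return incomplete


# Hypothetical data: E2 repeats one pair in reverse order and omits (LR, IR).
rows = [
    ("E1", "main", "CSR", "LR"), ("E1", "main", "CSR", "IR"), ("E1", "main", "LR", "IR"),
    ("E2", "main", "CSR", "LR"), ("E2", "main", "LR", "CSR"), ("E2", "main", "CSR", "IR"),
]
survey = pd.DataFrame(
    rows, columns=["expert_id", "hierarchy_level", "criterion_i", "criterion_j"]
)
incomplete = find_incomplete_groups(survey, level_sizes={"main": 3})
print(incomplete)
```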

### **Task 4: Clean and standardize `raw_expert_survey_df`**

**4. `clean_and_standardize_survey` (Orchestrator)**
*   **Inputs:** `raw_expert_survey_df` (DataFrame).
*   **Process:** Removes degenerate rows (self-comparisons where $i=j$), strips whitespace from string identifiers to ensure canonical matching, and re-verifies combinatorial completeness post-cleaning.
*   **Outputs:** `cleaned_survey_df` (DataFrame).
*   **Research Role:** Implements the **data preprocessing** step necessary to construct the "pairwise comparison matrix" described in Section 5. It ensures that the diagonal elements $A_{ii}=1$ are implicit and not duplicated in the input stream.
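
The cleaning steps for Task 4 can be sketched in a few lines of pandas. The function name and example rows are illustrative assumptions; the re-verification of completeness mentioned above is left out of this sketch.

```python
import pandas as pd


def clean_survey(df: pd.DataFrame) -> pd.DataFrame:
    """Strip identifier whitespace and drop self-comparisons (i == j)."""
    out = df.copy()
    for col in ("expert_id", "hierarchy_level", "criterion_i", "criterion_j"):
        out[col] = out[col].astype(str).str.strip()
    # Compare after stripping so that " LR" vs "LR" is detected as a self-comparison.
    out = out[out["criterion_i"] != out["criterion_j"]]
    return out.reset_index(drop=True)


# Hypothetical rows (illustration only).
raw = pd.DataFrame({
    "expert_id": ["E1 ", "E1", "E1"],
    "hierarchy_level": ["main", " main", "main"],
    "criterion_i": ["CSR", " LR", "IR "],
    "criterion_j": ["LR ", "LR", "CFR"],
    "saaty_scale_value": [3, 1, 5],
})
cleaned = clean_survey(raw)
print(cleaned)
```

Stripping before filtering matters: the second row only becomes recognizable as a self-comparison once the leading space is removed.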

### **Task 5: Validate the `raw_financial_statement_df` DataFrame schema**

**5. `validate_financial_statements` (Orchestrator)**
*   **Inputs:** `raw_financial_statement_df` (DataFrame).
*   **Process:** Validates the presence of the `fiscal_year` index and 25 base financial columns. Enforces temporal continuity and uniqueness for the period 2008–2017.
*   **Outputs:** `validated_financial_df` (DataFrame).
*   **Research Role:** Implements the **data validation** for the "financial statements" analysis (Section 3). It ensures the dataset covers the exact "ten years (2008–2017)" specified in the Abstract and Case Study sections.
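
A minimal sketch of the temporal checks, assuming `fiscal_year` is available as a column (if it is the index, `df.reset_index()` would be needed first). The function name and return structure are hypothetical.

```python
import pandas as pd


def check_fiscal_years(df: pd.DataFrame, start_year: int = 2008, end_year: int = 2017) -> dict:
    """Report duplicated, missing, and out-of-range fiscal years."""
    years = df["fiscal_year"]
    expected = set(range(start_year, end_year + 1))
    present = set(years.tolist())
    return {
        "duplicates": sorted(set(years[years.duplicated()].tolist())),
        "missing": sorted(expected - present),
        "unexpected": sorted(present - expected),
    }


# Hypothetical frame with one gap, one duplicate, and one out-of-range year.
df = pd.DataFrame({"fiscal_year": [2008, 2009, 2009, 2011, 2018]})
report = check_fiscal_years(df, start_year=2008, end_year=2011)
print(report)
```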

### **Task 6: Validate financial identities and enforce derived equalities**

**6. `enforce_financial_identities` (Orchestrator)**
*   **Inputs:** `validated_financial_df` (DataFrame).
*   **Process:** Validates the accounting identity $\text{Total Debt} \approx \text{Short Term} + \text{Long Term}$. Derives `net_working_capital` ($CA - CL$) and `net_profit_before_interest` ($\text{Net Profit} + \text{Interest}$).
*   **Outputs:** `enriched_financial_df` (DataFrame).
*   **Research Role:** Implements the **feature engineering** logic required to compute specific ratios defined in Tables 1 and 2. For example, it constructs the numerator for the ratio "Net Profit Before Interest / net profit after interest" (IR1).
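
The identity check and derived fields might be sketched as follows. Only `current_assets`, `net_working_capital`, and `net_profit_before_interest` are names taken from this document; the other column names (`total_debt`, `short_term_debt`, `long_term_debt`, `current_liabilities`, `net_profit`, `interest_expense`) and the tolerance are assumptions.

```python
import numpy as np
import pandas as pd


def enforce_identities(df: pd.DataFrame, rtol: float = 1e-6) -> pd.DataFrame:
    """Check Total Debt ~= Short Term + Long Term, then add derived fields."""
    out = df.copy()
    ok = np.isclose(
        out["total_debt"].to_numpy(),
        (out["short_term_debt"] + out["long_term_debt"]).to_numpy(),
        rtol=rtol,
    )
    if not ok.all():
        bad_years = out.loc[~ok, "fiscal_year"].tolist()
        raise ValueError(f"debt identity violated for fiscal years {bad_years}")

    out["net_working_capital"] = out["current_assets"] - out["current_liabilities"]
    out["net_profit_before_interest"] = out["net_profit"] + out["interest_expense"]
    return out


# Hypothetical figures (illustration only).
df = pd.DataFrame({
    "fiscal_year": [2008, 2009],
    "total_debt": [300.0, 250.0],
    "short_term_debt": [100.0, 50.0],
    "long_term_debt": [200.0, 200.0],
    "current_assets": [500.0, 450.0],
    "current_liabilities": [200.0, 250.0],
    "net_profit": [80.0, 60.0],
    "interest_expense": [10.0, 12.0],
})
enriched = enforce_identities(df)
print(enriched[["fiscal_year", "net_working_capital", "net_profit_before_interest"]])
```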

### **Task 7: Handle missing values and zero denominators**

**7. `handle_missing_and_zero_values` (Orchestrator)**
*   **Inputs:** `enriched_financial_df` (DataFrame), `study_configuration` (Dict).
*   **Process:** Scans for critical missing data. Evaluates denominator expressions for all 34 ratios and identifies year-ratio pairs where the denominator is effectively zero. Returns a mask to enforce NaNs in these cases.
*   **Outputs:** `diagnostic_report` (Dict), `zero_mask` (DataFrame).
*   **Research Role:** Implements the **robustness checks** for ratio computation. It addresses the "degree of uncertainty" mentioned in the introduction by identifying data points where financial ratios (e.g., "Net cash flows / net profit") are mathematically undefined.
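
The zero-denominator mask can be illustrated as below. The tolerance `EPS`, the ratio labels, and the numbers are all hypothetical; the sketch only shows how a boolean year-by-ratio mask forces undefined ratios to NaN.

```python
import pandas as pd

EPS = 1e-12  # assumed threshold for "effectively zero"

years = pd.Index([2008, 2009], name="fiscal_year")
numerators = pd.DataFrame({"CFR1": [60.0, 10.0], "LR1": [5.0, 150.0]}, index=years)
denominators = pd.DataFrame({"CFR1": [120.0, 0.0], "LR1": [1e-15, 300.0]}, index=years)

# True wherever the denominator is effectively zero.
zero_mask = denominators.abs() < EPS

# Float division by zero yields inf rather than raising; the mask replaces
# those cells (and the near-zero case) with NaN.
ratios = (numerators / denominators).mask(zero_mask)
print(ratios)
```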

### **Task 8: Define hierarchical structure**

**8. `initialize_ahp_hierarchy` (Orchestrator)**
*   **Inputs:** None (uses internal static definition).
*   **Process:** Instantiates the `AHPHierarchy` class, which defines the 4 main criteria and 34 sub-criteria, maps sub-criteria to parents, and establishes the canonical global order.
*   **Outputs:** `hierarchy` (AHPHierarchy object).
*   **Research Role:** Implements the **hierarchical decomposition** of the problem (Figure 1). It structurally defines the "goal representing the problem" and the "main criteria" and "secondary criteria" as described in Section 4.

### **Task 9: Build pairwise comparison matrices**

**9. `build_ahp_matrices` (Orchestrator)**
*   **Inputs:** `cleaned_survey_df` (DataFrame), `hierarchy` (Object).
*   **Process:** Initializes $n \times n$ matrices with 1s on the diagonal. Populates off-diagonal entries $A_{ij}$ from survey data. Enforces reciprocity by setting $A_{ji} = 1/A_{ij}$.
*   **Outputs:** `matrices` (Nested Dict of np.ndarray).
*   **Research Role:** Implements **Step 1(a)** of the AHP methodology (Section 5): "Building a pairwise comparison matrix... $A_{ij} = 1/A_{ji}$".
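
A small sketch of the reciprocal matrix construction for one expert and one level. The function name `build_matrix_sketch` and the dictionary input format are assumptions for illustration; the repository builds its matrices from `cleaned_survey_df`.

```python
import numpy as np

def build_matrix_sketch(criteria, judgments):
    """Fill A_ij from judgments and enforce A_ji = 1 / A_ij."""
    index = {name: k for k, name in enumerate(criteria)}
    A = np.ones((len(criteria), len(criteria)))  # diagonal stays at 1
    for (crit_i, crit_j), value in judgments.items():
        i, j = index[crit_i], index[crit_j]
        A[i, j] = value
        A[j, i] = 1.0 / value
    return A

# n = 3 criteria, so n*(n-1)/2 = 3 judgments (hypothetical Saaty values).
A = build_matrix_sketch(
    ["LR1", "LR2", "LR3"],
    {("LR1", "LR2"): 2.0, ("LR1", "LR3"): 4.0, ("LR2", "LR3"): 2.0},
)
print(A)
```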

### **Task 10: Compute local weights**

**10. `compute_ahp_local_weights` (Orchestrator)**
*   **Inputs:** `matrices` (Dict).
*   **Process:** Computes column sums $S_j = \sum_i A_{ij}$. Normalizes matrices $\tilde{A}_{ij} = A_{ij}/S_j$. Computes row averages $w_i = \frac{1}{n} \sum_j \tilde{A}_{ij}$ to derive local weights.
*   **Outputs:** `local_weights` (Dict), `column_sums` (Dict).
*   **Research Role:** Implements **Step 1(b) and 1(c)** of the AHP methodology: "Derive the normalized comparison matrix" and "Calculate the relative weights for each row".
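
The column-normalization and row-average steps can be sketched with NumPy. `local_weights_sketch` is a hypothetical name, and the example matrix is a perfectly consistent toy case chosen so the weights are easy to verify by hand.

```python
import numpy as np

def local_weights_sketch(A: np.ndarray):
    """Return (row-average weights, column sums) for one comparison matrix."""
    col_sums = A.sum(axis=0)           # S_j = sum_i A_ij
    normalized = A / col_sums          # broadcasting divides column j by S_j
    weights = normalized.mean(axis=1)  # w_i = (1/n) * sum_j normalized_ij
    return weights, col_sums

A = np.array([
    [1.0, 2.0, 4.0],
    [0.5, 1.0, 2.0],
    [0.25, 0.5, 1.0],
])
w, S = local_weights_sketch(A)
print(w, S)
```

For this matrix every normalized column is identical, so the weights are exactly 4/7, 2/7, and 1/7.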

### **Task 11: Compute consistency metrics**

**11. `compute_ahp_consistency_metrics` (Orchestrator)**
*   **Inputs:** `local_weights`, `column_sums`, `hierarchy`, `ahp_parameters`.
*   **Process:** Computes $\lambda_{max} = \sum w_i S_i$. Computes Consistency Index $CI = (\lambda_{max} - n)/(n-1)$. Retrieves Random Index $RI(n)$.
*   **Outputs:** `lambda_max` (Dict), `ci` (Dict), `ri` (Dict).
*   **Research Role:** Implements **Step 1(d)** of the AHP methodology: "Calculate the consistency ratio or consistency index... $\lambda_{max} = \sum EV_i * S_i$".

### **Task 12: Filter consistent matrices**

**12. `filter_consistent_matrices` (Orchestrator)**
*   **Inputs:** `ci`, `ri`, `ahp_parameters`.
*   **Process:** Computes $CR = CI/RI$. Retains only the experts whose matrices satisfy $CR < 0.10$ and discards the rest. Verifies that at least one expert remains per level.
*   **Outputs:** `cr_values` (Dict), `accepted_experts` (Dict).
*   **Research Role:** Implements the **consistency validation** rule: "The result is considered acceptable if the consistency ratio is less than 0.10."
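
The consistency metrics of Tasks 11 and 12 can be sketched together. The function name and the two toy matrices are hypothetical. The Random Index value 0.58 for $n = 3$ is the commonly tabulated Saaty value and is an assumption here; the pipeline reads its values from `ahp_parameters`.

```python
import numpy as np

RI_N3 = 0.58  # commonly cited Random Index for n = 3 (assumed)

def consistency_ratio_sketch(A: np.ndarray, ri: float) -> float:
    """CR = CI / RI with lambda_max = sum_i w_i * S_i."""
    n = A.shape[0]
    S = A.sum(axis=0)
    w = (A / S).mean(axis=1)
    lambda_max = float(w @ S)
    ci = (lambda_max - n) / (n - 1)
    return ci / ri

matrices = {
    # Perfectly consistent judgments: lambda_max equals n, so CR is 0.
    "E1": np.array([[1, 2, 4], [0.5, 1, 2], [0.25, 0.5, 1]]),
    # Cyclic, contradictory judgments: CR is far above the 0.10 threshold.
    "E2": np.array([[1, 9, 1 / 9], [1 / 9, 1, 9], [9, 1 / 9, 1]]),
}
cr_values = {e: consistency_ratio_sketch(A, RI_N3) for e, A in matrices.items()}
accepted_experts = [e for e, cr in cr_values.items() if cr < 0.10]
print(cr_values, accepted_experts)
```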

### **Task 13: Aggregate main criteria weights**

**13. `aggregate_main_criteria_weights` (Orchestrator)**
*   **Inputs:** `local_weights`, `accepted_experts`.
*   **Process:** Selects weight vectors for "Main_Criteria" from accepted experts. Computes the arithmetic mean vector and normalizes it to sum to 1.
*   **Outputs:** `main_weights` (np.ndarray).
*   **Research Role:** Implements the **group decision making** aggregation to derive the consensus weights for CSR, LR, IR, and CFR (Table 9).
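
A minimal sketch of the aggregation, assuming each expert's local weights are stored as a NumPy vector in the order CSR, LR, IR, CFR. The weight values and the accepted list are hypothetical.

```python
import numpy as np

local_main_weights = {
    "E1": np.array([0.1, 0.2, 0.3, 0.4]),
    "E2": np.array([0.3, 0.2, 0.1, 0.4]),
    "E3": np.array([0.25, 0.25, 0.25, 0.25]),  # rejected by the CR filter
}
accepted = ["E1", "E2"]

# Arithmetic mean across accepted experts, then renormalize to sum to 1.
stacked = np.vstack([local_main_weights[e] for e in accepted])
main_weights = stacked.mean(axis=0)
main_weights = main_weights / main_weights.sum()
print(main_weights)
```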

### **Task 14: Aggregate sub-criteria weights**

**14. `aggregate_all_sub_criteria_weights` (Orchestrator)**
*   **Inputs:** `local_weights`, `accepted_experts`.
*   **Process:** Iterates through sub-levels (Sub_CSR, etc.), aggregates accepted expert vectors via arithmetic mean, and maps them to their parent codes.
*   **Outputs:** `sub_weights` (Dict of np.ndarray).
*   **Research Role:** Implements the **group aggregation** for the secondary criteria levels, producing the "Average weights of secondary criteria" (Table 9).

### **Task 15: Compute global weights**

**15. `compute_global_weights` (Orchestrator)**
*   **Inputs:** `main_weights`, `sub_weights`, `hierarchy`.
*   **Process:** Computes global weights via hierarchical composition: $w_s^{global} = w_{parent(s)}^{main} \times w_s^{local}$. Assembles them into a canonical vector and normalizes.
*   **Outputs:** `global_weights` (np.ndarray).
*   **Research Role:** Implements the **hierarchical synthesis** to produce the final "Average weights of secondary standards relative to the target" (Figure 3).
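
The hierarchical composition can be shown on a reduced toy hierarchy with two main criteria and two sub-criteria each. All weights and the dictionary layout are hypothetical; the real hierarchy has 4 main criteria and 34 sub-criteria.

```python
import numpy as np

main_weights = {"LR": 0.4, "IR": 0.6}
sub_weights = {
    "LR": {"LR1": 0.5, "LR2": 0.5},
    "IR": {"IR1": 0.25, "IR2": 0.75},
}
canonical_order = ["LR1", "LR2", "IR1", "IR2"]

# Map each sub-criterion to its parent, then multiply parent and local weight.
parent_of = {sub: parent for parent, subs in sub_weights.items() for sub in subs}
global_weights = np.array([
    main_weights[parent_of[s]] * sub_weights[parent_of[s]][s]
    for s in canonical_order
])
global_weights = global_weights / global_weights.sum()  # guard against rounding drift
print(dict(zip(canonical_order, global_weights)))
```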

### **Task 16: Prepare ratio logic**

**16. `prepare_ratio_computation_logic` (Orchestrator)**
*   **Inputs:** `hierarchy`, `study_configuration`.
*   **Process:** Retrieves the canonical ratio order and extracts the numerator/denominator expressions for all 34 ratios from the configuration.
*   **Outputs:** `canonical_order` (List), `ratio_specs` (Dict).
*   **Research Role:** Prepares the **computational definitions** for the financial ratios listed in Tables 1, 2, and 3.

### **Task 17: Compute decision matrix**

**17. `compute_decision_matrix` (Orchestrator)**
*   **Inputs:** `enriched_financial_df`, `canonical_order`, `ratio_specs`, `zero_mask`.
*   **Process:** Evaluates ratio expressions against financial data for each year. Applies the zero-mask to force NaNs where denominators are zero.
*   **Outputs:** `X` (DataFrame), `report` (Dict).
*   **Research Role:** Computes the **raw decision matrix** $X$ where rows are years (alternatives) and columns are ratios (criteria), representing the "actual historical values derived from the financial statements".

### **Task 18: Validate decision matrix**

**18. `validate_and_freeze_decision_matrix` (Orchestrator)**
*   **Inputs:** `X`.
*   **Process:** Checks for zero-variance columns and statistical outliers. Freezes the matrix for SAW processing.
*   **Outputs:** `X_final` (DataFrame), `valid_mask` (DataFrame), `diagnostics` (Dict).
*   **Research Role:** Ensures the **data integrity** of the decision matrix before normalization, a prerequisite for the SAW method.

### **Task 19: Configure SAW directionality**

**19. `configure_saw_directionality` (Orchestrator)**
*   **Inputs:** `saw_parameters`, `canonical_order`.
*   **Process:** Validates that every ratio is assigned exactly one direction (Benefit or Cost) and builds a lookup map.
*   **Outputs:** `directionality_map` (Dict).
*   **Research Role:** Defines the **risk semantics** for each ratio (e.g., "Total Debt / Equity" is a cost criterion/Min, "Cash Readiness" is a benefit criterion/Max) as required for Step 2 of the SAW method.

### **Task 20: Normalize decision matrix**

**20. `normalize_decision_matrix` (Orchestrator)**
*   **Inputs:** `X_final`, `directionality_map`.
*   **Process:** Computes column extrema ($x_j^+, x_j^-$). Applies Min-Max normalization: $r_{tj} = \frac{x_j^+ - x_{tj}}{x_j^+ - x_j^-}$ (Benefit) or $r_{tj} = \frac{x_{tj} - x_j^-}{x_j^+ - x_j^-}$ (Cost).
*   **Outputs:** `R` (DataFrame).
*   **Research Role:** Implements **Step 2 of the SAW method**: "Converting the Pairwise Comparison Matrix into a Normalized Matrix (Max/Min)". Note: The paper's notation for cost criteria was inverted; this implementation uses the standard correct formula for risk (higher raw cost = higher risk score).
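
The risk-oriented Min-Max normalization can be sketched as follows. The helper name, the ratio labels, and the data are hypothetical; the direction labels follow the Benefit/Cost convention of Task 19. The sketch assumes no column has zero range, which the zero-variance check in Task 18 is meant to flag.

```python
import pandas as pd

def normalize_sketch(X: pd.DataFrame, direction: dict) -> pd.DataFrame:
    """Map each column to [0, 1] so that a higher score means higher risk."""
    columns = {}
    for col in X.columns:
        lo, hi = X[col].min(), X[col].max()
        if direction[col] == "Benefit":
            columns[col] = (hi - X[col]) / (hi - lo)  # high raw value, low risk
        else:
            columns[col] = (X[col] - lo) / (hi - lo)  # high raw value, high risk
    return pd.DataFrame(columns)

X = pd.DataFrame(
    {"liquidity": [1.0, 2.0, 3.0], "leverage": [10.0, 30.0, 20.0]},
    index=[2008, 2009, 2010],
)
R = normalize_sketch(X, {"liquidity": "Benefit", "leverage": "Cost"})
print(R)
```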

### **Task 21: Compute weighted risk matrix**

**21. `compute_weighted_risk_matrix` (Orchestrator)**
*   **Inputs:** `R`, `global_weights`, `canonical_order`.
*   **Process:** Aligns weights to columns and computes element-wise product $v_{tj} = w_j \times r_{tj}$.
*   **Outputs:** `V` (DataFrame), `report` (Dict).
*   **Research Role:** Implements **Step 3 of the SAW method**: "Building the Weighted Normalized Matrix V... multiplying the normalized matrix... by the vector of relative weights".

### **Task 22: Compute composite risk scores**

**22. `compute_composite_risk_scores` (Orchestrator)**
*   **Inputs:** `V`.
*   **Process:** Sums the weighted risk scores across all criteria for each year: $V_t = \sum_j v_{tj}$.
*   **Outputs:** `V_t` (Series), `report` (Dict).
*   **Research Role:** Implements **Step 4 of the SAW method**: "Calculate the performance vector of alternatives... $V = \sum w_j r_{ij}$".

### **Task 23: Compute relative risk indices**

**23. `compute_relative_risk_indices` (Orchestrator)**
*   **Inputs:** `V_t`.
*   **Process:** Computes the total sum $S = \sum V_t$ and divides each year's score by the sum: $A_t = V_t / S$.
*   **Outputs:** `A_t` (Series), `report` (Dict).
*   **Research Role:** Implements **Step 5 of the SAW method**: "Rank the alternatives... $A_{ij} = V_{ij} / \sum V_{ij}$". This produces the relative risk share for each year.
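
Tasks 21 to 23 reduce to three pandas operations. The sketch below uses a hypothetical two-ratio, three-year normalized matrix and made-up weights to show how $v_{tj}$, $V_t$, and $A_t$ follow from one another.

```python
import numpy as np
import pandas as pd

R = pd.DataFrame(
    {"r1": [1.0, 0.5, 0.0], "r2": [0.0, 1.0, 0.5]},
    index=[2008, 2009, 2010],
)
weights = pd.Series({"r1": 0.6, "r2": 0.4})

V = R.mul(weights, axis="columns")  # v_tj = w_j * r_tj, aligned on column labels
V_t = V.sum(axis=1)                 # composite risk score per year
A_t = V_t / V_t.sum()               # relative risk share per year
print(A_t)
```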

### **Task 24: Rank and compare years**

**24. `rank_and_compare_years` (Orchestrator)**
*   **Inputs:** `A_t`.
*   **Process:** Ranks years by $A_t$ descending (Rank 1 = Highest Risk). Identifies max/min risk years. Compares rankings with the study's reported results (Table 12).
*   **Outputs:** `ranking_df` (DataFrame), `extremes` (Dict), `comparison_df` (DataFrame).
*   **Research Role:** Implements **Step 6 of the SAW method**: "Determine the best alternative" (or in this context, the riskiest year). It validates the finding that "The year 2016 was the year in which the company was exposed to the highest level of financial risk".
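
Ranking by descending $A_t$ can be sketched with `Series.rank`. The values are hypothetical, and the tie-breaking rule `method="min"` is an assumption, since the text above does not specify how ties are handled.

```python
import pandas as pd

A_t = pd.Series({2008: 0.40, 2009: 0.47, 2010: 0.13}, name="A_t")

ranking_df = A_t.to_frame()
# Rank 1 is the highest relative risk share.
ranking_df["Rank"] = ranking_df["A_t"].rank(ascending=False, method="min").astype(int)

extremes = {
    "max_risk_year": ranking_df["A_t"].idxmax(),
    "min_risk_year": ranking_df["A_t"].idxmin(),
}
print(ranking_df.sort_values("Rank"))
print(extremes)
```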

### **Task 25: Main Pipeline Orchestrator**

**25. `run_hafri_pipeline` (Orchestrator)**
*   **Inputs:** Raw DataFrames, Configuration.
*   **Process:** Sequentially executes Validation, AHP, Ratio, and SAW phases using the sub-orchestrators.
*   **Outputs:** `baseline_results` (Dict).
*   **Research Role:** Represents the **execution of the primary research methodology** to derive the HAFRI index.

### **Task 26: Robustness Analysis Orchestrator**

**26. `conduct_robustness_analysis` (Orchestrator)**
*   **Inputs:** Configuration, Raw DataFrames.
*   **Process:** Generates scenario configurations (Strict/Relaxed Consistency), executes the pipeline for each, and aggregates ranking statistics.
*   **Outputs:** `scenario_results` (Dict), `robustness_summary` (DataFrame).
*   **Research Role:** Implements a **sensitivity analysis** (implied by the need for rigorous validation) to test if the identification of 2016 as the riskiest year holds under varying assumptions.

### **Task 27: Validation Orchestrator**

**27. `validate_results_against_study` (Orchestrator)**
*   **Inputs:** `global_weights`, `hierarchy`, `ranking_df`, `comparison_df`.
*   **Process:** Compares computed weights and scores against hardcoded values from the paper. Generates a discrepancy report.
*   **Outputs:** `validation_results` (Dict).
*   **Research Role:** Performs the **verification** step, ensuring the implementation reproduces the published results (e.g., CFR weight $\approx$ 45.9%).

### **Task 28: Packaging Orchestrator**

**28. `package_project_outputs` (Orchestrator)**
*   **Inputs:** `results`, `study_configuration`.
*   **Process:** Generates a Markdown technical report and serializes all data artifacts (CSVs, JSONs) into a file mapping.
*   **Outputs:** `final_package` (Dict).
*   **Research Role:** Produces the **final deliverables** and documentation required for reproducibility and external audit.

<br><br>
### **Usage Example**

The following code snippet uses synthetic data to demonstrate, step by step, how to run the Heuristic-Augmented Financial Risk Index (HAFRI) pipeline:

```python
import pandas as pd
import numpy as np
import yaml
import itertools
from typing import List, Dict

# ==============================================================================
# HAFRI Pipeline Usage Example
# ==============================================================================
# This script demonstrates the end-to-end execution of the Heuristic-Augmented
# Financial Risk Index (HAFRI) pipeline using synthetic data that mirrors the
# structure of the Al-Ahlia Vegetable Oil Company case study.

# ------------------------------------------------------------------------------
# Step 1: Synthetic Data Generation - Expert Survey
# ------------------------------------------------------------------------------
# We generate a synthetic dataset of expert pairwise comparisons.
# The data must satisfy the combinatorial requirement: n*(n-1)/2 pairs per level.

def generate_synthetic_survey() -> pd.DataFrame:
    """Generates a structurally valid synthetic expert survey DataFrame."""
    
    experts = [f"E{i}" for i in range(1, 6)] # 5 Experts
    
    # Define hierarchy levels and their criteria (matching the config)
    hierarchy_structure = {
        "Main_Criteria": ["CSR", "LR", "IR", "CFR"],
        "Sub_CSR": [f"CSR{i}" for i in range(1, 12)],
        "Sub_LR": [f"LR{i}" for i in range(1, 4)],
        "Sub_IR": [f"IR{i}" for i in range(1, 7)],
        "Sub_CFR": [f"CFR{i}" for i in range(1, 15)]
    }
    
    survey_rows = []

    # Seeded generator so the synthetic judgments are reproducible across runs.
    rng = np.random.default_rng(42)

    for expert in experts:
        for level, criteria in hierarchy_structure.items():
            # Generate all unique unordered pairs (i, j) where i != j
            # We use combinations to get n*(n-1)/2 pairs
            pairs = list(itertools.combinations(criteria, 2))

            for crit_i, crit_j in pairs:
                # Assign a random Saaty scale value {1..9}
                # In a real scenario, these are elicited judgments.
                val = rng.integers(1, 10)  # upper bound is exclusive
                
                survey_rows.append({
                    "expert_id": expert,
                    "hierarchy_level": level,
                    "criterion_i": crit_i,
                    "criterion_j": crit_j,
                    "saaty_scale_value": float(val),
                    "comparison_type": "Direct"
                })
                
    return pd.DataFrame(survey_rows)

# Generate the survey dataframe
print("Generating synthetic expert survey data...")
raw_expert_survey_df = generate_synthetic_survey()
print(f"Survey Data Generated: {raw_expert_survey_df.shape[0]} rows.")
print(raw_expert_survey_df.head())
print("-" * 80)

# ------------------------------------------------------------------------------
# Step 2: Synthetic Data Generation - Financial Statements
# ------------------------------------------------------------------------------
# We generate synthetic financial statements for 2008-2017.
# We ensure basic accounting identities hold to pass validation.

def generate_synthetic_financials() -> pd.DataFrame:
    """Generates structurally valid synthetic financial statements."""
    years = list(range(2008, 2018))
    n_years = len(years)
    
    data = {"fiscal_year": years}
    
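    # Helper to generate uniform random floats in [low, high), one per year.
    # A seeded generator keeps the synthetic statements reproducible.
    rng = np.random.default_rng(7)

    def gen_series(low, high):
        return rng.uniform(low, high, n_years)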

    # --- Balance Sheet ---
    # Construct components first to ensure identities
    data["current_assets"] = gen_series(500, 1000)
    data["net_fixed_assets"] = gen_series(1000, 2000)
    data["total_assets"] = data["current_assets"] + data["net_fixed_assets"] # Identity
    
    data["current_liabilities"] = gen_series(200, 500)
    data["short_term_debt"] = data["current_liabilities"] * 0.8 # Assumption
    data["long_term_debt"] = gen_series(100, 300)
    data["total_debt"] = data["short_term_debt"] + data["long_term_debt"] # Identity
    
    # Equity = Assets - Liabilities (simplified)
    # We treat total_debt + other_liabilities as total liabilities
    # For simplicity in this mock, we define equity explicitly
    data["shareholders_equity"] = data["total_assets"] - (data["current_liabilities"] + data["long_term_debt"])
    
    data["inventory"] = data["current_assets"] * 0.4
    data["cash_and_equivalents"] = data["current_assets"] * 0.1
    data["retained_earnings"] = data["shareholders_equity"] * 0.2
    
    # --- Income Statement ---
    data["sales_revenue"] = gen_series(800, 1500)
    data["gross_profit"] = data["sales_revenue"] * 0.4
    data["ebit"] = data["gross_profit"] * 0.5
    data["interest_expense"] = data["long_term_debt"] * 0.05
    data["net_profit"] = data["ebit"] - data["interest_expense"]
    
    # --- Cash Flow Statement ---
    data["net_operating_cash_flow"] = gen_series(100, 300)
    data["capital_expenditures"] = gen_series(50, 100)
    data["net_investing_cash_flow"] = -data["capital_expenditures"]
    data["net_financing_cash_flow"] = gen_series(-50, 50)
    
    # Derived CF fields
    data["total_cash_flow_inv_fin"] = data["net_investing_cash_flow"] + data["net_financing_cash_flow"]
    data["cash_distributions"] = gen_series(10, 50)
    data["operating_cash_inflows"] = data["sales_revenue"] # Proxy
    data["initial_cash_requirements"] = gen_series(50, 100)

    return pd.DataFrame(data)

# Generate the financial dataframe
print("Generating synthetic financial statement data...")
raw_financial_statement_df = generate_synthetic_financials()
print(f"Financial Data Generated: {raw_financial_statement_df.shape} (Rows, Cols).")
print(raw_financial_statement_df[["fiscal_year", "total_assets", "net_profit"]].head())
print("-" * 80)

# ------------------------------------------------------------------------------
# Step 3: Load Configuration
# ------------------------------------------------------------------------------
# We load the study parameters from the YAML file.

print("Loading study configuration...")
try:
    with open("study_configuration.yaml", "r") as f:
        study_configuration = yaml.safe_load(f)
    print("Configuration loaded successfully.")
except FileNotFoundError:
    print("Error: 'study_configuration.yaml' not found. Please ensure Task 28 output is saved.")
    # This example assumes the configuration file exists. Without it the
    # pipeline cannot run, so the error is re-raised.
    raise

print("-" * 80)

# ------------------------------------------------------------------------------
# Step 4: Execute the HAFRI Master Pipeline
# ------------------------------------------------------------------------------
# We invoke the top-level orchestrator to run Validation, AHP, SAW, and Packaging.

print("Executing HAFRI Master Pipeline...")

# Note: We assume 'execute_hafri_master_pipeline' is available in the environment.
# In a real notebook, all previous cells defining the functions must be run first.

try:
    master_artifacts = execute_hafri_master_pipeline(
        raw_expert_survey_df=raw_expert_survey_df,
        raw_financial_statement_df=raw_financial_statement_df,
        study_configuration=study_configuration
    )
    print("Pipeline execution completed.")
except Exception as e:
    print(f"Pipeline failed: {e}")
    # In a real scenario, we would inspect the logs here.
    raise

print("-" * 80)

# ------------------------------------------------------------------------------
# Step 5: Inspect Outputs
# ------------------------------------------------------------------------------
# We explore the returned artifacts to understand the results.

if 'master_artifacts' in locals():
    baseline = master_artifacts["baseline_results"]
    
    # 1. View Global Weights
    print("\n>>> Top 5 Global Risk Drivers (Weights):")
    weights = pd.Series(baseline["global_weights"], index=baseline["decision_matrix"].columns)
    print(weights.sort_values(ascending=False).head(5))
    
    # 2. View Final Ranking
    print("\n>>> Financial Year Risk Ranking:")
    ranking = baseline["ranking"][["A_t", "Rank"]]
    print(ranking.sort_values("Rank"))
    
    # 3. View Robustness Summary
    print("\n>>> Robustness Analysis (Rank Stability):")
    robustness = master_artifacts["robustness_results"]["stability_summary"]
    print(robustness[["Mean_Rank", "Std_Rank", "Min_Rank", "Max_Rank"]])
    
    # 4. View Validation Report
    print("\n>>> Validation against Original Study:")
    validation = master_artifacts["validation_results"]["summary_report"]
    print(f"Rank Consistency: {validation['rank_consistency']['status']}")
    print(f"Weight Consistency: {validation['weight_consistency']['status']}")
```

<br>

In [None]:
# Task 1 – Validate the `study_configuration` dictionary for structural completeness

# ==============================================================================
# Task 1: Validate and parse the study configuration dictionary
# ==============================================================================

# Typing names used by the annotations in this cell
from typing import Any, Dict, Set

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 1: Verify that `study_configuration` contains all required top-level keys.
# -------------------------------------------------------------------------------------------------------------------------------

def validate_top_level_keys(study_configuration: Dict[str, Any]) -> None:
    """
    Validates the presence of required top-level keys in the study configuration dictionary.

    This function ensures that the configuration object contains all the necessary sections
    required for the HAFRI pipeline, specifically metadata, schemas, AHP parameters,
    feature engineering logic, and SAW parameters.

    Args:
        study_configuration (Dict[str, Any]): The main configuration dictionary.

    Raises:
        TypeError: If `study_configuration` is not a dictionary.
        ValueError: If any required top-level key is missing.
    """
    # Validate that the input is a dictionary
    if not isinstance(study_configuration, dict):
        raise TypeError(f"study_configuration must be a dict, got {type(study_configuration)}")

    # Define the set of required top-level keys based on the study specification
    required_keys: Set[str] = {
        "metadata",
        "input_data_schemas",
        "ahp_parameters",
        "feature_engineering_logic",
        "saw_parameters"
    }

    # Extract the actual keys present in the configuration
    actual_keys: Set[str] = set(study_configuration.keys())

    # Calculate missing keys by set difference
    missing_keys: Set[str] = required_keys - actual_keys

    # If there are missing keys, raise an error with a descriptive message
    if missing_keys:
        raise ValueError(f"Missing required top-level keys in study_configuration: {missing_keys}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 2: Validate numerical parameter ranges and types.
# -------------------------------------------------------------------------------------------------------------------------------

def validate_metadata_scope(study_configuration: Dict[str, Any]) -> None:
    """
    Validates the metadata section of the configuration, specifically temporal scope and expert panel size.

    This function enforces the specific constraints of the study:
    - Time horizon: 2008 to 2017 (10 periods).
    - Expert panel: 5 experts.
    It also verifies the arithmetic consistency of the time horizon.

    Args:
        study_configuration (Dict[str, Any]): The main configuration dictionary.

    Raises:
        KeyError: If required nested keys are missing.
        ValueError: If values are invalid, inconsistent, or do not match the study's requirements.
        TypeError: If values cannot be cast to integers.
    """
    try:
        # Access the metadata section
        metadata = study_configuration["metadata"]

        # Access the time_horizon subsection
        time_horizon = metadata["time_horizon"]

        # Extract and cast temporal parameters to integers
        start_year: int = int(time_horizon["start_year"])
        end_year: int = int(time_horizon["end_year"])
        total_periods: int = int(time_horizon["total_periods"])

        # Access the expert_panel subsection
        expert_panel = metadata["expert_panel"]

        # Extract and cast expert count
        expert_count: int = int(expert_panel["count"])

    except KeyError as e:
        # Raise error if a specific key path is missing (original cause is chained)
        raise KeyError(f"Missing required metadata key: {e}") from e
    except (ValueError, TypeError) as e:
        # Raise error if values are not valid integers (original cause is chained)
        raise TypeError(f"Metadata values must be convertible to integers: {e}") from e

    # Verify the arithmetic identity: total_periods = end - start + 1
    # Equation: T = Y_end - Y_start + 1
    expected_periods: int = end_year - start_year + 1
    if total_periods != expected_periods:
        raise ValueError(
            f"Time horizon inconsistency: calculated periods ({expected_periods}) "
            f"!= provided total_periods ({total_periods})"
        )

    # Enforce specific study constraints for replication
    # Constraint: Start Year = 2008
    if start_year != 2008:
        raise ValueError(f"Study replication requires start_year=2008, got {start_year}")

    # Constraint: End Year = 2017
    if end_year != 2017:
        raise ValueError(f"Study replication requires end_year=2017, got {end_year}")

    # Constraint: Expert Count = 5
    if expert_count != 5:
        raise ValueError(f"Study replication requires expert_count=5, got {expert_count}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 3: Validate the input data schema definitions.
# -------------------------------------------------------------------------------------------------------------------------------

def validate_schema_definitions(study_configuration: Dict[str, Any]) -> None:
    """
    Validates the input data schemas for expert surveys and financial statements.

    Ensures that the configuration defines the correct column structures required
    for downstream processing (AHP matrix construction and Ratio computation).

    Args:
        study_configuration (Dict[str, Any]): The main configuration dictionary.

    Raises:
        ValueError: If a schema section or any of its required columns is missing,
            or if the financial statement index is not defined as 'fiscal_year'.
    """
    # Access the input_data_schemas section
    schemas = study_configuration.get("input_data_schemas", {})

    # --- Validate Expert Survey Schema ---
    survey_schema = schemas.get("expert_survey_schema", {})
    survey_columns_dict = survey_schema.get("columns", {})

    # Define required columns for the expert survey dataframe
    required_survey_cols: Set[str] = {
        "expert_id",
        "hierarchy_level",
        "criterion_i",
        "criterion_j",
        "saaty_scale_value",
        "comparison_type"
    }

    # Check for missing survey columns
    actual_survey_cols = set(survey_columns_dict.keys())
    missing_survey_cols = required_survey_cols - actual_survey_cols

    if missing_survey_cols:
        raise ValueError(f"expert_survey_schema missing required columns: {missing_survey_cols}")

    # --- Validate Financial Statement Schema ---
    fin_schema = schemas.get("financial_statement_schema", {})
    fin_columns_dict = fin_schema.get("columns", {})

    # Define required columns for the financial statement dataframe
    # The set below lists 26 fields; the `fiscal_year` index is validated separately
    required_fin_cols: Set[str] = {
        # Balance Sheet
        "total_assets", "current_assets", "inventory", "cash_and_equivalents",
        "net_fixed_assets", "total_debt", "long_term_debt", "short_term_debt",
        "current_liabilities", "shareholders_equity", "retained_earnings",
        "net_working_capital",

        # Income Statement
        "sales_revenue", "gross_profit", "ebit", "interest_expense",
        "net_profit", "net_profit_before_interest",

        # Cash Flow Statement
        "net_operating_cash_flow", "capital_expenditures", "net_investing_cash_flow",
        "net_financing_cash_flow", "total_cash_flow_inv_fin", "cash_distributions",
        "operating_cash_inflows", "initial_cash_requirements"
    }

    # Check for missing financial columns
    actual_fin_cols = set(fin_columns_dict.keys())
    missing_fin_cols = required_fin_cols - actual_fin_cols

    # Note: net_working_capital and net_profit_before_interest may be absent from
    # the *raw* dataframe because they can be derived, but the *schema* must still
    # declare them, so their presence in the schema keys is enforced here.
    if missing_fin_cols:
        raise ValueError(f"financial_statement_schema missing required columns: {missing_fin_cols}")

    # Validate index definition
    # Loose substring check so the rule is robust to changes in the index description
    if "fiscal_year" not in str(fin_schema.get("index", "")):
        raise ValueError("financial_statement_schema index must be defined as 'fiscal_year'")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_study_configuration(study_configuration: Dict[str, Any]) -> bool:
    """
    Orchestrator function for Task 1: Validates the entire study configuration structure.

    Sequentially calls sub-validators to ensure structural completeness, metadata accuracy,
    and schema validity. This function acts as a gatekeeper before any data processing begins.

    Args:
        study_configuration (Dict[str, Any]): The main configuration dictionary.

    Returns:
        bool: True if validation passes.

    Raises:
        TypeError, ValueError, KeyError: If any validation step fails.
    """
    # Step 1: Check top-level structure
    validate_top_level_keys(study_configuration)

    # Step 2: Check metadata and scope constraints
    validate_metadata_scope(study_configuration)

    # Step 3: Check schema definitions
    validate_schema_definitions(study_configuration)

    return True


In [None]:
# Task 2 – Validate the raw_expert_survey_df DataFrame schema

# ==============================================================================
# Task 2: Validate the `raw_expert_survey_df` DataFrame schema
# ==============================================================================

# Names used by the annotations in this cell
from typing import Set

import pandas as pd

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 1: Check DataFrame type and column presence
# -------------------------------------------------------------------------------------------------------------------------------

def validate_survey_columns(raw_expert_survey_df: pd.DataFrame) -> None:
    """
    Validates that the input is a pandas DataFrame and contains all required columns
    for the expert survey data.

    This function enforces the schema defined in the study configuration, ensuring
    that the DataFrame has the necessary structure for AHP processing.

    Args:
        raw_expert_survey_df (pd.DataFrame): The raw expert survey data.

    Raises:
        TypeError: If the input is not a pandas DataFrame.
        ValueError: If any required column is missing.
    """
    # Validate input type
    # Check if the input object is an instance of pandas DataFrame
    if not isinstance(raw_expert_survey_df, pd.DataFrame):
        raise TypeError(f"Input must be a pandas DataFrame, got {type(raw_expert_survey_df)}")

    # Define the set of required columns based on the schema
    # These columns are essential for identifying experts, hierarchy levels, criteria, and judgments
    required_columns: Set[str] = {
        "expert_id",
        "hierarchy_level",
        "criterion_i",
        "criterion_j",
        "saaty_scale_value",
        "comparison_type"
    }

    # Extract the actual columns from the DataFrame
    actual_columns: Set[str] = set(raw_expert_survey_df.columns)

    # Calculate missing columns
    # Set difference operation: required - actual
    missing_columns: Set[str] = required_columns - actual_columns

    # If missing columns exist, raise an error
    if missing_columns:
        raise ValueError(f"raw_expert_survey_df is missing required columns: {missing_columns}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 2: Validate domain of categorical columns
# -------------------------------------------------------------------------------------------------------------------------------

def validate_survey_categorical_domains(raw_expert_survey_df: pd.DataFrame) -> None:
    """
    Validates the domain constraints for categorical columns in the expert survey DataFrame.

    Checks:
    1. expert_id: Exactly 5 unique experts.
    2. hierarchy_level: Values must belong to the defined set of AHP levels.
    3. comparison_type: All values must be 'Direct'.

    Args:
        raw_expert_survey_df (pd.DataFrame): The raw expert survey data.

    Raises:
        ValueError: If any domain constraint is violated.
    """
    # --- Validate expert_id ---
    # Extract unique expert IDs
    unique_experts = raw_expert_survey_df["expert_id"].unique()
    num_experts = len(unique_experts)

    # Constraint: There must be exactly 5 unique expert_id values
    if num_experts != 5:
        raise ValueError(f"Expected exactly 5 unique experts, found {num_experts}: {unique_experts}")

    # --- Validate hierarchy_level ---
    # Define the allowed hierarchy levels as per the study design
    allowed_levels: Set[str] = {
        "Main_Criteria",
        "Sub_CSR",
        "Sub_LR",
        "Sub_IR",
        "Sub_CFR"
    }

    # Check for invalid hierarchy levels
    # We use .isin() to create a boolean mask, then invert it with ~ to find invalid rows
    invalid_levels_mask = ~raw_expert_survey_df["hierarchy_level"].isin(allowed_levels)

    if invalid_levels_mask.any():
        # Extract the specific invalid values for the error message
        invalid_values = raw_expert_survey_df.loc[invalid_levels_mask, "hierarchy_level"].unique()
        raise ValueError(f"Invalid hierarchy_level values found: {invalid_values}. Allowed: {allowed_levels}")

    # --- Validate comparison_type ---
    # Constraint: All entries must be 'Direct'
    # We check if any value is NOT 'Direct'
    invalid_type_mask = raw_expert_survey_df["comparison_type"] != "Direct"

    if invalid_type_mask.any():
        invalid_types = raw_expert_survey_df.loc[invalid_type_mask, "comparison_type"].unique()
        raise ValueError(f"Invalid comparison_type values found: {invalid_types}. All must be 'Direct'.")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 3: Validate numeric range of Saaty scale values
# -------------------------------------------------------------------------------------------------------------------------------

def validate_saaty_scale_values(raw_expert_survey_df: pd.DataFrame) -> None:
    """
    Validates that the 'saaty_scale_value' column contains valid numeric values
    conforming to Saaty's 1-9 scale.

    Constraints:
    - Values must be numeric.
    - Values must be integers in the set {1, 2, 3, 4, 5, 6, 7, 8, 9}.
    - Reciprocals (values < 1) are not allowed in the raw log.

    Args:
        raw_expert_survey_df (pd.DataFrame): The raw expert survey data.

    Raises:
        ValueError: If non-numeric values or values outside the 1-9 integer set are found.
    """
    # Ensure the column is numeric, coercing errors to NaN to identify non-numbers
    # This does not modify the original DataFrame in place, but returns a Series
    numeric_values = pd.to_numeric(raw_expert_survey_df["saaty_scale_value"], errors='coerce')

    # Check for non-numeric values (NaNs after coercion)
    if numeric_values.isna().any():
        # Identify indices where values are non-numeric
        invalid_indices = raw_expert_survey_df.index[numeric_values.isna()].tolist()
        raise ValueError(f"Non-numeric saaty_scale_value found at indices: {invalid_indices}")

    # Define the allowed Saaty scale set
    allowed_saaty_values: Set[int] = {1, 2, 3, 4, 5, 6, 7, 8, 9}

    # Check if values are in the allowed set
    # We check against the numeric series using pandas' Series.isin for a vectorized membership test.
    # Note: This implicitly handles the check for integers, as 3.0 is in {1..9} but 3.5 is not.
    is_valid = numeric_values.isin(allowed_saaty_values)

    if not is_valid.all():
        # Extract invalid values
        invalid_values = raw_expert_survey_df.loc[~is_valid, "saaty_scale_value"].unique()
        raise ValueError(
            f"Invalid saaty_scale_value entries found: {invalid_values}. "
            f"Must be integers in {allowed_saaty_values}."
        )

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_raw_expert_survey(raw_expert_survey_df: pd.DataFrame) -> bool:
    """
    Orchestrator function for Task 2: Validates the schema and content domains of the raw expert survey DataFrame.

    Sequentially executes column presence checks, categorical domain validation, and Saaty scale validation.

    Args:
        raw_expert_survey_df (pd.DataFrame): The raw expert survey data.

    Returns:
        bool: True if all validations pass.

    Raises:
        TypeError, ValueError: If any validation step fails.
    """
    # Step 1: Check DataFrame type and column presence
    validate_survey_columns(raw_expert_survey_df)

    # Step 2: Validate domain of categorical columns
    validate_survey_categorical_domains(raw_expert_survey_df)

    # Step 3: Validate numeric range of Saaty scale values
    validate_saaty_scale_values(raw_expert_survey_df)

    return True
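
The Saaty-scale check in Task 2 can be illustrated on a small table. The sketch below is a minimal, self-contained example and not the notebook's own code: the toy data and the helper `find_invalid_saaty_rows` are hypothetical. It shows that a float such as `7.0` passes the membership test, while `3.5` does not.

```python
import pandas as pd

# Hypothetical toy data: three judgments from one expert, one of them invalid.
toy_survey = pd.DataFrame(
    {
        "expert_id": ["E1", "E1", "E1"],
        "hierarchy_level": ["Sub_LR", "Sub_LR", "Sub_LR"],
        "criterion_i": ["LR1", "LR1", "LR2"],
        "criterion_j": ["LR2", "LR3", "LR3"],
        "saaty_scale_value": [3, 7.0, 3.5],
        "comparison_type": ["Direct", "Direct", "Direct"],
    }
)


def find_invalid_saaty_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Return rows whose saaty_scale_value is not an integer in 1..9 (illustrative helper)."""
    # Coerce to numeric so that non-numeric entries become NaN and fail the membership test
    numeric = pd.to_numeric(df["saaty_scale_value"], errors="coerce")
    allowed = set(range(1, 10))
    return df[~numeric.isin(allowed)]


invalid_rows = find_invalid_saaty_rows(toy_survey)
print(invalid_rows[["criterion_i", "criterion_j", "saaty_scale_value"]])
```

Returning the offending rows instead of raising makes the rule easy to inspect interactively. The validators above raise `ValueError` because they act as gatekeepers.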


In [None]:
# Task 3 – Validate combinatorial completeness of expert pairwise comparisons

# ==============================================================================
# Task 3: Validate combinatorial completeness of expert pairwise comparisons
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 1: Define canonical criterion sets per hierarchy level
# -------------------------------------------------------------------------------------------------------------------------------

def get_canonical_hierarchy_criteria() -> Dict[str, List[str]]:
    """
    Returns the canonical mapping of hierarchy levels to their ordered criterion sets.

    This function defines the ground truth for the AHP structure as specified in the
    study's taxonomy. The order of criteria in these lists is fixed and determines the
    row/column indices for the AHP matrices.

    Returns:
        Dict[str, List[str]]: A dictionary where keys are hierarchy levels and values
        are lists of criterion labels.
    """
    # Define the hierarchy structure based on the study's taxonomy
    # Level 1: Main Criteria (4 items)
    # Level 2: Sub-criteria for each main criterion
    hierarchy: Dict[str, List[str]] = {
        "Main_Criteria": ["CSR", "LR", "IR", "CFR"],
        "Sub_CSR": [
            "CSR1", "CSR2", "CSR3", "CSR4", "CSR5",
            "CSR6", "CSR7", "CSR8", "CSR9", "CSR10", "CSR11"
        ],
        "Sub_LR": ["LR1", "LR2", "LR3"],
        "Sub_IR": ["IR1", "IR2", "IR3", "IR4", "IR5", "IR6"],
        "Sub_CFR": [
            "CFR1", "CFR2", "CFR3", "CFR4", "CFR5",
            "CFR6", "CFR7", "CFR8", "CFR9", "CFR10",
            "CFR11", "CFR12", "CFR13", "CFR14"
        ]
    }
    return hierarchy

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 2: Validate criterion labels against canonical sets
# -------------------------------------------------------------------------------------------------------------------------------

def validate_criterion_labels(raw_expert_survey_df: pd.DataFrame, hierarchy: Dict[str, List[str]]) -> None:
    """
    Validates that every criterion label in the survey DataFrame belongs to the
    canonical set defined for its hierarchy level.

    This ensures that there are no typos, misclassified criteria, or unknown labels
    in the input data.

    Args:
        raw_expert_survey_df (pd.DataFrame): The raw expert survey data.
        hierarchy (Dict[str, List[str]]): The canonical hierarchy mapping.

    Raises:
        ValueError: If a criterion label is invalid for its hierarchy level.
    """
    # Iterate over each hierarchy level present in the canonical definition
    # We filter the dataframe by level to check validity in batches
    for level, valid_criteria in hierarchy.items():
        # Convert valid criteria list to a set for O(1) lookup
        valid_set: Set[str] = set(valid_criteria)

        # Filter dataframe for the current level
        # Labels are compared as exact, case-sensitive matches against the raw values.
        # Whitespace normalization only happens later (Task 4), so labels with stray
        # spaces or different casing are reported as invalid here.
        level_mask = raw_expert_survey_df["hierarchy_level"] == level
        subset = raw_expert_survey_df[level_mask]

        if subset.empty:
            continue

        # Check criterion_i column
        # Identify rows where criterion_i is NOT in the valid set
        invalid_i = ~subset["criterion_i"].isin(valid_set)
        if invalid_i.any():
            invalid_labels = subset.loc[invalid_i, "criterion_i"].unique()
            raise ValueError(
                f"Invalid 'criterion_i' labels found in level '{level}': {invalid_labels}. "
                f"Expected one of {valid_criteria}."
            )

        # Check criterion_j column
        # Identify rows where criterion_j is NOT in the valid set
        invalid_j = ~subset["criterion_j"].isin(valid_set)
        if invalid_j.any():
            invalid_labels = subset.loc[invalid_j, "criterion_j"].unique()
            raise ValueError(
                f"Invalid 'criterion_j' labels found in level '{level}': {invalid_labels}. "
                f"Expected one of {valid_criteria}."
            )

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 3: Check cardinality of pairwise comparisons per (expert, level)
# -------------------------------------------------------------------------------------------------------------------------------

def validate_pairwise_cardinality(raw_expert_survey_df: pd.DataFrame, hierarchy: Dict[str, List[str]]) -> None:
    """
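    Validates that for every (expert, hierarchy level) group present in the data, the
    number of unique unordered pairwise comparisons equals the combinatorial
    requirement n*(n-1)/2.

    This detects missing comparisons. Reversed pairs, exact duplicates, and
    self-comparisons are collapsed or ignored before counting, so they are not
    reported as errors here. A level that an expert omitted entirely produces no
    group and is therefore not detected by this check either.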

    Args:
        raw_expert_survey_df (pd.DataFrame): The raw expert survey data.
        hierarchy (Dict[str, List[str]]): The canonical hierarchy mapping.

    Raises:
        ValueError: If the count of unique pairs is incorrect for any expert/level.
    """
    # Group data by expert and hierarchy level
    grouped = raw_expert_survey_df.groupby(["expert_id", "hierarchy_level"])

    # Iterate through each group to verify cardinality
    for (expert_id, level), group in grouped:
        # Retrieve the canonical criteria for this level
        if level not in hierarchy:
            # This should have been caught by domain validation, but as a safeguard:
            raise ValueError(f"Unknown hierarchy level '{level}' found for expert '{expert_id}'")

        criteria_list = hierarchy[level]
        n = len(criteria_list)

        # Calculate expected number of unique unordered pairs
        # Equation: N = n * (n - 1) / 2
        expected_pairs_count = (n * (n - 1)) // 2

        # Extract pairs from the group
        # We canonicalize pairs to handle (A, B) vs (B, A) equivalence by sorting
        # We use a set comprehension to automatically filter exact duplicates
        actual_pairs: Set[Tuple[str, str]] = {
            tuple(sorted((row.criterion_i, row.criterion_j)))
            for row in group.itertuples(index=False)
            if row.criterion_i != row.criterion_j # Exclude self-comparisons if any slipped through
        }

        actual_count = len(actual_pairs)

        # Check for completeness
        if actual_count != expected_pairs_count:
            raise ValueError(
                f"Combinatorial incompleteness for Expert '{expert_id}' at Level '{level}'. "
                f"Expected {expected_pairs_count} unique pairs, found {actual_count}. "
                f"Criteria count n={n}."
            )

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_combinatorial_completeness(raw_expert_survey_df: pd.DataFrame) -> bool:
    """
    Orchestrator function for Task 3: Validates the combinatorial integrity of the expert survey.

    1. Retrieves the canonical hierarchy definition.
    2. Validates that all criterion labels match the canonical definitions.
    3. Validates that every expert has provided exactly the required number of unique pairwise comparisons.

    Args:
        raw_expert_survey_df (pd.DataFrame): The raw expert survey data.

    Returns:
        bool: True if all validations pass.

    Raises:
        ValueError: If any validation step fails.
    """
    # Step 1: Define canonical criterion sets
    hierarchy = get_canonical_hierarchy_criteria()

    # Step 2: Validate criterion labels against canonical sets
    validate_criterion_labels(raw_expert_survey_df, hierarchy)

    # Step 3: Check cardinality of pairwise comparisons
    validate_pairwise_cardinality(raw_expert_survey_df, hierarchy)

    return True
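
As a worked instance of the `n*(n-1)/2` requirement, the sketch below enumerates the unordered pairs for each level of the canonical hierarchy defined in Task 3, Step 1. It uses only the standard library; the variable names are illustrative and not taken from the notebook.

```python
from itertools import combinations

# Criterion lists mirroring the canonical hierarchy in Task 3, Step 1
hierarchy = {
    "Main_Criteria": ["CSR", "LR", "IR", "CFR"],
    "Sub_CSR": [f"CSR{k}" for k in range(1, 12)],
    "Sub_LR": [f"LR{k}" for k in range(1, 4)],
    "Sub_IR": [f"IR{k}" for k in range(1, 7)],
    "Sub_CFR": [f"CFR{k}" for k in range(1, 15)],
}

pairs_per_level = {}
for level, criteria in hierarchy.items():
    n = len(criteria)
    # Enumerate every unordered pair (i, j) with i before j in the canonical order
    pairs = list(combinations(criteria, 2))
    # The enumeration must agree with the closed-form count n*(n-1)/2
    assert len(pairs) == n * (n - 1) // 2
    pairs_per_level[level] = len(pairs)

pairs_per_expert = sum(pairs_per_level.values())
print(pairs_per_level)
print(pairs_per_expert)
```

The per-level counts are 6, 55, 3, 15, and 91, or 170 comparisons per expert. With the five experts required by Task 2, a complete survey therefore contains at least 850 distinct judgments.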


In [None]:
# Task 4 – Clean and standardize raw_expert_survey_df

# ==============================================================================
# Task 4: Clean and standardize `raw_expert_survey_df`
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 1: Remove invalid or degenerate rows
# -------------------------------------------------------------------------------------------------------------------------------

def remove_degenerate_survey_rows(raw_expert_survey_df: pd.DataFrame) -> pd.DataFrame:
    """
    Removes degenerate rows from the expert survey DataFrame.

    Specifically:
    1. Removes rows where criterion_i == criterion_j (self-comparisons), as the
       diagonal of AHP matrices is implicitly 1.
    2. Validates that no rows have invalid saaty_scale_values (though Task 2
       should have caught this, we enforce it here as a fatal error rather than
       dropping, to preserve combinatorial integrity).

    Args:
        raw_expert_survey_df (pd.DataFrame): The raw expert survey data.

    Returns:
        pd.DataFrame: The cleaned DataFrame with self-comparisons removed.

    Raises:
        ValueError: If invalid saaty_scale_values are detected (data corruption).
    """
    df = raw_expert_survey_df.copy()

    # Identify self-comparisons
    # Logic: AHP matrices have 1s on the diagonal. Explicit rows for i==j are redundant.
    self_comparison_mask = df["criterion_i"] == df["criterion_j"]

    if self_comparison_mask.any():
        # Log or print could be added here in a real system
        # Drop these rows
        df = df[~self_comparison_mask].copy()

    # Check for invalid Saaty values (1-9 integers)
    # We assume Task 2 validation passed, but we double check for safety before processing
    valid_saaty_values = {1, 2, 3, 4, 5, 6, 7, 8, 9}
    # Coerce to numeric to handle potential object types, errors='coerce' turns non-numeric to NaN
    numeric_scales = pd.to_numeric(df["saaty_scale_value"], errors='coerce')

    # Check for NaNs (non-numeric) or values not in set
    is_valid_scale = numeric_scales.isin(valid_saaty_values)

    if not is_valid_scale.all():
        invalid_rows = df[~is_valid_scale]
        raise ValueError(
            f"Found {len(invalid_rows)} rows with invalid 'saaty_scale_value'. "
            f"Values must be integers 1-9. Invalid values: {invalid_rows['saaty_scale_value'].unique()}"
        )

    # Reset index after dropping rows to maintain clean indexing
    df.reset_index(drop=True, inplace=True)

    return df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 2: Normalize string fields to canonical case and strip whitespace
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_survey_strings(cleaned_survey_df: pd.DataFrame) -> pd.DataFrame:
    """
    Normalizes string columns in the survey DataFrame.

    Operations:
    1. Strips leading/trailing whitespace from 'expert_id', 'hierarchy_level',
       'criterion_i', 'criterion_j', 'comparison_type'.
    2. Does NOT force uppercase, as the canonical hierarchy keys (Task 3) are mixed case
       (e.g., 'Main_Criteria', 'Sub_CSR'). We preserve case to ensure matching.

    Args:
        cleaned_survey_df (pd.DataFrame): The DataFrame from Step 1.

    Returns:
        pd.DataFrame: The DataFrame with normalized string columns.
    """
    df = cleaned_survey_df.copy()

    # List of string columns to normalize
    string_cols = ["expert_id", "hierarchy_level", "criterion_i", "criterion_j", "comparison_type"]

    for col in string_cols:
        if col in df.columns:
            # Ensure column is string type before stripping
            # .astype(str) handles potential mixed types or numbers-as-strings
            df[col] = df[col].astype(str).str.strip()

            # Note: We explicitly do NOT .upper() here because 'Main_Criteria' != 'MAIN_CRITERIA'
            # and we must match the canonical keys defined in Task 3.

    return df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 3: Re-verify completeness after cleaning
# -------------------------------------------------------------------------------------------------------------------------------

def verify_post_cleaning_integrity(cleaned_survey_df: pd.DataFrame) -> None:
    """
    Re-runs the combinatorial completeness validation on the cleaned DataFrame.

    This ensures that removing degenerate rows (like self-comparisons) did not
    accidentally create gaps in the required pairwise comparison set.

    Args:
        cleaned_survey_df (pd.DataFrame): The fully cleaned and standardized DataFrame.

    Raises:
        ValueError: If the cleaned data fails combinatorial validation.
    """
    # We reuse the orchestrator from Task 3.
    # Since we are in the same environment, we assume validate_combinatorial_completeness is available.
    # If this were a separate module, we would import it.

    # Call Task 3 orchestrator
    # This will raise ValueError if integrity is compromised
    validate_combinatorial_completeness(cleaned_survey_df)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def clean_and_standardize_survey(raw_expert_survey_df: pd.DataFrame) -> pd.DataFrame:
    """
    Orchestrator function for Task 4: Cleans and standardizes the expert survey data.

    Pipeline:
    1. Remove degenerate rows (self-comparisons).
    2. Normalize string formats (strip whitespace).
    3. Re-verify combinatorial completeness to ensure data integrity.

    Args:
        raw_expert_survey_df (pd.DataFrame): The raw input DataFrame.

    Returns:
        pd.DataFrame: The cleaned, standardized, and validated DataFrame ready for AHP matrix construction.

    Raises:
        ValueError: If data integrity checks fail.
    """
    # Step 1: Remove degenerate rows
    df_no_degenerate = remove_degenerate_survey_rows(raw_expert_survey_df)

    # Step 2: Normalize string fields
    df_normalized = normalize_survey_strings(df_no_degenerate)

    # Step 3: Re-verify completeness
    # This acts as a gatekeeper; if cleaning broke the data, we fail here.
    verify_post_cleaning_integrity(df_normalized)

    return df_normalized
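
One consequence of the step order in Task 4 is worth seeing on toy data: self-comparisons are detected by exact string equality, so a label with stray whitespace only matches its counterpart after stripping. The sketch below is illustrative only. The toy labels are hypothetical, and nothing here implies that the study data contains such rows.

```python
import pandas as pd

# Hypothetical toy rows; the third is a self-comparison hidden by a trailing space
toy = pd.DataFrame(
    {
        "criterion_i": ["LR1", " LR2", "LR3 "],
        "criterion_j": ["LR2", "LR3", "LR3"],
    }
)

# Exact comparison on raw strings: "LR3 " != "LR3", so nothing is flagged
raw_self_count = int((toy["criterion_i"] == toy["criterion_j"]).sum())

# Strip whitespace first, then compare again
stripped = toy.apply(lambda col: col.astype(str).str.strip())
stripped_self_count = int((stripped["criterion_i"] == stripped["criterion_j"]).sum())

# Drop the self-comparison and restore a clean index
cleaned = stripped[stripped["criterion_i"] != stripped["criterion_j"]].reset_index(drop=True)

print(raw_self_count, stripped_self_count, len(cleaned))
```

If raw inputs may carry whitespace, normalizing strings before removing degenerate rows is one way to make the two steps agree. Whether that reordering is appropriate depends on how strictly the raw log is expected to conform.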


In [None]:
# Task 5 – Validate the raw_financial_statement_df DataFrame schema

# ==============================================================================
# Task 5: Validate the `raw_financial_statement_df` DataFrame schema
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 1: Check DataFrame type and index
# -------------------------------------------------------------------------------------------------------------------------------

def validate_financial_structure(raw_financial_statement_df: pd.DataFrame) -> pd.DataFrame:
    """
    Validates the structural integrity of the financial statement DataFrame.

    Checks:
    1. Input is a pandas DataFrame.
    2. 'fiscal_year' exists (either as column or index).
    3. Coerces 'fiscal_year' to Int64 and sets it as a column (resetting index if needed).
    4. Sorts the DataFrame by 'fiscal_year' ascending.

    Args:
        raw_financial_statement_df (pd.DataFrame): The raw financial data.

    Returns:
        pd.DataFrame: The structurally validated DataFrame sorted by fiscal year.

    Raises:
        TypeError: If input is not a DataFrame.
        ValueError: If 'fiscal_year' is missing.
    """
    # Check type
    if not isinstance(raw_financial_statement_df, pd.DataFrame):
        raise TypeError(f"Input must be a pandas DataFrame, got {type(raw_financial_statement_df)}")

    df = raw_financial_statement_df.copy()

    # Handle fiscal_year index vs column
    if "fiscal_year" not in df.columns:
        if df.index.name == "fiscal_year":
            df.reset_index(inplace=True)
        else:
            raise ValueError("DataFrame must have a 'fiscal_year' column or index.")

    # Coerce fiscal_year to integer
    try:
        df["fiscal_year"] = df["fiscal_year"].astype("Int64")
    except Exception as e:
        # Chain the original exception so the root cause stays visible in the traceback
        raise ValueError(f"Could not convert 'fiscal_year' to integer: {e}") from e

    # Sort by year for time-series consistency
    df.sort_values("fiscal_year", ascending=True, inplace=True)
    df.reset_index(drop=True, inplace=True)

    return df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 2: Validate column presence against schema
# -------------------------------------------------------------------------------------------------------------------------------

def validate_financial_columns(raw_financial_statement_df: pd.DataFrame) -> None:
    """
    Validates that the DataFrame contains all required base financial line items.

    Note: Derived fields 'net_working_capital' and 'net_profit_before_interest'
    are NOT enforced here, as they are computed in Task 6. However, their
    source components MUST be present.

    Args:
        raw_financial_statement_df (pd.DataFrame): The financial data.

    Raises:
        ValueError: If any required base column is missing.
    """
    # Define required base columns (24 items, not counting 'fiscal_year')
    # These are the raw inputs needed to compute the 34 ratios
    required_base_columns: Set[str] = {
        # Balance Sheet
        "total_assets", "current_assets", "inventory", "cash_and_equivalents",
        "net_fixed_assets", "total_debt", "long_term_debt", "short_term_debt",
        "current_liabilities", "shareholders_equity", "retained_earnings",

        # Income Statement
        "sales_revenue", "gross_profit", "ebit", "interest_expense", "net_profit",

        # Cash Flow Statement
        "net_operating_cash_flow", "capital_expenditures", "net_investing_cash_flow",
        "net_financing_cash_flow", "total_cash_flow_inv_fin", "cash_distributions",
        "operating_cash_inflows", "initial_cash_requirements"
    }

    actual_columns = set(raw_financial_statement_df.columns)
    missing_columns = required_base_columns - actual_columns

    if missing_columns:
        raise ValueError(f"Missing required financial base columns: {missing_columns}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 3: Validate temporal coverage and uniqueness
# -------------------------------------------------------------------------------------------------------------------------------

def validate_temporal_coverage(raw_financial_statement_df: pd.DataFrame) -> None:
    """
    Validates that the financial data covers exactly the required time horizon
    (2008-2017) with no duplicates.

    Args:
        raw_financial_statement_df (pd.DataFrame): The financial data.

    Raises:
        ValueError: If years are missing, extra years exist, or duplicates are found.
    """
    # Define expected years
    expected_years = set(range(2008, 2018)) # 2008 to 2017 inclusive

    # Extract actual years
    actual_years = set(raw_financial_statement_df["fiscal_year"].dropna().unique())

    # Check for missing years
    missing_years = expected_years - actual_years
    if missing_years:
        raise ValueError(f"Missing fiscal years: {missing_years}. Expected 2008-2017.")

    # Check for extra years (strict replication mode)
    extra_years = actual_years - expected_years
    if extra_years:
        raise ValueError(f"Unexpected extra fiscal years found: {extra_years}. Expected only 2008-2017.")

    # Check for null fiscal years
    # Nulls are dropped when building `actual_years`, so they must be reported explicitly
    if raw_financial_statement_df["fiscal_year"].isna().any():
        raise ValueError("Null values found in 'fiscal_year'. Expected one row per year 2008-2017.")

    # Check for duplicates
    # Any year that appears more than once is flagged directly
    duplicate_mask = raw_financial_statement_df["fiscal_year"].duplicated()
    if duplicate_mask.any():
        duplicate_years = raw_financial_statement_df.loc[duplicate_mask, "fiscal_year"].unique()
        raise ValueError(f"Duplicate entries found for fiscal years: {duplicate_years}")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_financial_statements(raw_financial_statement_df: pd.DataFrame) -> pd.DataFrame:
    """
    Orchestrator function for Task 5: Validates the schema and temporal integrity of financial statements.

    Pipeline:
    1. Structural validation (type, index, sorting).
    2. Column presence validation (base fields).
    3. Temporal coverage validation (2008-2017 uniqueness).

    Args:
        raw_financial_statement_df (pd.DataFrame): The raw financial data.

    Returns:
        pd.DataFrame: The validated, sorted DataFrame with 'fiscal_year' as a column.

    Raises:
        ValueError, TypeError: If validation fails.
    """
    # Step 1: Structure and Sort
    df_structured = validate_financial_structure(raw_financial_statement_df)

    # Step 2: Column Presence
    validate_financial_columns(df_structured)

    # Step 3: Temporal Integrity
    validate_temporal_coverage(df_structured)

    return df_structured
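
The temporal checks in Task 5 reduce to set differences plus a duplicate scan. The following sketch runs them on a hypothetical list of fiscal years with one gap, one repeat, and one year outside the study window. It reports the problems instead of raising, which is a simplification of the validators above.

```python
import pandas as pd

# Study horizon used by the validators above: 2008 to 2017 inclusive
expected_years = set(range(2008, 2018))

# Hypothetical input: 2012 is missing, 2011 is repeated, 2018 is out of scope
toy_years = pd.Series(
    [2008, 2009, 2010, 2011, 2011, 2013, 2014, 2015, 2016, 2017, 2018],
    dtype="Int64",
    name="fiscal_year",
)

# Convert to plain Python ints so the sets compare cleanly
actual_years = {int(y) for y in toy_years.dropna().unique()}

missing_years = sorted(expected_years - actual_years)
extra_years = sorted(actual_years - expected_years)
duplicate_years = sorted(int(y) for y in toy_years[toy_years.duplicated()].unique())

print(missing_years, extra_years, duplicate_years)
```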


In [None]:
# Task 6 – Validate financial identities and enforce derived equalities

# ==============================================================================
# Task 6: Validate financial identities and enforce derived equalities
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 1: Check accounting identity for total debt
# -------------------------------------------------------------------------------------------------------------------------------

def validate_total_debt_identity(raw_financial_statement_df: pd.DataFrame) -> None:
    """
    Validates the accounting identity: Total Debt = Short Term Debt + Long Term Debt.

    This ensures that the debt figures reported in the financial statements are
    internally consistent before being used for ratio calculation.

    Args:
        raw_financial_statement_df (pd.DataFrame): The financial data.

    Raises:
        ValueError: If the identity is violated for any fiscal year.
    """
    df = raw_financial_statement_df

    # Extract relevant columns
    total_debt = df["total_debt"]
    short_term = df["short_term_debt"]
    long_term = df["long_term_debt"]

    # Calculate expected total debt
    # No NaN filling is applied: if either component is NaN, the sum is NaN and the
    # row is excluded from the comparison by the mask below.
    calculated_total = short_term + long_term

    # Restrict the check to rows where all three values are present
    valid_mask = total_debt.notna() & short_term.notna() & long_term.notna()

    if not valid_mask.any():
        # No row has complete debt data, so there is nothing to compare.
        # Task 5 only guarantees that the columns exist, not that they are populated.
        return

    # Compare
    # We use np.isclose for robust float comparison
    matches = np.isclose(total_debt[valid_mask], calculated_total[valid_mask], rtol=1e-5, atol=1e-5)

    if not matches.all():
        mismatched_indices = df.loc[valid_mask][~matches].index
        mismatched_years = df.loc[mismatched_indices, "fiscal_year"].tolist()
        raise ValueError(
            f"Total Debt identity violated for years: {mismatched_years}. "
            f"Total Debt must equal Short Term + Long Term Debt."
        )
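
To make the tolerance in the debt-identity check concrete: `np.isclose(a, b, rtol, atol)` accepts a pair when `|a - b| <= atol + rtol * |b|`. The sketch below applies that rule to hypothetical figures with the same `rtol=1e-5` and `atol=1e-5` used above, and confirms that the manual formula agrees with NumPy.

```python
import numpy as np

# Hypothetical reported totals and the corresponding short-term + long-term sums
total_debt = np.array([150.0, 150.0, 1_000_000.0])
short_plus_long = np.array([150.0, 150.01, 1_000_005.0])

rtol, atol = 1e-5, 1e-5

# Allowed gap per row: the relative part scales with the magnitude of the second argument
tolerance = atol + rtol * np.abs(short_plus_long)
manual = np.abs(total_debt - short_plus_long) <= tolerance

builtin = np.isclose(total_debt, short_plus_long, rtol=rtol, atol=atol)

print(tolerance)
print(manual, builtin)
```

Because the relative term grows with the size of the figures, a gap of 5 on a total of about 1,000,000 passes while a gap of 0.01 on 150 fails. If statements are reported in whole currency units, a tighter or purely absolute tolerance may be preferable; that choice is left to the reader.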

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 2: Derive net working capital if not provided
# -------------------------------------------------------------------------------------------------------------------------------

def derive_net_working_capital(raw_financial_statement_df: pd.DataFrame) -> pd.DataFrame:
    """
    Derives or enforces 'net_working_capital' based on Current Assets and Current Liabilities.

    Formula: Net Working Capital = Current Assets - Current Liabilities.

    If the column exists, it is overwritten to ensure consistency with the components
    used in other ratios (LR2, etc.).

    Args:
        raw_financial_statement_df (pd.DataFrame): The financial data.

    Returns:
        pd.DataFrame: The DataFrame with the 'net_working_capital' column enforced.
    """
    df = raw_financial_statement_df.copy()

    # Calculate NWC
    # Equation: NWC = CA - CL
    computed_nwc = df["current_assets"] - df["current_liabilities"]

    # Overwrite or create the column
    # We do this unconditionally to guarantee algebraic consistency
    df["net_working_capital"] = computed_nwc

    return df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 3: Derive net profit before interest
# -------------------------------------------------------------------------------------------------------------------------------

def derive_net_profit_before_interest(raw_financial_statement_df: pd.DataFrame) -> pd.DataFrame:
    """
    Derives or enforces 'net_profit_before_interest'.

    Formula: Net Profit Before Interest = Net Profit + Interest Expense.

    This specific definition is required for Income Risk ratios IR1 and IR5
    as per the study's methodology.

    Args:
        raw_financial_statement_df (pd.DataFrame): The financial data.

    Returns:
        pd.DataFrame: The DataFrame with the 'net_profit_before_interest' column enforced.
    """
    df = raw_financial_statement_df.copy()

    # Calculate NPBI
    # Equation: NPBI = Net Profit + Interest Expense
    # Note: This adds back interest to get the pre-interest profit
    computed_npbi = df["net_profit"] + df["interest_expense"]

    # Overwrite or create the column
    df["net_profit_before_interest"] = computed_npbi

    return df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def enforce_financial_identities(raw_financial_statement_df: pd.DataFrame) -> pd.DataFrame:
    """
    Orchestrator function for Task 6: Validates identities and enforces derived fields.

    Pipeline:
    1. Validates Total Debt = Short + Long Term Debt.
    2. Derives/Enforces Net Working Capital.
    3. Derives/Enforces Net Profit Before Interest.

    Args:
        raw_financial_statement_df (pd.DataFrame): The validated financial data from Task 5.

    Returns:
        pd.DataFrame: The financial data with consistent derived columns.

    Raises:
        ValueError: If accounting identities are violated.
    """
    # Step 1: Validate Debt Identity
    # This is a check only; it does not modify the DataFrame
    validate_total_debt_identity(raw_financial_statement_df)

    # Step 2: Derive Net Working Capital
    # Returns a copy with the column added; the input is left untouched
    df_nwc = derive_net_working_capital(raw_financial_statement_df)

    # Step 3: Derive Net Profit Before Interest
    # Returns a further copy with the second derived column
    df_final = derive_net_profit_before_interest(df_nwc)

    return df_final
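
The Task 6 checks above can be illustrated on a toy table. The following is a minimal sketch, not the pipeline itself: the two-year sample and all of its figures are hypothetical, and the logic is inlined rather than calling the functions defined above so that the snippet runs on its own.

```python
import numpy as np
import pandas as pd

# Hypothetical two-year sample; the figures are illustrative only.
sample = pd.DataFrame({
    "fiscal_year": [2020, 2021],
    "total_debt": [150.0, 210.0],
    "short_term_debt": [50.0, 80.0],
    "long_term_debt": [100.0, 120.0],
    "current_assets": [300.0, 320.0],
    "current_liabilities": [180.0, 200.0],
    "net_profit": [40.0, -10.0],
    "interest_expense": [12.0, 15.0],
})

# Identity check: Total Debt = Short Term Debt + Long Term Debt.
# 2021 violates it on purpose (80 + 120 = 200, not 210).
component_sum = sample["short_term_debt"] + sample["long_term_debt"]
identity_holds = np.isclose(sample["total_debt"], component_sum, rtol=1e-5, atol=1e-5)
violating_years = sample.loc[~identity_holds, "fiscal_year"].tolist()

# Derived fields, computed on a new DataFrame so `sample` is left unchanged.
derived = sample.assign(
    net_working_capital=sample["current_assets"] - sample["current_liabilities"],
    net_profit_before_interest=sample["net_profit"] + sample["interest_expense"],
)
```

In the pipeline above, a non-empty list of violating years is what causes `validate_total_debt_identity` to raise a `ValueError`.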


In [None]:
# Task 7 – Handle missing values and zero denominators in financial data

# ==============================================================================
# Task 7: Handle missing values and zero denominators in financial data
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 1: Inspect for missing or anomalous values
# -------------------------------------------------------------------------------------------------------------------------------

def inspect_financial_anomalies(raw_financial_statement_df: pd.DataFrame) -> Dict[str, Any]:
    """
    Inspects the financial DataFrame for missing values (NaNs) and anomalous negative values
    in fields that are strictly non-negative.

    Critical fields (Total Assets, Shareholders' Equity, Net Operating Cash Flow) are
    checked strictly; a missing value in any of them raises a ValueError. Other anomalies
    are recorded in the returned report.

    Args:
        raw_financial_statement_df (pd.DataFrame): The financial data.

    Returns:
        Dict[str, Any]: A report containing counts of NaNs and negative values per column.

    Raises:
        ValueError: If a critical field contains NaNs, or if Total Assets contains negative values.
    """
    df = raw_financial_statement_df
    report = {"nan_counts": {}, "negative_counts": {}}

    # Define fields that must be non-negative (zero is allowed)
    # Note: Equity can be negative in distress, but Assets cannot.
    non_negative_fields = {
        "total_assets", "current_assets", "net_fixed_assets",
        "total_debt", "long_term_debt", "short_term_debt", "current_liabilities"
    }

    # Define critical fields where missing data is fatal for the model
    critical_fields = {"total_assets", "shareholders_equity", "net_operating_cash_flow"}

    for col in df.columns:
        if not pd.api.types.is_numeric_dtype(df[col]):
            continue

        # Check NaNs
        nan_count = df[col].isna().sum()
        if nan_count > 0:
            report["nan_counts"][col] = int(nan_count)
            if col in critical_fields:
                raise ValueError(f"Critical field '{col}' has {nan_count} missing values.")

        # Check Negatives
        if col in non_negative_fields:
            neg_count = (df[col] < 0).sum()
            if neg_count > 0:
                report["negative_counts"][col] = int(neg_count)
                # Negative assets are physically impossible and indicate data error
                if col == "total_assets":
                    raise ValueError(f"Total Assets contains {neg_count} negative values.")

    return report

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 2: Identify potential zero denominators for ratio computation
# -------------------------------------------------------------------------------------------------------------------------------

def identify_zero_denominators(
    raw_financial_statement_df: pd.DataFrame,
    feature_engineering_logic: Dict[str, Dict[str, str]]
) -> pd.DataFrame:
    """
    Identifies (Year, Ratio) pairs where the denominator is effectively zero,
    which would lead to division-by-zero errors or infinite ratios.

    It evaluates the denominator expression for each ratio and checks against a
    small epsilon tolerance.

    Args:
        raw_financial_statement_df (pd.DataFrame): The financial data.
        feature_engineering_logic (Dict): Configuration mapping ratio IDs to numerator/denominator expressions.

    Returns:
        pd.DataFrame: A boolean DataFrame (index=fiscal_year, columns=ratio_ids)
                      where True indicates a zero denominator.
    """
    df = raw_financial_statement_df.set_index("fiscal_year")
    ratio_ids = list(feature_engineering_logic.keys())

    # Initialize mask with False
    zero_mask = pd.DataFrame(False, index=df.index, columns=ratio_ids)

    epsilon = 1e-9

    for ratio_id, logic in feature_engineering_logic.items():
        den_expr = logic["denominator"]

        # Evaluate the denominator expression against the DataFrame's columns.
        # DataFrame.eval resolves names in expressions such as
        # "current_assets - inventory" to columns and returns the element-wise result.
        try:
            den_values = df.eval(den_expr)
        except Exception as e:
            # Fallback: if eval fails (e.g., a column name that is not a valid
            # identifier), try direct column access
            if den_expr in df.columns:
                den_values = df[den_expr]
            else:
                raise ValueError(
                    f"Could not evaluate denominator '{den_expr}' for ratio '{ratio_id}': {e}"
                ) from e

        # Check for near-zero values
        # Negative denominators are valid; abs() ensures only near-zero magnitudes are flagged
        is_zero = den_values.abs() < epsilon

        if is_zero.any():
            zero_mask[ratio_id] = is_zero

    return zero_mask

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 3: Decide on policy for zero-denominator ratios
# -------------------------------------------------------------------------------------------------------------------------------

def apply_zero_denominator_policy(zero_mask: pd.DataFrame) -> pd.DataFrame:
    """
    Formalizes the policy for handling zero denominators.

    Policy:
    - If the denominator is zero, the ratio value is undefined (NaN).
    - We do NOT substitute epsilon, as this distorts the magnitude.
    - We do NOT set to zero, as that implies low risk/value which might be false.

    This function returns the mask to be used by the ratio computer to force NaNs.
    It serves as a documentation and enforcement point for this policy.

    Args:
        zero_mask (pd.DataFrame): The boolean mask from Step 2.

    Returns:
        pd.DataFrame: The validated mask to be used for NaN enforcement.
    """
    # The policy is strictly to treat these as NaNs.
    # We return the mask as-is, but this function exists to make the decision explicit
    # in the pipeline architecture.
    return zero_mask

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def handle_missing_and_zero_values(
    raw_financial_statement_df: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> Tuple[Dict[str, Any], pd.DataFrame]:
    """
    Orchestrator function for Task 7: Detects data anomalies and zero-denominator conditions.

    Pipeline:
    1. Inspects for missing values and invalid negative values.
    2. Identifies ratios where the denominator evaluates to zero.
    3. Applies the zero-denominator policy (such ratios are treated as NaN downstream).

    Args:
        raw_financial_statement_df (pd.DataFrame): The financial data.
        study_configuration (Dict): The full study configuration containing feature logic.

    Returns:
        Tuple[Dict, pd.DataFrame]:
            - Diagnostic report (dict).
            - Zero denominator mask (DataFrame, True = zero/undefined).

    Raises:
        ValueError: If critical data quality issues are found.
    """
    # Step 1: Inspect Anomalies
    report = inspect_financial_anomalies(raw_financial_statement_df)

    # Step 2: Identify Zero Denominators
    feature_logic = study_configuration["feature_engineering_logic"]
    zero_mask_raw = identify_zero_denominators(raw_financial_statement_df, feature_logic)

    # Step 3: Apply Policy
    zero_mask_final = apply_zero_denominator_policy(zero_mask_raw)

    return report, zero_mask_final
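
How a downstream ratio computer might apply the zero-denominator policy can be sketched as follows. This is an illustration under stated assumptions, not the study's ratio code: the ratio ID `R1`, the column names, and the figures are hypothetical, and the numerator/denominator expressions follow the `feature_engineering_logic` shape used above.

```python
import pandas as pd

# Hypothetical data; 2020 has current_assets == inventory, so the denominator is zero.
data = pd.DataFrame({
    "fiscal_year": [2019, 2020, 2021],
    "net_profit": [10.0, 5.0, 8.0],
    "current_assets": [50.0, 40.0, 60.0],
    "inventory": [20.0, 40.0, 15.0],
}).set_index("fiscal_year")

# Hypothetical ratio definition in the numerator/denominator format.
logic = {"R1": {"numerator": "net_profit", "denominator": "current_assets - inventory"}}
epsilon = 1e-9

ratios = pd.DataFrame(index=data.index)
for ratio_id, spec in logic.items():
    numerator = data.eval(spec["numerator"])
    denominator = data.eval(spec["denominator"])
    is_zero = denominator.abs() < epsilon
    # Series.where keeps entries where the condition is True and writes NaN elsewhere,
    # so a zero denominator yields an undefined (NaN) ratio instead of inf or 0.
    ratios[ratio_id] = numerator / denominator.where(~is_zero)
```

Masking the denominator before dividing avoids producing `inf` values that would otherwise have to be cleaned up afterwards.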


In [None]:
# Task 8 – Define the hierarchical structure and criterion sets for AHP

# ==============================================================================
# Task 8: Define the hierarchical structure and criterion sets for AHP
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 1, 2, 3: Define Hierarchy Class and Structures
# -------------------------------------------------------------------------------------------------------------------------------

@dataclass(frozen=True)
class AHPHierarchy:
    """
    Encapsulates the hierarchical structure of the Analytic Hierarchy Process (AHP) model
    for Financial Risk Assessment.

    This class serves as the single source of truth for the structural definitions
    required by the AHP methodology. It defines the decomposition of the financial risk
    problem into a goal, main criteria, and sub-criteria (ratios), and provides O(1)
    access to structural relationships. The dataclass is frozen, so its attributes cannot
    be rebound, but the dicts and lists it holds are still mutable and should be treated
    as read-only.

    Attributes:
        levels (Dict[str, List[str]]):
            A dictionary mapping hierarchy level identifiers (e.g., "Main_Criteria", "Sub_CSR")
            to their corresponding ordered lists of criterion labels. The order defined here
            is canonical and determines the row/column indices for pairwise comparison matrices.

        parent_mapping (Dict[str, str]):
            A dictionary mapping each secondary criterion label (e.g., "CSR1") to its
            parent main criterion label (e.g., "CSR"). This is essential for computing
            global weights via hierarchical composition:
            w_global = w_main * w_local.

        canonical_ratio_order (List[str]):
            The fixed, global ordered list of all 34 secondary criteria (ratios). This order
            is used to align the columns of the decision matrix X with the global weight vector w.

        label_to_index (Dict[str, Dict[str, int]]):
            A nested dictionary mapping hierarchy levels to a mapping of criterion labels
            to their integer indices (0 to n-1). Used for efficient matrix population.
    """

    # Default factory defines the static structure derived from the study's taxonomy.
    levels: Dict[str, List[str]] = field(default_factory=lambda: {
        "Main_Criteria": ["CSR", "LR", "IR", "CFR"],
        "Sub_CSR": [
            "CSR1", "CSR2", "CSR3", "CSR4", "CSR5",
            "CSR6", "CSR7", "CSR8", "CSR9", "CSR10", "CSR11"
        ],
        "Sub_LR": ["LR1", "LR2", "LR3"],
        "Sub_IR": ["IR1", "IR2", "IR3", "IR4", "IR5", "IR6"],
        "Sub_CFR": [
            "CFR1", "CFR2", "CFR3", "CFR4", "CFR5",
            "CFR6", "CFR7", "CFR8", "CFR9", "CFR10",
            "CFR11", "CFR12", "CFR13", "CFR14"
        ]
    })

    # Fields initialized in __post_init__
    parent_mapping: Dict[str, str] = field(init=False)
    canonical_ratio_order: List[str] = field(init=False)
    label_to_index: Dict[str, Dict[str, int]] = field(init=False)

    def __post_init__(self) -> None:
        """
        Initializes derived structural attributes (`parent_mapping`, `canonical_ratio_order`,
        `label_to_index`) based on the `levels` definition.

        This method constructs the reverse mappings and global orderings required for
        efficient lookups during the AHP and SAW calculation phases.
        """
        # 1. Build Parent Mapping and Global Order
        # We map the sub-level keys to the main criterion codes they represent.
        # This relationship undergirds the hierarchical weight composition.
        level_to_parent_code: Dict[str, str] = {
            "Sub_CSR": "CSR",
            "Sub_LR": "LR",
            "Sub_IR": "IR",
            "Sub_CFR": "CFR"
        }

        mapping: Dict[str, str] = {}
        ratio_order: List[str] = []

        # Iterate through sub-levels in a deterministic order to build the global list.
        # The order of keys here determines the order of columns in the final decision matrix.
        sub_levels: List[str] = ["Sub_CSR", "Sub_LR", "Sub_IR", "Sub_CFR"]

        for sub_level in sub_levels:
            # Identify the parent main criterion for this group
            parent_code = level_to_parent_code[sub_level]

            # Retrieve the list of secondary criteria for this group
            criteria = self.levels[sub_level]

            # Extend the global canonical order list
            ratio_order.extend(criteria)

            # Populate the parent mapping for each criterion in the group
            for criterion in criteria:
                mapping[criterion] = parent_code

        # Use object.__setattr__ to bypass frozen dataclass immutability constraints during initialization
        object.__setattr__(self, 'parent_mapping', mapping)
        object.__setattr__(self, 'canonical_ratio_order', ratio_order)

        # 2. Build Label to Index Mapping for each level
        # This allows O(1) lookup of the matrix index (row/col) for a given criterion label.
        # Structure: {level_name: {criterion_label: index}}
        l_to_i: Dict[str, Dict[str, int]] = {}

        for level, criteria in self.levels.items():
            l_to_i[level] = {label: idx for idx, label in enumerate(criteria)}

        object.__setattr__(self, 'label_to_index', l_to_i)

    def get_criteria(self, level: str) -> List[str]:
        """
        Retrieves the ordered list of criteria labels for a specific hierarchy level.

        This list defines the canonical order of rows and columns for the pairwise
        comparison matrix associated with the specified level.

        Args:
            level (str): The hierarchy level identifier (e.g., "Main_Criteria", "Sub_CSR").

        Returns:
            List[str]: The ordered list of criterion labels.

        Raises:
            ValueError: If the provided `level` key does not exist in the hierarchy definition.
        """
        # Validate that the requested level exists in the hierarchy
        if level not in self.levels:
            raise ValueError(f"Unknown hierarchy level: '{level}'. Available levels: {list(self.levels.keys())}")

        return self.levels[level]

    def get_matrix_size(self, level: str) -> int:
        """
        Returns the dimension (n) of the pairwise comparison matrix for a given hierarchy level.

        This corresponds to the number of criteria being compared at that specific node
        of the hierarchy.

        Args:
            level (str): The hierarchy level identifier.

        Returns:
            int: The number of criteria at that level (n).

        Raises:
            ValueError: If the provided `level` key does not exist (propagated from get_criteria).
        """
        # Retrieve criteria list (validation happens inside get_criteria)
        criteria_list = self.get_criteria(level)

        # Return the length of the list
        return len(criteria_list)

    def get_parent(self, secondary_criterion: str) -> str:
        """
        Returns the parent main criterion label for a given secondary criterion (ratio).

        This lookup is used during the calculation of global weights, where the local weight
        of a secondary criterion is multiplied by the weight of its parent main criterion.

        Args:
            secondary_criterion (str): The secondary criterion label (e.g., "CSR1").

        Returns:
            str: The label of the parent main criterion (e.g., "CSR").

        Raises:
            ValueError: If the provided criterion label is not a known secondary criterion.
        """
        # Validate that the criterion exists in the parent mapping
        if secondary_criterion not in self.parent_mapping:
            raise ValueError(f"Unknown secondary criterion: '{secondary_criterion}'. Cannot determine parent.")

        return self.parent_mapping[secondary_criterion]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def initialize_ahp_hierarchy() -> AHPHierarchy:
    """
    Orchestrator function for Task 8: Instantiates and returns the AHP Hierarchy configuration object.

    This object contains all structural definitions required for AHP matrix construction
    and weight aggregation.

    Returns:
        AHPHierarchy: The configured hierarchy object.
    """
    return AHPHierarchy()
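
The hierarchical composition named in the `AHPHierarchy` docstring, `w_global = w_main * w_local`, can be sketched on a reduced hierarchy. The weights below are hypothetical and chosen only so the arithmetic is easy to follow; the mapping mirrors the shape of `parent_mapping` but is built by hand here.

```python
# Hypothetical main-criterion weights (sum to 1).
main_weights = {"LR": 0.25, "IR": 0.75}

# Hypothetical local weights; each group sums to 1 within its parent.
local_weights = {"LR1": 0.5, "LR2": 0.25, "LR3": 0.25, "IR1": 0.5, "IR2": 0.5}

# Same shape as AHPHierarchy.parent_mapping: ratio label -> parent main criterion.
parent_mapping = {"LR1": "LR", "LR2": "LR", "LR3": "LR", "IR1": "IR", "IR2": "IR"}

# Global weight of each ratio = weight of its parent * its local weight.
global_weights = {
    ratio: main_weights[parent_mapping[ratio]] * weight
    for ratio, weight in local_weights.items()
}

# If the main weights and every group of local weights sum to 1,
# the global weights sum to 1 as well.
total = sum(global_weights.values())
```

Because dicts preserve insertion order, iterating `local_weights` in the canonical ratio order would give global weights already aligned with the decision-matrix columns.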


In [None]:
# Task 9 – Build pairwise comparison matrices for each (expert, hierarchy_level) combination

# ==========================================================================================
# Task 9: Build pairwise comparison matrices for each (expert, hierarchy_level) combination
# ==========================================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 1: Initialize square matrices with diagonal ones
# -------------------------------------------------------------------------------------------------------------------------------

def initialize_matrices(
    cleaned_survey_df: pd.DataFrame,
    hierarchy: 'AHPHierarchy'
) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Initializes empty pairwise comparison matrices for every expert and hierarchy level
    found in the survey data.

    Matrices are initialized with NaNs off-diagonal and 1.0 on the diagonal.

    Args:
        cleaned_survey_df (pd.DataFrame): The validated expert survey data.
        hierarchy (AHPHierarchy): The configuration object containing structure definitions.

    Returns:
        Dict[str, Dict[str, np.ndarray]]: A nested dictionary structure:
            matrices[expert_id][hierarchy_level] -> np.ndarray (n x n)
    """
    matrices: Dict[str, Dict[str, np.ndarray]] = {}

    # Identify all unique (expert, level) combinations
    groups = cleaned_survey_df.groupby(["expert_id", "hierarchy_level"])

    for (expert_id, level), _ in groups:
        # Ensure the expert dict exists
        if expert_id not in matrices:
            matrices[expert_id] = {}

        # Get matrix dimension n for this level
        # We use the hierarchy object to get the canonical size
        try:
            n = hierarchy.get_matrix_size(level)
        except ValueError as e:
            raise ValueError(f"Unknown hierarchy level '{level}' in survey data: {e}") from e

        # Initialize n x n matrix with NaNs
        # We use float64 for precision
        mat = np.full((n, n), np.nan, dtype=np.float64)

        # Set diagonal to 1.0
        np.fill_diagonal(mat, 1.0)

        matrices[expert_id][level] = mat

    return matrices

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 2: Populate direct entries from `cleaned_survey_df`
# -------------------------------------------------------------------------------------------------------------------------------

def populate_direct_entries(
    matrices: Dict[str, Dict[str, np.ndarray]],
    cleaned_survey_df: pd.DataFrame,
    hierarchy: 'AHPHierarchy'
) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Populates the initialized matrices with the direct pairwise judgments from the survey.

    Maps criterion labels to integer indices using the hierarchy configuration.

    Args:
        matrices (Dict): The initialized nested dictionary of matrices.
        cleaned_survey_df (pd.DataFrame): The survey data.
        hierarchy (AHPHierarchy): The configuration object for label-to-index mapping.

    Returns:
        Dict: The matrices with direct entries filled.

    Raises:
        ValueError: If a matrix was not initialized, a criterion label is not found in the
            hierarchy, or a pair has conflicting duplicate entries.
    """
    # Iterate over each row in the survey
    # Using itertuples for performance, though dataset is small
    for row in cleaned_survey_df.itertuples(index=False):
        expert_id = row.expert_id
        level = row.hierarchy_level
        crit_i = row.criterion_i
        crit_j = row.criterion_j
        value = float(row.saaty_scale_value)

        # Retrieve the specific matrix
        # Note: initialize_matrices ensures existence, but we check for safety
        if expert_id not in matrices or level not in matrices[expert_id]:
            # This implies the grouping logic in step 1 missed something or df changed
            raise ValueError(f"Matrix not initialized for Expert '{expert_id}', Level '{level}'")

        mat = matrices[expert_id][level]

        # Map labels to indices
        # hierarchy.label_to_index is Dict[level, Dict[label, int]]
        try:
            idx_i = hierarchy.label_to_index[level][crit_i]
            idx_j = hierarchy.label_to_index[level][crit_j]
        except KeyError as e:
            # e.args[0] is the missing key; formatting `e` itself would add extra quotes
            raise ValueError(
                f"Criterion label mapping failed for '{e.args[0]}' in level '{level}'"
            ) from e

        # Assign value
        # Guard against overwrites (validation should already prevent them)
        if not np.isnan(mat[idx_i, idx_j]) and mat[idx_i, idx_j] != value:
            # Self-comparisons were filtered out in Task 4, so a conflict here
            # means the survey contains duplicate rows for the pair (i, j).
            raise ValueError(
                f"Duplicate conflicting entry for Expert '{expert_id}', Level '{level}', "
                f"Pair ({crit_i}, {crit_j}). Existing: {mat[idx_i, idx_j]}, New: {value}"
            )

        mat[idx_i, idx_j] = value

    return matrices

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 3: Enforce reciprocity to fill the lower (or upper) triangle
# -------------------------------------------------------------------------------------------------------------------------------

def enforce_reciprocity(matrices: Dict[str, Dict[str, np.ndarray]]) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Completes the pairwise comparison matrices by enforcing the reciprocity property:
    A_ji = 1 / A_ij.

    Also validates that the resulting matrices are fully populated (no NaNs).

    Args:
        matrices (Dict): The partially populated matrices.

    Returns:
        Dict: The fully populated, reciprocal matrices.

    Raises:
        ValueError: If a matrix cannot be fully populated (missing pairs) or contains conflicts.
    """
    for expert_id, levels_dict in matrices.items():
        for level, mat in levels_dict.items():
            n = mat.shape[0]

            # Iterate over the upper triangle (excluding diagonal)
            # We assume inputs might be in upper or lower, so we check both
            for i in range(n):
                for j in range(i + 1, n):
                    val_ij = mat[i, j]
                    val_ji = mat[j, i]

                    has_ij = not np.isnan(val_ij)
                    has_ji = not np.isnan(val_ji)

                    if has_ij and not has_ji:
                        # Fill reciprocal
                        mat[j, i] = 1.0 / val_ij
                    elif not has_ij and has_ji:
                        # Fill reciprocal
                        mat[i, j] = 1.0 / val_ji
                    elif has_ij and has_ji:
                        # Both set: check consistency
                        # Reciprocity check: val_ij * val_ji should be approx 1.0
                        if not np.isclose(val_ij * val_ji, 1.0, rtol=1e-5):
                            raise ValueError(
                                f"Reciprocity violation for Expert '{expert_id}', Level '{level}', "
                                f"Indices ({i}, {j}). Values: {val_ij}, {val_ji}"
                            )
                    else:
                        # Neither entry is set: the pairwise comparison is missing.
                        # Fail immediately so the error names the specific pair.
                        raise ValueError(
                            f"Missing pairwise comparison for Expert '{expert_id}', Level '{level}', "
                            f"Indices ({i}, {j}). Matrix is incomplete."
                        )

            # Final check for any remaining NaNs (should be covered above, but as safeguard)
            if np.isnan(mat).any():
                raise ValueError(f"Matrix for Expert '{expert_id}', Level '{level}' contains NaNs after reciprocity enforcement.")

    return matrices

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def build_ahp_matrices(
    cleaned_survey_df: pd.DataFrame,
    hierarchy: 'AHPHierarchy'
) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Orchestrator function for Task 9: Constructs full AHP pairwise comparison matrices.

    Pipeline:
    1. Initialize n x n matrices for all expert/level combinations.
    2. Populate matrices with direct survey judgments.
    3. Enforce reciprocity (A_ji = 1/A_ij) to fill missing triangles and validate completeness.

    Args:
        cleaned_survey_df (pd.DataFrame): The validated and cleaned survey data.
        hierarchy (AHPHierarchy): The hierarchy configuration object.

    Returns:
        Dict[str, Dict[str, np.ndarray]]: A nested dictionary of fully constructed AHP matrices.
            Structure: result[expert_id][hierarchy_level] -> np.ndarray
    """
    # Step 1: Initialize
    matrices_init = initialize_matrices(cleaned_survey_df, hierarchy)

    # Step 2: Populate Direct
    matrices_populated = populate_direct_entries(matrices_init, cleaned_survey_df, hierarchy)

    # Step 3: Enforce Reciprocity
    matrices_final = enforce_reciprocity(matrices_populated)

    return matrices_final
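
The three steps of Task 9 can be traced on a single 3 x 3 matrix. The sketch below is self-contained and does not call the functions above; the labels reuse the `Sub_LR` level, while the judgments are hypothetical values on Saaty's 1-9 scale.

```python
import numpy as np

labels = ["LR1", "LR2", "LR3"]
index = {label: i for i, label in enumerate(labels)}

# Hypothetical upper-triangle judgments: (criterion_i, criterion_j, value).
judgments = [("LR1", "LR2", 3.0), ("LR1", "LR3", 5.0), ("LR2", "LR3", 2.0)]

# Step 1: NaN off the diagonal, 1.0 on the diagonal.
matrix = np.full((len(labels), len(labels)), np.nan, dtype=np.float64)
np.fill_diagonal(matrix, 1.0)

# Steps 2 and 3 combined: write each direct entry and its reciprocal A_ji = 1 / A_ij.
for crit_i, crit_j, value in judgments:
    i, j = index[crit_i], index[crit_j]
    matrix[i, j] = value
    matrix[j, i] = 1.0 / value

# A complete reciprocal matrix has no NaNs and satisfies A_ij * A_ji = 1 everywhere.
is_complete = not np.isnan(matrix).any()
is_reciprocal = np.allclose(matrix * matrix.T, 1.0)
```

With n criteria, n(n-1)/2 judgments are enough to fill the matrix, which is why the pipeline accepts entries in either triangle and derives the other.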


In [None]:
# Task 10 – Compute column normalization and local weight vectors

# ==============================================================================
# Task 10: Compute column normalization and local weight vectors
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 1: Compute column sums for each matrix
# -------------------------------------------------------------------------------------------------------------------------------

def compute_column_sums(matrices: Dict[str, Dict[str, np.ndarray]]) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Computes the column sums for each pairwise comparison matrix.

    This is the first step of the AHP Row Average Method. It also validates that
    matrices contain no NaNs and that column sums are positive (valid positive reciprocal matrices).

    Args:
        matrices (Dict): Nested dictionary of AHP matrices [expert][level] -> np.ndarray.

    Returns:
        Dict: Nested dictionary of column sum vectors [expert][level] -> np.ndarray (1D).

    Raises:
        ValueError: If a matrix contains NaNs or has non-positive column sums.
    """
    column_sums: Dict[str, Dict[str, np.ndarray]] = {}

    for expert_id, levels in matrices.items():
        column_sums[expert_id] = {}
        for level, mat in levels.items():
            # Check for NaNs (defensive, though Task 9 should have cleared them)
            if np.isnan(mat).any():
                raise ValueError(f"Matrix for Expert '{expert_id}', Level '{level}' contains NaNs.")

            # Compute column sums
            # axis=0 sums down the rows (collapsing to columns)
            sums = np.sum(mat, axis=0)

            # Validate positivity
            if np.any(sums <= 0):
                raise ValueError(f"Non-positive column sums found for Expert '{expert_id}', Level '{level}'.")

            column_sums[expert_id][level] = sums

    return column_sums

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 2: Normalize each column
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_matrices(
    matrices: Dict[str, Dict[str, np.ndarray]],
    column_sums: Dict[str, Dict[str, np.ndarray]]
) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Normalizes each pairwise comparison matrix by dividing each element by its column sum.

    After this step, each column of the normalized matrix should sum to 1.0 up to
    floating-point rounding; a ValueError is raised if that check fails.

    Args:
        matrices (Dict): The original AHP matrices.
        column_sums (Dict): The column sums computed in Step 1.

    Returns:
        Dict: Nested dictionary of normalized matrices.
    """
    normalized_matrices: Dict[str, Dict[str, np.ndarray]] = {}

    for expert_id, levels in matrices.items():
        normalized_matrices[expert_id] = {}
        for level, mat in levels.items():
            sums = column_sums[expert_id][level]

            # Broadcasting division: (n, n) / (n,) divides each column j by sums[j]
            norm_mat = mat / sums

            # Verify normalization (each column should sum to 1)
            # rtol=0.0 makes atol the only tolerance (np.allclose defaults to rtol=1e-5)
            check_sums = np.sum(norm_mat, axis=0)
            if not np.allclose(check_sums, 1.0, rtol=0.0, atol=1e-10):
                raise ValueError(f"Normalization failed for Expert '{expert_id}', Level '{level}'. Column sums != 1.")

            normalized_matrices[expert_id][level] = norm_mat

    return normalized_matrices
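
# --- Illustrative sketch (not part of the original pipeline) ---
# Why `mat / sums` normalizes columns: NumPy aligns the (n,) vector with the
# last axis, so element [i, j] is divided by sums[j]. Dividing by
# `sums[:, None]` would scale rows instead. The matrix and the `_demo_*`
# names below are hypothetical, chosen only to make the difference visible.
import numpy as np

_demo_mat = np.array([[1.0, 2.0], [3.0, 4.0]])
_demo_sums = _demo_mat.sum(axis=0)                # column totals
_demo_by_column = _demo_mat / _demo_sums          # divides column j by sums[j]
_demo_by_row = _demo_mat / _demo_sums[:, None]    # divides row i by sums[i] (not what AHP needs)

# Only the column-wise division yields columns that sum to 1.
assert np.allclose(_demo_by_column.sum(axis=0), 1.0)
assert not np.allclose(_demo_by_row.sum(axis=0), 1.0)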

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 3: Compute local weight vector via row averaging
# -------------------------------------------------------------------------------------------------------------------------------

def compute_row_averages(normalized_matrices: Dict[str, Dict[str, np.ndarray]]) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Computes the local weight vector (eigenvector approximation) by averaging the rows
    of the normalized matrix.

    The resulting vector is re-normalized so that it sums to 1.0 up to floating-point rounding.

    Args:
        normalized_matrices (Dict): The normalized matrices from Step 2.

    Returns:
        Dict: Nested dictionary of local weight vectors [expert][level] -> np.ndarray (1D).
    """
    local_weights: Dict[str, Dict[str, np.ndarray]] = {}

    for expert_id, levels in normalized_matrices.items():
        local_weights[expert_id] = {}
        for level, norm_mat in levels.items():
            # Compute row means
            # axis=1 averages across columns, yielding one value per row
            weights = np.mean(norm_mat, axis=1)

            # Re-normalize to remove floating-point drift
            # (row averages of a column-normalized matrix sum to 1 in exact arithmetic)
            total_weight = np.sum(weights)
            if total_weight == 0:
                raise ValueError(f"Zero weight vector for Expert '{expert_id}', Level '{level}'.")

            weights = weights / total_weight

            local_weights[expert_id][level] = weights

    return local_weights

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_ahp_local_weights(
    matrices: Dict[str, Dict[str, np.ndarray]]
) -> Tuple[Dict[str, Dict[str, np.ndarray]], Dict[str, Dict[str, np.ndarray]]]:
    """
    Orchestrator function for Task 10: Computes local AHP weights using the Row Average Method.

    Pipeline:
    1. Compute column sums of the original matrices.
    2. Normalize matrices by column sums.
    3. Compute row averages of normalized matrices to get local weights.

    Args:
        matrices (Dict): The fully constructed pairwise comparison matrices.

    Returns:
        Tuple[Dict, Dict]:
            - local_weights: The computed weight vectors [expert][level] -> np.ndarray.
            - column_sums: The column sums of original matrices (needed for Lambda_max calculation in Task 11).
    """
    # Step 1: Column Sums
    # We return these because Task 11 needs them for Lambda_max = dot(weights, column_sums)
    col_sums = compute_column_sums(matrices)

    # Step 2: Normalize
    norm_matrices = normalize_matrices(matrices, col_sums)

    # Step 3: Row Averages (Weights)
    weights = compute_row_averages(norm_matrices)

    return weights, col_sums
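
# --- Illustrative sketch (not part of the original pipeline) ---
# A minimal, self-contained run of the Row Average Method on a single matrix,
# assuming a perfectly consistent 3x3 comparison matrix built from the
# hypothetical priorities (0.6, 0.3, 0.1). `demo_row_average_weights` is an
# illustrative helper name, not a function of this project.
import numpy as np

def demo_row_average_weights(mat: np.ndarray) -> np.ndarray:
    """Column sums -> column normalization -> row means, for one matrix."""
    col_sums = mat.sum(axis=0)
    normalized = mat / col_sums
    weights = normalized.mean(axis=1)
    return weights / weights.sum()

_demo_priorities = np.array([0.6, 0.3, 0.1])
# Consistent matrix: a_ij = p_i / p_j
_demo_consistent = _demo_priorities[:, None] / _demo_priorities[None, :]
_demo_weights = demo_row_average_weights(_demo_consistent)

# For a consistent matrix the method recovers the generating priorities.
assert np.allclose(_demo_weights, _demo_priorities)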


In [None]:
# Task 11 – Compute maximum eigenvalue and consistency index (CI) for each matrix.

import numpy as np
from typing import Dict, Any, Tuple

# ==============================================================================
# Task 11: Compute maximum eigenvalue and consistency index (CI) for each matrix
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 1: Compute approximate maximum eigenvalue (Lambda_max)
# -------------------------------------------------------------------------------------------------------------------------------

def compute_lambda_max(
    local_weights: Dict[str, Dict[str, np.ndarray]],
    column_sums: Dict[str, Dict[str, np.ndarray]]
) -> Dict[str, Dict[str, float]]:
    """
    Computes the approximate maximum eigenvalue (Lambda_max) for each AHP matrix.

    Formula: Lambda_max = sum(w_i * S_i)
    Where w_i is the local weight for criterion i, and S_i is the column sum for criterion i.

    Args:
        local_weights (Dict): Nested dictionary of local weight vectors.
        column_sums (Dict): Nested dictionary of column sum vectors.

    Returns:
        Dict: Nested dictionary of Lambda_max scalars [expert][level] -> float.
    """
    lambda_max_values: Dict[str, Dict[str, float]] = {}

    for expert_id, levels in local_weights.items():
        lambda_max_values[expert_id] = {}
        for level, weights in levels.items():
            # Retrieve corresponding column sums
            if expert_id not in column_sums or level not in column_sums[expert_id]:
                raise ValueError(f"Missing column sums for Expert '{expert_id}', Level '{level}'.")

            sums = column_sums[expert_id][level]

            # Validate dimensions
            if weights.shape != sums.shape:
                raise ValueError(
                    f"Dimension mismatch for Expert '{expert_id}', Level '{level}': "
                    f"weights {weights.shape} vs sums {sums.shape}."
                )

            # Compute dot product
            l_max = np.dot(weights, sums)

            lambda_max_values[expert_id][level] = float(l_max)

    return lambda_max_values
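
# --- Illustrative sketch (not part of the original pipeline) ---
# A quick sanity check of the Lambda_max approximation against the exact
# principal eigenvalue, assuming a hypothetical, mildly inconsistent 3x3
# reciprocal matrix. `demo_lambda_max_pair` is an illustrative helper name.
import numpy as np

def demo_lambda_max_pair(mat: np.ndarray) -> tuple[float, float]:
    """Return (approximate, exact) Lambda_max for one comparison matrix."""
    col_sums = mat.sum(axis=0)
    weights = (mat / col_sums).mean(axis=1)        # row-average weights
    approx = float(np.dot(weights, col_sums))      # same formula as compute_lambda_max
    exact = float(np.max(np.linalg.eigvals(mat).real))
    return approx, exact

_demo_reciprocal = np.array([
    [1.0, 2.0, 5.0],
    [1 / 2, 1.0, 3.0],
    [1 / 5, 1 / 3, 1.0],
])
_demo_approx, _demo_exact = demo_lambda_max_pair(_demo_reciprocal)

# Both values sit slightly above n = 3 and agree closely for near-consistent judgments.
assert _demo_approx >= 3.0 and abs(_demo_approx - _demo_exact) < 0.01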

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 2: Compute Consistency Index (CI)
# -------------------------------------------------------------------------------------------------------------------------------

def compute_consistency_index(
    lambda_max_values: Dict[str, Dict[str, float]],
    hierarchy: 'AHPHierarchy'
) -> Dict[str, Dict[str, float]]:
    """
    Computes the Consistency Index (CI) for each AHP matrix.

    Formula: CI = (Lambda_max - n) / (n - 1)

    Args:
        lambda_max_values (Dict): The computed Lambda_max values.
        hierarchy (AHPHierarchy): The hierarchy configuration to retrieve matrix size n.

    Returns:
        Dict: Nested dictionary of CI scalars [expert][level] -> float.
    """
    ci_values: Dict[str, Dict[str, float]] = {}

    for expert_id, levels in lambda_max_values.items():
        ci_values[expert_id] = {}
        for level, l_max in levels.items():
            # Get matrix size n
            n = hierarchy.get_matrix_size(level)

            if n <= 1:
                # CI is 0 for 1x1 matrices (perfectly consistent)
                ci = 0.0
            else:
                ci = (l_max - n) / (n - 1)

            # Theoretical lower bound check (CI >= 0 for positive reciprocal matrices)
            # Allow small epsilon for float noise
            if ci < -1e-9:
                # This indicates a calculation error or invalid matrix properties
                raise ValueError(f"Negative CI ({ci}) computed for Expert '{expert_id}', Level '{level}'.")

            ci_values[expert_id][level] = max(0.0, ci) # Clamp negative epsilon to 0

    return ci_values

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 3: Retrieve Random Index (RI) from lookup table
# -------------------------------------------------------------------------------------------------------------------------------

def retrieve_random_indices(
    ci_values: Dict[str, Dict[str, float]],
    hierarchy: 'AHPHierarchy',
    ri_lookup: Dict[int, float]
) -> Dict[str, Dict[str, float]]:
    """
    Retrieves the Random Index (RI) for each matrix based on its order n.

    Args:
        ci_values (Dict): The CI values structure (used to iterate keys).
        hierarchy (AHPHierarchy): The hierarchy configuration for matrix size n.
        ri_lookup (Dict[int, float]): The Saaty RI lookup table {n: RI}.

    Returns:
        Dict: Nested dictionary of RI scalars [expert][level] -> float.
    """
    ri_values: Dict[str, Dict[str, float]] = {}

    # Retrieve the Random Index (RI) for each matrix based on its order n.
    for expert_id, levels in ci_values.items():
        ri_values[expert_id] = {}
        for level in levels:
            n = hierarchy.get_matrix_size(level)

            if n not in ri_lookup:
                raise ValueError(f"Matrix size n={n} not found in RI lookup table for Level '{level}'.")

            ri_values[expert_id][level] = ri_lookup[n]

    return ri_values

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_ahp_consistency_metrics(
    local_weights: Dict[str, Dict[str, np.ndarray]],
    column_sums: Dict[str, Dict[str, np.ndarray]],
    hierarchy: 'AHPHierarchy',
    ahp_parameters: Dict[str, Any]
) -> Tuple[Dict[str, Dict[str, float]], Dict[str, Dict[str, float]], Dict[str, Dict[str, float]]]:
    """
    Orchestrator function for Task 11: Computes AHP consistency metrics (Lambda_max, CI, RI).

    Pipeline:
    1. Compute Lambda_max using weights and column sums.
    2. Compute CI using Lambda_max and matrix size n.
    3. Retrieve RI using matrix size n and the provided lookup table.

    Args:
        local_weights (Dict): Local weight vectors.
        column_sums (Dict): Column sum vectors.
        hierarchy (AHPHierarchy): Hierarchy configuration.
        ahp_parameters (Dict): Configuration containing the 'random_index_lookup' table.

    Returns:
        Tuple[Dict, Dict, Dict]:
            - lambda_max_values: [expert][level] -> float
            - ci_values: [expert][level] -> float
            - ri_values: [expert][level] -> float
    """
    # Extract RI lookup table
    ri_lookup = ahp_parameters.get("random_index_lookup")
    if not ri_lookup:
        raise ValueError("ahp_parameters missing 'random_index_lookup'.")

    # Ensure keys are integers (JSON keys might be strings)
    ri_lookup_int = {int(k): float(v) for k, v in ri_lookup.items()}

    # Step 1: Lambda Max
    lambda_max = compute_lambda_max(local_weights, column_sums)

    # Step 2: Consistency Index
    ci = compute_consistency_index(lambda_max, hierarchy)

    # Step 3: Random Index
    ri = retrieve_random_indices(ci, hierarchy, ri_lookup_int)

    return lambda_max, ci, ri
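
# --- Illustrative sketch (not part of the original pipeline) ---
# Why the orchestrator coerces RI lookup keys to int: JSON object keys are
# always strings, so a lookup loaded from a JSON config cannot be indexed by
# the integer matrix order n. The config fragment below is hypothetical; 0.58
# is the commonly cited Saaty RI for n = 3 and is used here as an assumption.
import json

_demo_raw_lookup = json.loads('{"1": 0.0, "2": 0.0, "3": 0.58}')
_demo_int_lookup = {int(k): float(v) for k, v in _demo_raw_lookup.items()}

assert 3 not in _demo_raw_lookup      # the integer key is absent before coercion
assert _demo_int_lookup[3] == 0.58    # and present afterwards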


In [None]:
# Task 12 – Compute Consistency Ratio (CR) and filter acceptable matrices

from typing import Any, Dict, Set, Tuple

# ==============================================================================
# Task 12: Compute Consistency Ratio (CR) and filter acceptable matrices
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 1: Compute Consistency Ratio (CR)
# -------------------------------------------------------------------------------------------------------------------------------

def compute_consistency_ratio(
    ci_values: Dict[str, Dict[str, float]],
    ri_values: Dict[str, Dict[str, float]]
) -> Dict[str, Dict[str, float]]:
    """
    Computes the Consistency Ratio (CR) for each AHP matrix.

    Formula: CR = CI / RI
    If RI is 0 (for n=1, 2), CR is defined as 0.

    Args:
        ci_values (Dict): Nested dictionary of Consistency Indices [expert][level] -> float.
        ri_values (Dict): Nested dictionary of Random Indices [expert][level] -> float.

    Returns:
        Dict: Nested dictionary of Consistency Ratios [expert][level] -> float.
    """
    cr_values: Dict[str, Dict[str, float]] = {}

    # Compute the Consistency Ratio (CR) for each AHP matrix
    for expert_id, levels in ci_values.items():
        cr_values[expert_id] = {}
        for level, ci in levels.items():
            if expert_id not in ri_values or level not in ri_values[expert_id]:
                raise ValueError(f"Missing RI value for Expert '{expert_id}', Level '{level}'.")

            ri = ri_values[expert_id][level]

            if ri == 0:
                # For n=1 or n=2, RI is 0. Perfect consistency is assumed/enforced.
                cr = 0.0
            else:
                cr = ci / ri

            cr_values[expert_id][level] = cr

    return cr_values
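
# --- Illustrative sketch (not part of the original pipeline) ---
# CI and CR for a single matrix, written out end to end. The RI table holds
# commonly cited Saaty values and is an assumption; the pipeline reads its own
# table from ahp_parameters["random_index_lookup"], which may differ.
# `demo_consistency_ratio` is an illustrative helper name.
import math

_DEMO_RI = {1: 0.0, 2: 0.0, 3: 0.58, 4: 0.90, 5: 1.12}

def demo_consistency_ratio(lambda_max: float, n: int, ri_lookup: dict = _DEMO_RI) -> float:
    """CR = CI / RI with CI = (lambda_max - n) / (n - 1); CR is 0 when RI is 0."""
    ci = 0.0 if n <= 1 else (lambda_max - n) / (n - 1)
    ri = ri_lookup[n]
    return 0.0 if ri == 0 else ci / ri

# A perfectly consistent matrix has lambda_max == n, hence CR == 0.
assert demo_consistency_ratio(3.0, 3) == 0.0
# lambda_max = 3.116 gives CI = 0.058 and CR = 0.058 / 0.58 = 0.10.
assert math.isclose(demo_consistency_ratio(3.116, 3), 0.10)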

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 2: Apply consistency threshold and classify matrices
# -------------------------------------------------------------------------------------------------------------------------------

def classify_matrices(
    cr_values: Dict[str, Dict[str, float]],
    threshold: float
) -> Dict[str, Set[str]]:
    """
    Classifies matrices as accepted or rejected based on the Consistency Ratio threshold.

    Args:
        cr_values (Dict): Nested dictionary of CR values.
        threshold (float): The maximum allowed CR (typically 0.10).

    Returns:
        Dict[str, Set[str]]: A dictionary mapping hierarchy levels to sets of accepted expert IDs.
            Structure: {level: {expert_id1, expert_id2, ...}}
    """
    accepted_experts: Dict[str, Set[str]] = {}

    # Initialize sets for all levels found in the input
    # We iterate to find all unique levels first
    all_levels = set()
    for levels in cr_values.values():
        all_levels.update(levels.keys())

    for level in all_levels:
        accepted_experts[level] = set()

    for expert_id, levels in cr_values.items():
        for level, cr in levels.items():
            # Strict inequality: CR below the threshold is accepted; CR == threshold is rejected
            if cr < threshold:
                accepted_experts[level].add(expert_id)
            # Else: rejected (implicitly not added)

    return accepted_experts
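
# --- Illustrative sketch (not part of the original pipeline) ---
# The classification step inverts {expert: {level: CR}} into
# {level: {accepted experts}}. The expert IDs and CR values below are
# hypothetical, and `demo_accepted_by_level` is an illustrative helper name.
def demo_accepted_by_level(cr_values: dict, threshold: float) -> dict:
    accepted: dict = {}
    for expert_id, levels in cr_values.items():
        for level, cr in levels.items():
            accepted.setdefault(level, set())   # keep the level even if nobody passes
            if cr < threshold:
                accepted[level].add(expert_id)
    return accepted

_demo_cr = {
    "E1": {"Main_Criteria": 0.04, "Sub_LR": 0.12},
    "E2": {"Main_Criteria": 0.10, "Sub_LR": 0.07},
}
_demo_accepted = demo_accepted_by_level(_demo_cr, threshold=0.10)

# E2's Main_Criteria matrix sits exactly on the threshold and is rejected by the strict "<".
assert _demo_accepted == {"Main_Criteria": {"E1"}, "Sub_LR": {"E2"}}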

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 3: Verify that at least some experts remain per level
# -------------------------------------------------------------------------------------------------------------------------------

def verify_accepted_experts(accepted_experts: Dict[str, Set[str]]) -> None:
    """
    Verifies that at least one expert has been accepted for every hierarchy level.

    If any level has zero accepted experts, the AHP aggregation cannot proceed.

    Args:
        accepted_experts (Dict): The mapping of levels to accepted expert sets.

    Raises:
        ValueError: If any level has no accepted experts.
    """
    for level, experts in accepted_experts.items():
        if not experts:
            raise ValueError(
                f"No experts passed the consistency check for Level '{level}'. "
                "Cannot proceed with aggregation. Review expert judgments."
            )

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def filter_consistent_matrices(
    ci_values: Dict[str, Dict[str, float]],
    ri_values: Dict[str, Dict[str, float]],
    ahp_parameters: Dict[str, Any]
) -> Tuple[Dict[str, Dict[str, float]], Dict[str, Set[str]]]:
    """
    Orchestrator function for Task 12: Computes CR and filters matrices based on consistency.

    Pipeline:
    1. Compute CR = CI / RI.
    2. Classify matrices as accepted/rejected against the configured threshold (default 0.10).
    3. Verify that every level has at least one accepted expert.

    Args:
        ci_values (Dict): Consistency Indices.
        ri_values (Dict): Random Indices.
        ahp_parameters (Dict): Configuration containing 'consistency_threshold'.

    Returns:
        Tuple[Dict, Dict]:
            - cr_values: [expert][level] -> float
            - accepted_experts: [level] -> Set[expert_id]
    """
    # Extract threshold
    threshold_config = ahp_parameters.get("consistency_threshold", {})
    threshold = threshold_config.get("max_allowed_value", 0.10)

    # Step 1: Compute CR
    cr_values = compute_consistency_ratio(ci_values, ri_values)

    # Step 2: Classify
    accepted_experts = classify_matrices(cr_values, threshold)

    # Step 3: Verify
    verify_accepted_experts(accepted_experts)

    return cr_values, accepted_experts


In [None]:
# Task 13 – Aggregate accepted expert weights for the main criteria level

import numpy as np
from typing import Dict, List, Set

# ==============================================================================
# Task 13: Aggregate accepted expert weights for the main criteria level
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 1: Collect accepted weight vectors for main criteria
# -------------------------------------------------------------------------------------------------------------------------------

def collect_main_weights(
    local_weights: Dict[str, Dict[str, np.ndarray]],
    accepted_experts: Dict[str, Set[str]]
) -> np.ndarray:
    """
    Collects the local weight vectors for the 'Main_Criteria' level from all accepted experts.

    Args:
        local_weights (Dict): Nested dictionary of local weights [expert][level] -> np.ndarray.
        accepted_experts (Dict): Dictionary mapping levels to sets of accepted expert IDs.

    Returns:
        np.ndarray: A 2D array of shape (num_accepted_experts, 4) containing the weight vectors.

    Raises:
        ValueError: If no experts are accepted for 'Main_Criteria'.
    """
    level = "Main_Criteria"

    if level not in accepted_experts or not accepted_experts[level]:
        raise ValueError(f"No accepted experts found for level '{level}'. Cannot aggregate.")

    accepted_ids = accepted_experts[level]
    weight_vectors: List[np.ndarray] = []

    # Collect the local weight vectors for the 'Main_Criteria' level from all accepted experts
    # (sorted so the row order of the stacked matrix is reproducible across runs)
    for expert_id in sorted(accepted_ids):
        if expert_id not in local_weights or level not in local_weights[expert_id]:
            raise ValueError(f"Missing weights for accepted expert '{expert_id}' at level '{level}'.")

        w = local_weights[expert_id][level]
        weight_vectors.append(w)

    # Stack into (K, 4) matrix
    return np.stack(weight_vectors, axis=0)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 2: Compute arithmetic mean of weight vectors
# -------------------------------------------------------------------------------------------------------------------------------

def compute_mean_weights(weight_matrix: np.ndarray) -> np.ndarray:
    """
    Computes the arithmetic mean of the weight vectors across experts.

    Args:
        weight_matrix (np.ndarray): 2D array of weights (experts x criteria).

    Returns:
        np.ndarray: 1D array of aggregated weights.
    """
    # Compute mean along axis 0 (across experts)
    mean_weights = np.mean(weight_matrix, axis=0)
    return mean_weights

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 3: Normalize aggregated weights
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_aggregated_vector(weights: np.ndarray) -> np.ndarray:
    """
    Normalizes the aggregated weight vector to ensure it sums exactly to 1.0.

    Args:
        weights (np.ndarray): The raw aggregated weight vector.

    Returns:
        np.ndarray: The normalized weight vector.
    """
    total = np.sum(weights)

    if total == 0:
        raise ValueError("Aggregated weight vector sums to zero.")

    return weights / total

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def aggregate_main_criteria_weights(
    local_weights: Dict[str, Dict[str, np.ndarray]],
    accepted_experts: Dict[str, Set[str]]
) -> np.ndarray:
    """
    Orchestrator function for Task 13: Aggregates expert weights for the Main Criteria level.

    Pipeline:
    1. Collect weight vectors from accepted experts.
    2. Compute arithmetic mean.
    3. Normalize to sum to 1.

    Args:
        local_weights (Dict): Local weights from Task 10.
        accepted_experts (Dict): Accepted experts from Task 12.

    Returns:
        np.ndarray: The final aggregated global weight vector for Main Criteria (CSR, LR, IR, CFR).
    """
    # Step 1: Collect
    weight_matrix = collect_main_weights(local_weights, accepted_experts)

    # Step 2: Mean
    mean_weights = compute_mean_weights(weight_matrix)

    # Step 3: Normalize
    final_weights = normalize_aggregated_vector(mean_weights)

    return final_weights
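
# --- Illustrative sketch (not part of the original pipeline) ---
# Arithmetic-mean aggregation on two hypothetical accepted experts. Because
# each expert vector already sums to 1, so does their mean; the final division
# only guards against floating-point drift. All `_demo_*` values are made up.
import numpy as np

_demo_expert_weights = np.array([
    [0.40, 0.30, 0.20, 0.10],   # hypothetical accepted expert 1
    [0.20, 0.30, 0.30, 0.20],   # hypothetical accepted expert 2
])
_demo_mean = _demo_expert_weights.mean(axis=0)     # average across experts
_demo_aggregated = _demo_mean / _demo_mean.sum()   # re-normalize

assert np.allclose(_demo_aggregated, [0.30, 0.30, 0.25, 0.15])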


In [None]:
# Task 14 – Aggregate accepted expert weights for each sub-criteria level

# ==============================================================================
# Task 14: Aggregate accepted expert weights for each sub-criteria level
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 1 & 2: Generic Aggregation Function for Sub-Levels
# -------------------------------------------------------------------------------------------------------------------------------

def aggregate_level_weights(
    level: str,
    local_weights: Dict[str, Dict[str, np.ndarray]],
    accepted_experts: Dict[str, Set[str]]
) -> np.ndarray:
    """
    Aggregates the local weight vectors for a specific hierarchy level from all accepted experts.

    This function implements the arithmetic mean aggregation followed by normalization,
    consistent with the AHP methodology for group decision making.

    Args:
        level (str): The hierarchy level to aggregate (e.g., "Sub_CSR").
        local_weights (Dict): Nested dictionary of local weights [expert][level] -> np.ndarray.
        accepted_experts (Dict): Dictionary mapping levels to sets of accepted expert IDs.

    Returns:
        np.ndarray: The aggregated, normalized weight vector for the specified level.

    Raises:
        ValueError: If no experts are accepted for the level or weights are missing.
    """
    if level not in accepted_experts or not accepted_experts[level]:
        raise ValueError(f"No accepted experts found for level '{level}'. Cannot aggregate.")

    accepted_ids = accepted_experts[level]
    weight_vectors: List[np.ndarray] = []

    # Sorted iteration keeps the stacking order reproducible across runs
    for expert_id in sorted(accepted_ids):
        if expert_id not in local_weights or level not in local_weights[expert_id]:
            raise ValueError(f"Missing weights for accepted expert '{expert_id}' at level '{level}'.")

        w = local_weights[expert_id][level]
        weight_vectors.append(w)

    # Stack into (K, n) matrix
    weight_matrix = np.stack(weight_vectors, axis=0)

    # Compute arithmetic mean across experts
    mean_weights = np.mean(weight_matrix, axis=0)

    # Normalize to sum to 1.0
    total = np.sum(mean_weights)
    if total == 0:
        raise ValueError(f"Aggregated weight vector for level '{level}' sums to zero.")

    return mean_weights / total

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 3: Store aggregated local weights for all sub-criteria
# -------------------------------------------------------------------------------------------------------------------------------

def map_sub_weights_to_main_groups(
    aggregated_vectors: Dict[str, np.ndarray]
) -> Dict[str, np.ndarray]:
    """
    Reorganizes the aggregated sub-level weight vectors by their parent main criterion code.

    Mapping:
    - "Sub_CSR" -> "CSR"
    - "Sub_LR"  -> "LR"
    - "Sub_IR"  -> "IR"
    - "Sub_CFR" -> "CFR"

    Args:
        aggregated_vectors (Dict): Dictionary mapping hierarchy level names to weight vectors.

    Returns:
        Dict[str, np.ndarray]: Dictionary mapping main criterion codes to sub-criteria weight vectors.
    """
    level_to_parent = {
        "Sub_CSR": "CSR",
        "Sub_LR": "LR",
        "Sub_IR": "IR",
        "Sub_CFR": "CFR"
    }

    mapped_weights: Dict[str, np.ndarray] = {}

    # Reorganize the aggregated sub-level weight vectors by their parent main criterion code
    for level, parent_code in level_to_parent.items():
        if level not in aggregated_vectors:
            raise ValueError(f"Missing aggregated weights for level '{level}'.")

        mapped_weights[parent_code] = aggregated_vectors[level]

    return mapped_weights

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def aggregate_all_sub_criteria_weights(
    local_weights: Dict[str, Dict[str, np.ndarray]],
    accepted_experts: Dict[str, Set[str]]
) -> Dict[str, np.ndarray]:
    """
    Orchestrator function for Task 14: Aggregates expert weights for all sub-criteria levels.

    Pipeline:
    1. Iterates through all sub-levels (Sub_CSR, Sub_LR, Sub_IR, Sub_CFR).
    2. Aggregates weights for each level using accepted experts.
    3. Maps the results to their parent main criterion codes.

    Args:
        local_weights (Dict): Local weights from Task 10.
        accepted_experts (Dict): Accepted experts from Task 12.

    Returns:
        Dict[str, np.ndarray]: A dictionary mapping main criterion codes (CSR, LR, IR, CFR)
                               to their aggregated sub-criteria weight vectors.
    """
    sub_levels = ["Sub_CSR", "Sub_LR", "Sub_IR", "Sub_CFR"]
    aggregated_vectors: Dict[str, np.ndarray] = {}

    # Step 1 & 2: Aggregate per level
    for level in sub_levels:
        aggregated_vectors[level] = aggregate_level_weights(level, local_weights, accepted_experts)

    # Step 3: Map to structure
    final_sub_weights = map_sub_weights_to_main_groups(aggregated_vectors)

    return final_sub_weights


In [None]:
# Task 15 – Compute global weights for all 34 financial ratios

# ==============================================================================
# Task 15: Compute global weights for all 34 financial ratios
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 1: Apply hierarchical weight composition
# -------------------------------------------------------------------------------------------------------------------------------

def calculate_raw_global_weights(
    main_weights: np.ndarray,
    sub_weights: Dict[str, np.ndarray],
    hierarchy: 'AHPHierarchy'
) -> Dict[str, float]:
    """
    Computes the raw global weight for each secondary criterion (ratio) by multiplying
    its local weight by the weight of its parent main criterion.

    Formula: w_global(s) = w_main(parent(s)) * w_local(s)

    Args:
        main_weights (np.ndarray): The aggregated weight vector for main criteria (CSR, LR, IR, CFR).
        sub_weights (Dict): Dictionary mapping main criterion codes to aggregated sub-criteria weight vectors.
        hierarchy (AHPHierarchy): The hierarchy configuration object.

    Returns:
        Dict[str, float]: A dictionary mapping ratio labels to their raw global weights.
    """
    # Map main criterion labels to their indices in the main_weights vector
    # The order is defined in hierarchy.levels["Main_Criteria"]
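    main_criteria_order = hierarchy.get_criteria("Main_Criteria")
    if len(main_weights) != len(main_criteria_order):
        raise ValueError(
            f"Main weight vector length {len(main_weights)} != "
            f"number of main criteria {len(main_criteria_order)}."
        )
    main_weight_map = {label: main_weights[i] for i, label in enumerate(main_criteria_order)}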

    raw_global_weights: Dict[str, float] = {}

    # Iterate through all sub-levels to compute global weights
    # We use the hierarchy to get the list of sub-criteria for each main group
    # Note: sub_weights keys are main codes (CSR, LR, etc.)

    # We need to know which sub-level corresponds to which main code to get the correct label order
    # Mapping: CSR -> Sub_CSR, etc.
    parent_to_level = {
        "CSR": "Sub_CSR",
        "LR": "Sub_LR",
        "IR": "Sub_IR",
        "CFR": "Sub_CFR"
    }

    for parent_code, local_vector in sub_weights.items():
        # Get the weight of the parent main criterion
        if parent_code not in main_weight_map:
            raise ValueError(f"Main criterion '{parent_code}' not found in main weights.")

        parent_weight = main_weight_map[parent_code]

        # Get the ordered list of sub-criteria labels for this group
        level_name = parent_to_level[parent_code]
        sub_criteria_labels = hierarchy.get_criteria(level_name)

        # Validate vector length
        if len(local_vector) != len(sub_criteria_labels):
            raise ValueError(
                f"Dimension mismatch for '{parent_code}': "
                f"Vector len {len(local_vector)} != Labels len {len(sub_criteria_labels)}"
            )

        # Compute global weights
        for i, ratio_label in enumerate(sub_criteria_labels):
            local_w = local_vector[i]
            global_w = parent_weight * local_w
            raw_global_weights[ratio_label] = global_w

    return raw_global_weights
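
# --- Illustrative sketch (not part of the original pipeline) ---
# Hierarchical composition w_global(s) = w_main(parent(s)) * w_local(s) on a
# hypothetical two-criterion hierarchy. The ratio labels and all weights are
# made up; the point is that global weights sum to 1 whenever the main vector
# and every local vector each sum to 1.
import math

_demo_main = {"CSR": 0.6, "LR": 0.4}
_demo_local = {
    "CSR": {"ratio_a": 0.5, "ratio_b": 0.5},
    "LR": {"ratio_c": 0.25, "ratio_d": 0.75},
}
_demo_global = {
    ratio: _demo_main[parent] * local_w
    for parent, ratios in _demo_local.items()
    for ratio, local_w in ratios.items()
}

assert math.isclose(_demo_global["ratio_d"], 0.4 * 0.75)
assert math.isclose(sum(_demo_global.values()), 1.0)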

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 2: Assemble the global weight vector
# -------------------------------------------------------------------------------------------------------------------------------

def assemble_global_vector(
    raw_weights_map: Dict[str, float],
    hierarchy: 'AHPHierarchy'
) -> np.ndarray:
    """
    Assembles the global weights into a single 1D array ordered according to the
    canonical global ratio order defined in the hierarchy.

    Args:
        raw_weights_map (Dict): Dictionary of ratio labels to weights.
        hierarchy (AHPHierarchy): Hierarchy configuration containing canonical_ratio_order.

    Returns:
        np.ndarray: The ordered global weight vector (length 34).
    """
    canonical_order = hierarchy.canonical_ratio_order
    vector_list = []

    # Assemble the global weights into a single 1D array
    for ratio_label in canonical_order:
        if ratio_label not in raw_weights_map:
            raise ValueError(f"Missing global weight for ratio '{ratio_label}'.")

        vector_list.append(raw_weights_map[ratio_label])

    return np.array(vector_list, dtype=np.float64)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 3: Normalize the global weight vector
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_final_global_vector(weights: np.ndarray) -> np.ndarray:
    """
    Normalizes the final global weight vector to ensure it sums exactly to 1.0.

    Args:
        weights (np.ndarray): The raw global weight vector.

    Returns:
        np.ndarray: The normalized global weight vector.
    """
    total = np.sum(weights)

    if not np.isfinite(total) or np.isclose(total, 0.0):
        raise ValueError("Global weight vector sums to zero or is not finite.")

    return weights / total

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_global_weights(
    main_weights: np.ndarray,
    sub_weights: Dict[str, np.ndarray],
    hierarchy: 'AHPHierarchy'
) -> np.ndarray:
    """
    Orchestrator function for Task 15: Computes the final global AHP weights for all 34 ratios.

    Pipeline:
    1. Calculate raw global weights via hierarchical composition (Main * Local).
    2. Assemble weights into a canonical vector.
    3. Normalize the vector to sum to 1.

    Args:
        main_weights (np.ndarray): Aggregated main criteria weights.
        sub_weights (Dict): Aggregated sub-criteria weights.
        hierarchy (AHPHierarchy): Hierarchy configuration.

    Returns:
        np.ndarray: The final global weight vector (length 34), summing to 1.
    """
    # Step 1: Hierarchical Composition
    raw_map = calculate_raw_global_weights(main_weights, sub_weights, hierarchy)

    # Step 2: Assemble Vector
    raw_vector = assemble_global_vector(raw_map, hierarchy)

    # Step 3: Normalize
    final_vector = normalize_final_global_vector(raw_vector)

    return final_vector


# Task 16 – Define the computational logic for all 34 financial ratios

# ==============================================================================
# Task 16: Define the computational logic for all 34 financial ratios
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 1: Create a deterministic index ordering for the 34 ratios
# -------------------------------------------------------------------------------------------------------------------------------

def get_canonical_ratio_order(hierarchy: 'AHPHierarchy') -> List[str]:
    """
    Retrieves the deterministic order of the 34 financial ratios from the hierarchy object.

    This order is used to align the columns of the decision matrix X with the global weight vector w.

    Args:
        hierarchy (AHPHierarchy): The hierarchy configuration object.

    Returns:
        List[str]: The ordered list of 34 ratio identifiers (e.g., ['CSR1', ..., 'CFR14']).
    """
    return hierarchy.canonical_ratio_order

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 2: Extract numerator and denominator variable names
# -------------------------------------------------------------------------------------------------------------------------------

def extract_ratio_logic(
    feature_engineering_logic: Dict[str, Dict[str, str]],
    canonical_order: List[str]
) -> Dict[str, Tuple[str, str]]:
    """
    Extracts and validates the numerator and denominator expressions for all ratios.

    Args:
        feature_engineering_logic (Dict): Configuration dictionary mapping ratio IDs to logic.
        canonical_order (List[str]): The expected list of ratio IDs.

    Returns:
        Dict[str, Tuple[str, str]]: A mapping of ratio ID -> (numerator_expr, denominator_expr).

    Raises:
        ValueError: If logic is missing for any ratio in the canonical order.
    """
    ratio_specs: Dict[str, Tuple[str, str]] = {}

    # Extract and validate the numerator and denominator expressions for all ratios
    for ratio_id in canonical_order:
        if ratio_id not in feature_engineering_logic:
            raise ValueError(f"Missing feature engineering logic for ratio '{ratio_id}'.")

        logic = feature_engineering_logic[ratio_id]

        if "numerator" not in logic or "denominator" not in logic:
            raise ValueError(f"Incomplete logic for ratio '{ratio_id}': must have 'numerator' and 'denominator'.")

        ratio_specs[ratio_id] = (logic["numerator"], logic["denominator"])

    return ratio_specs

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 3: Define a parser for compound numerator expressions
# -------------------------------------------------------------------------------------------------------------------------------

def evaluate_expression(
    expression: str,
    df: pd.DataFrame
) -> pd.Series:
    """
    Evaluates a string arithmetic expression against the columns of a DataFrame.

    Supports basic arithmetic operators (+, -) and column references.
    Uses DataFrame.eval for vectorized evaluation.

    Args:
        expression (str): The arithmetic expression (e.g., "current_assets - inventory").
        df (pd.DataFrame): The DataFrame containing the financial data.

    Returns:
        pd.Series: The result of the evaluation for each row.

    Raises:
        ValueError: If the expression cannot be evaluated (e.g., invalid column name).
    """
    try:
        # DataFrame.eval resolves column names inside the expression string.
        # The default engine (numexpr when installed, otherwise python) is
        # sufficient for simple arithmetic.
        result = df.eval(expression)
        return result
    except Exception as e:
        # Fallback: the expression may be a plain column name that eval cannot
        # parse (e.g. it contains special characters)
        if expression in df.columns:
            return df[expression]

        raise ValueError(f"Failed to evaluate expression '{expression}': {e}") from e
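
# Illustrative sketch (not part of the pipeline): how DataFrame.eval resolves a
# compound expression against column names. The column names and figures are
# hypothetical; the import is repeated so the sketch runs on its own.
import pandas as pd

def _sketch_eval_expression() -> pd.Series:
    df = pd.DataFrame({
        "current_assets": [100.0, 120.0],
        "inventory": [30.0, 20.0],
    })
    # Column names act as variables inside the expression string
    return df.eval("current_assets - inventory")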

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def prepare_ratio_computation_logic(
    hierarchy: 'AHPHierarchy',
    study_configuration: Dict[str, Any]
) -> Tuple[List[str], Dict[str, Tuple[str, str]]]:
    """
    Orchestrator function for Task 16: Prepares the structural and logical definitions for ratio computation.

    Pipeline:
    1. Retrieve canonical ratio order.
    2. Extract and validate numerator/denominator logic for all ratios.
    3. (The evaluator function is returned implicitly as a callable utility for the next task).

    Args:
        hierarchy (AHPHierarchy): The hierarchy configuration.
        study_configuration (Dict): The full study configuration.

    Returns:
        Tuple[List[str], Dict[str, Tuple[str, str]]]:
            - canonical_order: List of 34 ratio IDs.
            - ratio_specs: Mapping of ID -> (num_expr, den_expr).
    """
    # Step 1: Order
    canonical_order = get_canonical_ratio_order(hierarchy)

    # Step 2: Logic Extraction
    feature_logic = study_configuration["feature_engineering_logic"]
    ratio_specs = extract_ratio_logic(feature_logic, canonical_order)

    return canonical_order, ratio_specs


# Task 17 – Compute the raw decision matrix X of all ratios for all years

# ==============================================================================
# Task 17: Compute the raw decision matrix X of all ratios for all years
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 1: Initialize the decision matrix X
# -------------------------------------------------------------------------------------------------------------------------------

def initialize_decision_matrix(
    financial_df: pd.DataFrame,
    canonical_order: List[str]
) -> pd.DataFrame:
    """
    Initializes an empty decision matrix X with dimensions (T x n).

    Rows correspond to fiscal years (sorted) and columns to the 34 financial ratios.
    Initialized with NaNs.

    Args:
        financial_df (pd.DataFrame): The validated financial statements (source of index).
        canonical_order (List[str]): The ordered list of 34 ratio identifiers.

    Returns:
        pd.DataFrame: An empty DataFrame indexed by fiscal_year with ratio columns.
    """
    # Read fiscal years from the column if present, otherwise from the index
    if "fiscal_year" in financial_df.columns:
        years = financial_df["fiscal_year"].unique()
    else:
        years = financial_df.index.unique()

    # Sort years to ensure chronological order (2008 -> 2017)
    sorted_years = sorted(years)

    # Create DataFrame
    X = pd.DataFrame(
        np.nan,
        index=sorted_years,
        columns=canonical_order,
        dtype=np.float64
    )

    X.index.name = "fiscal_year"
    return X

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 2: Compute each ratio value x_tj
# -------------------------------------------------------------------------------------------------------------------------------

def populate_decision_matrix(
    X: pd.DataFrame,
    financial_df: pd.DataFrame,
    ratio_specs: Dict[str, Tuple[str, str]],
    zero_mask: pd.DataFrame
) -> pd.DataFrame:
    """
    Computes the values for all financial ratios and populates the decision matrix.

    For each ratio:
    1. Evaluates the numerator and denominator expressions using the financial data.
    2. Checks the zero-denominator mask.
    3. Computes the ratio (Numerator / Denominator) where valid, else sets to NaN.

    Args:
        X (pd.DataFrame): The initialized decision matrix.
        financial_df (pd.DataFrame): The financial data.
        ratio_specs (Dict): Mapping of ratio ID -> (numerator_expr, denominator_expr).
        zero_mask (pd.DataFrame): Boolean mask where True indicates a zero denominator.

    Returns:
        pd.DataFrame: The populated decision matrix X.
    """
    # Ensure financial_df is indexed by fiscal_year for alignment
    if "fiscal_year" in financial_df.columns:
        df_aligned = financial_df.set_index("fiscal_year").sort_index()
    else:
        df_aligned = financial_df.sort_index()

    # Iterate over each ratio in the canonical order (columns of X)
    for ratio_id in X.columns:
        if ratio_id not in ratio_specs:
            raise ValueError(f"Missing logic for ratio '{ratio_id}'")

        num_expr, den_expr = ratio_specs[ratio_id]

        # Evaluate expressions with the evaluate_expression helper defined in Task 16
        numerator = evaluate_expression(num_expr, df_aligned)
        denominator = evaluate_expression(den_expr, df_aligned)

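        # Compute ratio
        # We align everything by index (fiscal_year)
        ratio_values = numerator / denominator

        # Division by zero yields +/-inf rather than an error; treat it as missing
        ratio_values = ratio_values.replace([np.inf, -np.inf], np.nan)

        # Apply zero denominator policy (force NaN)
        # zero_mask is indexed by fiscal_year and has columns for ratios
        if ratio_id in zero_mask.columns:
            # Align the mask to the ratio index; years absent from the mask are left untouched
            mask = zero_mask[ratio_id].reindex(ratio_values.index, fill_value=False).astype(bool)
            # Where mask is True, set to NaN
            ratio_values = ratio_values.mask(mask, np.nan)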

        # Assign to matrix
        X[ratio_id] = ratio_values

    return X
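
# Illustrative sketch (not part of the pipeline): why the zero-denominator mask
# matters. With float Series, pandas division by zero does not raise; it yields
# inf (or NaN for 0/0), which would distort min-max normalization downstream.
# The figures and fiscal years are hypothetical; imports are repeated so the
# sketch runs on its own.
import numpy as np
import pandas as pd

def _sketch_zero_denominator() -> pd.Series:
    years = [2008, 2009, 2010]
    numerator = pd.Series([10.0, 5.0, 0.0], index=years)
    denominator = pd.Series([2.0, 0.0, 0.0], index=years)
    ratio = numerator / denominator           # 5.0, inf, NaN
    zero_mask = denominator == 0              # True where the denominator is zero
    return ratio.mask(zero_mask, np.nan)      # force NaN where the mask is True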

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 3: Log and validate the decision matrix
# -------------------------------------------------------------------------------------------------------------------------------

def validate_decision_matrix(X: pd.DataFrame) -> Dict[str, Any]:
    """
    Validates the populated decision matrix and generates summary statistics.

    Checks for columns that are entirely NaN (which would break SAW normalization).

    Args:
        X (pd.DataFrame): The populated decision matrix.

    Returns:
        Dict[str, Any]: A summary dictionary containing NaN counts and descriptive stats.
    """
    nan_counts = X.isna().sum()
    all_nan_cols = nan_counts[nan_counts == len(X)].index.tolist()

    if all_nan_cols:
        # Warn rather than halt; the columns are also listed in the returned report.
        import warnings
        warnings.warn(f"Decision matrix columns are entirely NaN: {all_nan_cols}")

    stats = X.describe().to_dict()

    return {
        "nan_counts": nan_counts.to_dict(),
        "all_nan_columns": all_nan_cols,
        "statistics": stats
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_decision_matrix(
    financial_df: pd.DataFrame,
    canonical_order: List[str],
    ratio_specs: Dict[str, Tuple[str, str]],
    zero_mask: pd.DataFrame
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrator function for Task 17: Computes the raw decision matrix X.

    Pipeline:
    1. Initialize empty matrix X.
    2. Populate X by evaluating ratio logic against financial data.
    3. Validate and summarize X.

    Args:
        financial_df (pd.DataFrame): Financial data.
        canonical_order (List[str]): Ordered list of ratio IDs.
        ratio_specs (Dict): Logic for ratios.
        zero_mask (pd.DataFrame): Zero denominator mask.

    Returns:
        Tuple[pd.DataFrame, Dict]: The decision matrix X and its validation report.
    """
    # Step 1: Initialize
    X_init = initialize_decision_matrix(financial_df, canonical_order)

    # Step 2: Populate
    X_populated = populate_decision_matrix(X_init, financial_df, ratio_specs, zero_mask)

    # Step 3: Validate
    report = validate_decision_matrix(X_populated)

    return X_populated, report


# Task 18 – Validate the decision matrix and enforce minimum variance constraints

# ==============================================================================
# Task 18: Validate the decision matrix and enforce minimum variance constraints
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 1: Check for zero-variance criteria
# -------------------------------------------------------------------------------------------------------------------------------

def check_zero_variance(X: pd.DataFrame) -> List[str]:
    """
    Identifies criteria (columns) in the decision matrix that have zero variance across years.

    Zero-variance criteria provide no discriminatory power in SAW and would cause
    division-by-zero errors during normalization (max - min = 0).

    Args:
        X (pd.DataFrame): The decision matrix.

    Returns:
        List[str]: A list of column names with zero variance.
    """
    zero_variance_cols = []

    for col in X.columns:
        # Skip columns that are all NaN
        if X[col].isna().all():
            continue

        min_val = X[col].min()
        max_val = X[col].max()

        # Check if min is approximately equal to max
        # (np.isclose also applies its default relative tolerance, rtol=1e-5)
        if np.isclose(min_val, max_val, atol=1e-9):
            zero_variance_cols.append(col)

    return zero_variance_cols
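
# Illustrative sketch (not part of the pipeline): np.isclose combines atol with
# its default relative tolerance (rtol=1e-5), so for large-magnitude ratios a
# small but real spread can still be classified as zero variance. The values
# are hypothetical; the import is repeated so the sketch runs on its own.
import numpy as np

def _sketch_isclose_tolerance() -> tuple[bool, bool]:
    lo, hi = 1000.0, 1000.005
    with_default_rtol = bool(np.isclose(lo, hi, atol=1e-9))         # rtol=1e-5 applies
    absolute_only = bool(np.isclose(lo, hi, rtol=0.0, atol=1e-9))   # atol alone
    return with_default_rtol, absolute_only

# If a purely absolute test is intended, passing rtol=0.0 is one option.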

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 2: Inspect for outliers or unrealistic ratio values
# -------------------------------------------------------------------------------------------------------------------------------

def detect_outliers(X: pd.DataFrame, threshold: float = 5.0) -> Dict[str, List[int]]:
    """
    Detects statistical outliers in the decision matrix using the Z-score method.

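    Outliers are defined as values deviating from the mean by more than `threshold`
    sample standard deviations. This is a diagnostic step; values are not modified.

    Note: with n observations, |z| cannot exceed (n - 1) / sqrt(n) (about 2.85 for
    n = 10), so the default threshold of 5.0 can only flag values when a column
    has at least 27 observations.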

    Args:
        X (pd.DataFrame): The decision matrix.
        threshold (float): The Z-score threshold for flagging outliers.

    Returns:
        Dict[str, List[int]]: A dictionary mapping column names to lists of fiscal years
                              where outliers were detected.
    """
    outliers = {}

    # Detect statistical outliers in the decision matrix using the Z-score method
    for col in X.columns:
        series = X[col].dropna()
        if len(series) < 2:
            continue

        mean = series.mean()
        std = series.std()

        if std == 0:
            continue

        z_scores = (series - mean) / std
        outlier_mask = z_scores.abs() > threshold

        if outlier_mask.any():
            # Get the fiscal years (index) corresponding to outliers
            outlier_years = series[outlier_mask].index.tolist()
            outliers[col] = outlier_years

    return outliers
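
# Illustrative sketch (not part of the pipeline): with the sample standard
# deviation (ddof=1, the pandas default), the largest attainable |z| in a
# sample of n points is (n - 1) / sqrt(n). The data are hypothetical: n - 1
# identical values plus one extreme value, which attains that bound. Imports
# are repeated so the sketch runs on its own.
import numpy as np
import pandas as pd

def _sketch_max_abs_zscore(n: int = 10) -> tuple[float, float]:
    series = pd.Series([0.0] * (n - 1) + [1000.0])
    z = (series - series.mean()) / series.std()
    bound = (n - 1) / np.sqrt(n)
    return float(z.abs().max()), float(bound)

# For a 10-year panel the bound is roughly 2.85, so a threshold of 5.0 would
# never flag anything; a lower threshold may be more informative for short series.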

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 3: Freeze the validated decision matrix
# -------------------------------------------------------------------------------------------------------------------------------

def freeze_matrix(X: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Finalizes the decision matrix for SAW processing and creates a validity mask.

    The validity mask indicates which entries are non-NaN and thus valid for
    normalization and weighting.

    Args:
        X (pd.DataFrame): The validated decision matrix.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]:
            - A copy of the decision matrix X (values unchanged).
            - A boolean DataFrame mask (True where data is valid/non-NaN).
    """
    # Create validity mask (True where not NaN)
    valid_mask = X.notna()

    # Return a copy of X together with the mask, so later changes to the
    # frozen matrix cannot propagate back to the caller's input.
    return X.copy(), valid_mask

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_and_freeze_decision_matrix(X: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrator function for Task 18: Validates and finalizes the decision matrix.

    Pipeline:
    1. Check for zero-variance columns.
    2. Detect statistical outliers.
    3. Freeze the matrix and generate a validity mask.

    Args:
        X (pd.DataFrame): The raw decision matrix from Task 17.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, Dict]:
            - Frozen decision matrix X.
            - Validity mask.
            - Diagnostic report (zero variance cols, outliers).
    """
    # Step 1: Zero Variance
    zero_var_cols = check_zero_variance(X)

    # Step 2: Outliers
    outliers = detect_outliers(X)

    # Step 3: Freeze
    X_final, valid_mask = freeze_matrix(X)

    report = {
        "zero_variance_columns": zero_var_cols,
        "outliers": outliers
    }

    return X_final, valid_mask, report


# Task 19 – Assign benefit/cost directionality to each criterion for SAW normalization

# ====================================================================================
# Task 19: Assign benefit/cost directionality to each criterion for SAW normalization
# ====================================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 1: Retrieve directionality lists from configuration
# -------------------------------------------------------------------------------------------------------------------------------

def retrieve_directionality_sets(
    saw_parameters: Dict[str, Any]
) -> Tuple[Set[str], Set[str]]:
    """
    Retrieves the sets of benefit and cost criteria from the SAW configuration.

    Args:
        saw_parameters (Dict): The SAW parameters dictionary.

    Returns:
        Tuple[Set[str], Set[str]]:
            - benefit_criteria: Set of ratio IDs where higher is better (lower risk).
            - cost_criteria: Set of ratio IDs where higher is worse (higher risk).

    Raises:
        KeyError: If required keys are missing from configuration.
    """
    directionality = saw_parameters.get("criteria_directionality", {})

    if "benefit_criteria_max" not in directionality:
        raise KeyError("Missing 'benefit_criteria_max' in SAW configuration.")
    if "cost_criteria_min" not in directionality:
        raise KeyError("Missing 'cost_criteria_min' in SAW configuration.")

    benefit_set = set(directionality["benefit_criteria_max"])
    cost_set = set(directionality["cost_criteria_min"])

    return benefit_set, cost_set

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 2: Verify completeness and non-overlap
# -------------------------------------------------------------------------------------------------------------------------------

def validate_directionality_completeness(
    benefit_set: Set[str],
    cost_set: Set[str],
    canonical_order: List[str]
) -> None:
    """
    Validates that the directionality sets form a partition of the canonical ratio set.

    Checks:
    1. Completeness: Union of sets equals the set of all ratios.
    2. Non-overlap: Intersection of sets is empty.

    Args:
        benefit_set (Set[str]): Benefit criteria.
        cost_set (Set[str]): Cost criteria.
        canonical_order (List[str]): The list of all 34 ratio IDs.

    Raises:
        ValueError: If sets overlap or do not cover all ratios.
    """
    all_ratios = set(canonical_order)
    union_set = benefit_set.union(cost_set)
    intersection_set = benefit_set.intersection(cost_set)

    # Check completeness
    missing = all_ratios - union_set
    extra = union_set - all_ratios

    if missing:
        raise ValueError(f"Directionality configuration missing ratios: {missing}")
    if extra:
        raise ValueError(f"Directionality configuration contains unknown ratios: {extra}")

    # Check non-overlap
    if intersection_set:
        raise ValueError(f"Ratios assigned to both benefit and cost sets: {intersection_set}")
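
# Illustrative sketch (not part of the pipeline): the partition check on a toy
# universe. The ratio IDs "R1".."R4" and their assignments are hypothetical.
def _sketch_partition_check() -> dict[str, set]:
    all_ratios = {"R1", "R2", "R3", "R4"}
    benefit = {"R1", "R2"}
    cost = {"R2", "R3"}
    return {
        "missing": all_ratios - (benefit | cost),   # covered by neither set
        "extra": (benefit | cost) - all_ratios,     # not in the universe
        "overlap": benefit & cost,                  # assigned to both sets
    }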

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 3: Construct a directionality lookup
# -------------------------------------------------------------------------------------------------------------------------------

def build_directionality_map(
    benefit_set: Set[str],
    cost_set: Set[str]
) -> Dict[str, str]:
    """
    Constructs a lookup dictionary mapping each ratio ID to its directionality type.

    Args:
        benefit_set (Set[str]): Benefit criteria.
        cost_set (Set[str]): Cost criteria.

    Returns:
        Dict[str, str]: Mapping {ratio_id: "benefit" | "cost"}.
    """
    directionality_map = {}

    # Construct a lookup dictionary mapping each ratio ID to its directionality type
    for ratio in benefit_set:
        directionality_map[ratio] = "benefit"

    for ratio in cost_set:
        directionality_map[ratio] = "cost"

    return directionality_map

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def configure_saw_directionality(
    saw_parameters: Dict[str, Any],
    canonical_order: List[str]
) -> Dict[str, str]:
    """
    Orchestrator function for Task 19: Configures and validates SAW directionality.

    Pipeline:
    1. Retrieve sets from config.
    2. Validate partition property against canonical order.
    3. Build lookup map.

    Args:
        saw_parameters (Dict): SAW configuration.
        canonical_order (List[str]): List of all ratio IDs.

    Returns:
        Dict[str, str]: Validated directionality map.
    """
    # Step 1: Retrieve
    benefit_set, cost_set = retrieve_directionality_sets(saw_parameters)

    # Step 2: Validate
    validate_directionality_completeness(benefit_set, cost_set, canonical_order)

    # Step 3: Build Map
    directionality_map = build_directionality_map(benefit_set, cost_set)

    return directionality_map


# Task 20 – Compute per-criterion extrema and normalize to risk-coded scores

# ==============================================================================
# Task 20: Compute per-criterion extrema and normalize to risk-coded scores
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 1: Compute min and max for each criterion
# -------------------------------------------------------------------------------------------------------------------------------

def compute_criterion_extrema(X: pd.DataFrame) -> Tuple[pd.Series, pd.Series]:
    """
    Computes the minimum and maximum values for each criterion (column) in the decision matrix.

    These extrema are used for Min-Max normalization.

    Args:
        X (pd.DataFrame): The decision matrix.

    Returns:
        Tuple[pd.Series, pd.Series]:
            - x_min: Series of minimum values per criterion.
            - x_max: Series of maximum values per criterion.
    """
    # Compute min and max, skipping NaNs by default
    x_min = X.min(axis=0)
    x_max = X.max(axis=0)

    return x_min, x_max

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 2: Normalize to risk-coded scores r_tj
# -------------------------------------------------------------------------------------------------------------------------------

def calculate_normalized_scores(
    X: pd.DataFrame,
    x_min: pd.Series,
    x_max: pd.Series,
    directionality: Dict[str, str]
) -> pd.DataFrame:
    """
    Normalizes the decision matrix into risk-coded scores in [0, 1].

    Formulas:
    - Benefit (Higher is Better/Lower Risk): r = (max - x) / (max - min)
      (Max value gets 0 risk, Min value gets 1 risk)
    - Cost (Higher is Worse/Higher Risk): r = (x - min) / (max - min)
      (Min value gets 0 risk, Max value gets 1 risk)

    Handles zero-variance criteria (max == min) and missing values (NaN) by assigning 0 risk.

    Args:
        X (pd.DataFrame): The raw decision matrix.
        x_min (pd.Series): Minimum values.
        x_max (pd.Series): Maximum values.
        directionality (Dict): Mapping of ratio ID to 'benefit' or 'cost'.

    Returns:
        pd.DataFrame: The normalized risk matrix R.
    """
    R = pd.DataFrame(index=X.index, columns=X.columns, dtype=np.float64)

    for col in X.columns:
        if col not in directionality:
            raise ValueError(f"Missing directionality for ratio '{col}'.")

        col_type = directionality[col]
        xmin = x_min[col]
        xmax = x_max[col]

        # Check for zero variance or all-NaN
        if pd.isna(xmin) or pd.isna(xmax) or np.isclose(xmin, xmax):
            # Assign 0 risk (the lowest score), so the criterion adds nothing to the weighted sum
            R[col] = 0.0
            continue

        denominator = xmax - xmin
        values = X[col]

        if col_type == "benefit":
            # Benefit: Higher raw value -> Lower risk
            # r = (max - x) / (max - min)
            # x=max -> r=0; x=min -> r=1
            norm_values = (xmax - values) / denominator
        elif col_type == "cost":
            # Cost: Higher raw value -> Higher risk
            # r = (x - min) / (max - min)
            # x=min -> r=0; x=max -> r=1
            norm_values = (values - xmin) / denominator
        else:
            raise ValueError(f"Unknown directionality type '{col_type}' for ratio '{col}'.")

        # Fill NaNs with 0, i.e. missing values are scored as lowest risk
        norm_values = norm_values.fillna(0.0)

        R[col] = norm_values

    return R
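
# Illustrative sketch (not part of the pipeline): risk-coded min-max scores for
# one hypothetical criterion observed over three years, scored once as a
# benefit criterion and once as a cost criterion. The import is repeated so
# the sketch runs on its own.
import pandas as pd

def _sketch_risk_normalization() -> pd.DataFrame:
    x = pd.Series([2.0, 4.0, 10.0], index=[2008, 2009, 2010])
    span = x.max() - x.min()
    return pd.DataFrame({
        "as_benefit": (x.max() - x) / span,   # highest raw value -> 0 risk
        "as_cost": (x - x.min()) / span,      # highest raw value -> 1 risk
    })

# The two codings are complements: as_benefit + as_cost equals 1 in every row.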

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 3: Verify all normalized values are in [0, 1]
# -------------------------------------------------------------------------------------------------------------------------------

def verify_and_clip_normalization(R: pd.DataFrame) -> pd.DataFrame:
    """
    Verifies that all normalized risk scores are within the [0, 1] interval.
    Clips values to handle minor floating-point deviations.

    Args:
        R (pd.DataFrame): The normalized risk matrix.

    Returns:
        pd.DataFrame: The verified and clipped matrix.
    """
    # Check bounds (allowing for small epsilon)
    epsilon = 1e-9
    min_val = R.min().min()
    max_val = R.max().max()

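    if min_val < -epsilon or max_val > 1.0 + epsilon:
        # This indicates a logic error or severe numerical issue.
        # Warn, then proceed with clipping.
        import warnings
        warnings.warn(
            f"Normalized scores outside [0, 1] (min={min_val}, max={max_val}); clipping."
        )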

    # Clip to [0, 1] strictly
    R_clipped = R.clip(lower=0.0, upper=1.0)

    return R_clipped

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_decision_matrix(
    X: pd.DataFrame,
    directionality: Dict[str, str]
) -> pd.DataFrame:
    """
    Orchestrator function for Task 20: Normalizes the decision matrix to risk scores.

    Pipeline:
    1. Compute extrema (min/max).
    2. Apply SAW normalization formulas based on directionality.
    3. Verify and clip results to [0, 1].

    Args:
        X (pd.DataFrame): Raw decision matrix.
        directionality (Dict): Directionality map.

    Returns:
        pd.DataFrame: Normalized risk matrix R.
    """
    # Step 1: Extrema
    x_min, x_max = compute_criterion_extrema(X)

    # Step 2: Normalize
    R_raw = calculate_normalized_scores(X, x_min, x_max, directionality)

    # Step 3: Verify/Clip
    R_final = verify_and_clip_normalization(R_raw)

    return R_final
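
The three steps of the Task 20 pipeline (extrema, directional scaling, clipping) can be illustrated on a toy matrix. The sketch below is self-contained and does not call the notebook's helpers. The helper name `toy_risk_normalize`, the directionality labels `"higher_is_riskier"` and `"lower_is_riskier"`, the column names, and the min-max formulas themselves are assumptions made for illustration; the labels and formulas actually used are those defined in `calculate_normalized_scores`.

```python
import pandas as pd

def toy_risk_normalize(X: pd.DataFrame, direction: dict) -> pd.DataFrame:
    """Min-max scale each column to a [0, 1] risk score (hypothetical helper)."""
    x_min, x_max = X.min(), X.max()
    # A constant column has zero span; mark it NaN to avoid division by zero
    span = (x_max - x_min).replace(0.0, float("nan"))
    R = pd.DataFrame(index=X.index, columns=X.columns, dtype=float)
    for col in X.columns:
        if direction[col] == "higher_is_riskier":
            R[col] = (X[col] - x_min[col]) / span[col]
        elif direction[col] == "lower_is_riskier":
            R[col] = (x_max[col] - X[col]) / span[col]
        else:
            raise ValueError(f"Unknown directionality '{direction[col]}' for '{col}'.")
    # Undefined scores become 0.0; clipping removes floating-point drift
    return R.fillna(0.0).clip(lower=0.0, upper=1.0)

# Toy data: three years, two made-up ratios
X_toy = pd.DataFrame(
    {"debt_ratio": [0.2, 0.5, 0.8], "current_ratio": [2.0, 1.0, 1.5]},
    index=[2008, 2009, 2010],
)
R_toy = toy_risk_normalize(
    X_toy, {"debt_ratio": "higher_is_riskier", "current_ratio": "lower_is_riskier"}
)
```

Under these assumptions, the year with the highest `debt_ratio` and the year with the lowest `current_ratio` each receive a risk score of 1.0 on that criterion.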


In [None]:
# Task 21 – Apply global AHP weights to normalized risk scores

# ==============================================================================
# Task 21: Apply global AHP weights to normalized risk scores
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 1: Retrieve the global weight vector
# -------------------------------------------------------------------------------------------------------------------------------

def prepare_weights_for_broadcasting(
    global_weights: np.ndarray,
    canonical_order: List[str]
) -> pd.Series:
    """
    Prepares the global weight vector for broadcasting against the normalized risk matrix.

    This function converts the raw numpy array of weights into a pandas Series indexed by
    the canonical ratio identifiers. This ensures that subsequent multiplication operations
    align weights to columns by label, preventing catastrophic errors due to column permutation.
    It also performs defensive validation on the weight vector properties.

    Args:
        global_weights (np.ndarray): The global weight vector (length 34), expected to sum to 1.0.
        canonical_order (List[str]): The ordered list of 34 ratio identifiers corresponding to the weights.

    Returns:
        pd.Series: A pandas Series of weights indexed by ratio ID.

    Raises:
        ValueError: If dimensions mismatch or weights do not sum to approximately 1.0.
    """
    # Validate dimensions
    if len(global_weights) != len(canonical_order):
        raise ValueError(
            f"Dimension mismatch: weights vector length ({len(global_weights)}) "
            f"does not match canonical order length ({len(canonical_order)})."
        )

    # Validate sum-to-one property (defensive check)
    total_weight = np.sum(global_weights)
    if not np.isclose(total_weight, 1.0, atol=1e-9):
        raise ValueError(f"Global weights do not sum to 1.0 (sum={total_weight}). Check AHP aggregation.")

    # Create Series with explicit index
    weights_series = pd.Series(data=global_weights, index=canonical_order, name="global_weights")

    return weights_series

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 2: Compute the weighted risk matrix V
# -------------------------------------------------------------------------------------------------------------------------------

def calculate_weighted_matrix(
    R: pd.DataFrame,
    weights: pd.Series
) -> pd.DataFrame:
    """
    Computes the weighted risk matrix V by applying global weights to normalized scores.

    This function implements the core SAW aggregation step at the element level:
    v_{tj} = w_j * r_{tj}

    It ensures that the weights are correctly aligned to the columns of the normalized
    risk matrix R.

    Args:
        R (pd.DataFrame): The normalized risk matrix (T x 34), with values in [0, 1].
        weights (pd.Series): The global weights indexed by ratio ID.

    Returns:
        pd.DataFrame: The weighted risk matrix V, with the same dimensions and index as R.

    Raises:
        ValueError: If the columns of R do not match the index of the weights Series.
    """
    # Validate schema alignment
    r_cols = set(R.columns)
    w_index = set(weights.index)

    if r_cols != w_index:
        missing_in_r = w_index - r_cols
        missing_in_w = r_cols - w_index
        raise ValueError(
            f"Schema mismatch between Normalized Matrix R and Weights.\n"
            f"Missing in R: {missing_in_r}\n"
            f"Missing in Weights: {missing_in_w}"
        )

    # Perform element-wise multiplication
    # axis=1 ensures that for each row in R, the column 'col' is multiplied by weights['col']
    V = R.multiply(weights, axis=1)

    return V

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 3: Validate weighted risk contributions
# -------------------------------------------------------------------------------------------------------------------------------

def validate_weighted_matrix(
    V: pd.DataFrame,
    weights: pd.Series
) -> Dict[str, Any]:
    """
    Validates the integrity of the weighted risk matrix V.

    Performs the following checks:
    1. Non-negativity: v_{tj} >= 0.
    2. Element-wise Bound: v_{tj} <= w_j (since r_{tj} <= 1).
    3. Row-wise Bound: sum_j(v_{tj}) <= 1.0 (total risk score cannot exceed 100%).

    Args:
        V (pd.DataFrame): The weighted risk matrix.
        weights (pd.Series): The global weights used for calculation.

    Returns:
        Dict[str, Any]: A validation report containing summary statistics and check results.

    Raises:
        ValueError: If any validation check fails significantly (beyond floating point epsilon).
    """
    epsilon = 1e-9
    report = {}

    # Check 1: Non-negativity
    min_val = V.min().min()
    if min_val < -epsilon:
        raise ValueError(f"Weighted matrix V contains negative values (min={min_val}).")
    report["min_value"] = min_val

    # Check 2: Element-wise upper bound (v_tj <= w_j)
    # We subtract the weights from V; if V is correct, V - W <= 0 (approx)
    # We use broadcasting: V columns - weights
    diff = V.subtract(weights, axis=1)
    max_diff = diff.max().max()

    if max_diff > epsilon:
        raise ValueError(f"Weighted values exceed their theoretical maximum weights (max excess={max_diff}).")
    report["max_element_excess"] = max_diff

    # Check 3: Row sums (Total Risk Score per Year)
    row_sums = V.sum(axis=1)
    max_row_sum = row_sums.max()

    if max_row_sum > 1.0 + epsilon:
        raise ValueError(f"Total risk score for a year exceeds 1.0 (max={max_row_sum}).")

    report["max_year_risk_score"] = max_row_sum
    report["min_year_risk_score"] = row_sums.min()
    report["mean_year_risk_score"] = row_sums.mean()

    return report

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_weighted_risk_matrix(
    R: pd.DataFrame,
    global_weights: np.ndarray,
    canonical_order: List[str]
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Orchestrator function for Task 21: Computes and validates the weighted risk matrix V.

    Pipeline:
    1. Prepare and validate the weights Series.
    2. Compute V = R * w (broadcasting).
    3. Validate V against theoretical bounds.

    Args:
        R (pd.DataFrame): The normalized risk matrix (T x 34).
        global_weights (np.ndarray): The global weight vector (length 34).
        canonical_order (List[str]): The ordered list of ratio IDs.

    Returns:
        Tuple[pd.DataFrame, Dict[str, Any]]:
            - The validated weighted risk matrix V.
            - A dictionary containing validation metrics.
    """
    # Step 1: Prepare Weights
    weights_series = prepare_weights_for_broadcasting(global_weights, canonical_order)

    # Step 2: Calculate V
    V = calculate_weighted_matrix(R, weights_series)

    # Step 3: Validate
    validation_report = validate_weighted_matrix(V, weights_series)

    return V, validation_report
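
The reason Task 21 wraps the weights in a labeled `pd.Series` before multiplying can be seen in a small example. The sketch below uses made-up ratio IDs (`r1`, `r2`, `r3`) and made-up weights; it contrasts label-aligned multiplication with positional multiplication when the columns of `R` are stored in a different order than the weights.

```python
import pandas as pd

# Hypothetical ratio IDs and weights, for illustration only
canonical_order = ["r1", "r2", "r3"]
weights = pd.Series([0.5, 0.3, 0.2], index=canonical_order, name="global_weights")

# R deliberately stores its columns in a different order than the weights
R = pd.DataFrame(
    {"r3": [1.0, 0.0], "r1": [0.2, 1.0], "r2": [0.5, 0.5]},
    index=[2008, 2009],
)

# Label-aligned: each column is matched to the weight carrying the same label
V = R.multiply(weights, axis=1)

# Positional: a raw array is applied in column order, which is wrong here
V_positional = R * weights.to_numpy()

print(V.loc[2008, "r3"])             # weight 0.2 applied to r3
print(V_positional.loc[2008, "r3"])  # weight 0.5 applied to r3 by position
```

In this toy case the positional variant silently applies the weight of `r1` to column `r3`, which is the kind of permutation error the labeled Series is meant to prevent.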


In [None]:
# Task 22 – Aggregate weighted risk scores to compute per-year composite risk scores

# ==================================================================================
# Task 22: Aggregate weighted risk scores to compute per-year composite risk scores
# ==================================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 1: Compute the raw composite risk score V_t for each year
# -------------------------------------------------------------------------------------------------------------------------------

def calculate_composite_scores(V: pd.DataFrame) -> pd.Series:
    """
    Computes the raw composite risk score V_t for each fiscal year.

    Formula: V_t = sum_j(v_tj)
    This aggregates the weighted risk contributions of all 34 ratios into a single
    scalar risk score per year.

    Args:
        V (pd.DataFrame): The weighted risk matrix (T x 34).

    Returns:
        pd.Series: A Series of composite risk scores indexed by fiscal_year.
    """
    # Sum across columns (axis=1) to get total risk per year
    # min_count=0 ensures that if a row is all-NaN (unlikely), sum is 0, not NaN
    V_t = V.sum(axis=1, min_count=0)

    return V_t

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 2: Validate the range of V_t values
# -------------------------------------------------------------------------------------------------------------------------------

def validate_composite_scores(V_t: pd.Series) -> Dict[str, Any]:
    """
    Validates that the composite risk scores fall within the theoretical range [0, 1].

    Since normalized scores r_tj are in [0, 1] and weights sum to 1, the weighted sum
    must also be in [0, 1].

    Args:
        V_t (pd.Series): The composite risk scores.

    Returns:
        Dict[str, Any]: Validation report with min/max statistics.

    Raises:
        ValueError: If scores significantly exceed 1.0 (indicating weight or normalization error).
    """
    # Compute min and max score and set threshold
    min_score = V_t.min()
    max_score = V_t.max()
    epsilon = 1e-9

    if min_score < -epsilon:
        raise ValueError(f"Composite risk score contains negative values (min={min_score}).")

    if max_score > 1.0 + epsilon:
        # This is a critical integrity check for the SAW method
        raise ValueError(f"Composite risk score exceeds 1.0 (max={max_score}). Check weights sum.")

    return {
        "min_composite_score": min_score,
        "max_composite_score": max_score,
        "mean_composite_score": V_t.mean()
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 3: Assemble the vector of per-year composite scores
# -------------------------------------------------------------------------------------------------------------------------------

def finalize_composite_vector(V_t: pd.Series) -> pd.Series:
    """
    Finalizes the composite score vector by ensuring it is sorted chronologically.

    Args:
        V_t (pd.Series): The raw composite scores.

    Returns:
        pd.Series: The sorted composite score vector.
    """
    # Ensure chronological order for time-series analysis
    V_sorted = V_t.sort_index()

    return V_sorted

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_composite_risk_scores(V: pd.DataFrame) -> Tuple[pd.Series, Dict[str, Any]]:
    """
    Orchestrator function for Task 22: Computes per-year composite risk scores.

    Pipeline:
    1. Sum weighted matrix rows to get V_t.
    2. Validate V_t range [0, 1].
    3. Sort and finalize the vector.

    Args:
        V (pd.DataFrame): Weighted risk matrix.

    Returns:
        Tuple[pd.Series, Dict]:
            - Sorted composite risk scores V_t.
            - Validation report.
    """
    # Step 1: Calculate
    V_t_raw = calculate_composite_scores(V)

    # Step 2: Validate
    report = validate_composite_scores(V_t_raw)

    # Step 3: Finalize
    V_t_final = finalize_composite_vector(V_t_raw)

    return V_t_final, report
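
A short example may help clarify the row-sum step and the `min_count` argument. The weighted values below are invented. The sketch shows that a year with every ratio at its maximum weighted value sums to 1.0, and that an all-NaN year sums to 0.0 under `min_count=0` but stays NaN under `min_count=1`.

```python
import numpy as np
import pandas as pd

# Invented weighted matrix V; the 2009 row is entirely missing
V = pd.DataFrame(
    {
        "r1": [0.10, 0.50, np.nan],
        "r2": [0.15, 0.30, np.nan],
        "r3": [0.20, 0.20, np.nan],
    },
    index=[2010, 2008, 2009],
)

# min_count=0: an all-NaN row sums to 0.0
V_t = V.sum(axis=1, min_count=0).sort_index()

# min_count=1: an all-NaN row stays NaN, which makes the gap visible
V_t_strict = V.sum(axis=1, min_count=1).sort_index()
```

Because 0.0 is also the lowest possible risk score, a year with no usable data would rank as the least risky under `min_count=0`. Whether that is acceptable depends on the upstream validation, so it may be worth checking for all-NaN rows before aggregating.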


In [None]:
# Task 23 – Normalize composite risk scores across years to obtain relative risk indices

# ======================================================================================
# Task 23: Normalize composite risk scores across years to obtain relative risk indices
# ======================================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 1: Compute the sum of all yearly composite scores
# -------------------------------------------------------------------------------------------------------------------------------

def compute_total_risk_sum(V_t: pd.Series) -> float:
    """
    Computes the sum of composite risk scores across all years.

    Args:
        V_t (pd.Series): Composite risk scores indexed by fiscal_year.

    Returns:
        float: The total sum S.

    Raises:
        ValueError: If the sum is zero (degenerate case).
    """
    total_sum = V_t.sum()

    # Tolerance-based check: exact equality on floats can miss a numerically zero sum
    if np.isclose(total_sum, 0.0):
        raise ValueError("Total composite risk sum is zero. Cannot normalize.")

    return float(total_sum)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 2: Compute normalized risk share A_t for each year
# -------------------------------------------------------------------------------------------------------------------------------

def calculate_relative_shares(V_t: pd.Series, total_sum: float) -> pd.Series:
    """
    Computes the relative risk share A_t for each year.

    Formula: A_t = V_t / S

    Args:
        V_t (pd.Series): Composite risk scores.
        total_sum (float): The sum of all scores.

    Returns:
        pd.Series: Relative risk indices A_t.
    """
    A_t = V_t / total_sum
    return A_t

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 3: Verify normalization properties
# -------------------------------------------------------------------------------------------------------------------------------

def verify_relative_indices(A_t: pd.Series) -> Dict[str, Any]:
    """
    Verifies that the relative risk indices sum to 1.0 and are non-negative.

    Args:
        A_t (pd.Series): Relative risk indices.

    Returns:
        Dict[str, Any]: Validation report.

    Raises:
        ValueError: If normalization properties are violated.
    """
    # Compute total, min and max
    total = A_t.sum()
    min_val = A_t.min()
    max_val = A_t.max()

    if not np.isclose(total, 1.0, atol=1e-9):
        raise ValueError(f"Relative risk indices do not sum to 1.0 (sum={total}).")

    if min_val < -1e-9:
        raise ValueError(f"Negative relative risk index found (min={min_val}).")

    return {
        "sum_A": total,
        "min_A": min_val,
        "max_A": max_val
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_relative_risk_indices(V_t: pd.Series) -> Tuple[pd.Series, Dict[str, Any]]:
    """
    Orchestrator function for Task 23: Computes relative risk indices A_t.

    Pipeline:
    1. Compute total sum S.
    2. Divide V_t by S to get A_t.
    3. Verify A_t properties.

    Args:
        V_t (pd.Series): Composite risk scores.

    Returns:
        Tuple[pd.Series, Dict]: Relative risk indices A_t and validation report.
    """
    # Step 1: Sum
    S = compute_total_risk_sum(V_t)

    # Step 2: Normalize
    A_t = calculate_relative_shares(V_t, S)

    # Step 3: Verify
    report = verify_relative_indices(A_t)

    return A_t, report
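
The normalization `A_t = V_t / S` can be checked by hand on a few numbers. The composite scores below are invented for illustration. The sketch also confirms that dividing by a positive constant leaves the ordering of years unchanged, which is why ranking by `A_t` or by `V_t` gives the same result.

```python
import numpy as np
import pandas as pd

# Invented composite scores for three years
V_t = pd.Series([0.30, 0.45, 0.15], index=[2008, 2009, 2010], name="V_t")

S = float(V_t.sum())  # total across years
A_t = V_t / S         # each year's share of the total

print(A_t.round(4).to_dict())
print(np.isclose(A_t.sum(), 1.0))

# Same ordering before and after normalization
same_order = (A_t.rank(ascending=False) == V_t.rank(ascending=False)).all()
```

Note that `A_t` is a share of the total across the sampled years, so its values change whenever a year is added to or removed from the sample.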


In [None]:
# Task 24 – Rank financial years by composite risk index and compare with study

# ==============================================================================
# Task 24: Rank financial years by composite risk index and compare with study
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 1: Sort years by descending A_t (or V_t)
# -------------------------------------------------------------------------------------------------------------------------------

def rank_years(A_t: pd.Series) -> pd.DataFrame:
    """
    Ranks financial years based on their normalized risk index A_t.

    Ranking is descending: Rank 1 = Highest Risk.

    Args:
        A_t (pd.Series): Normalized risk indices indexed by fiscal_year.

    Returns:
        pd.DataFrame: DataFrame containing A_t and the computed Rank, sorted by Rank.
    """
    # Create DataFrame
    ranking_df = A_t.to_frame(name="A_t")

    # Compute Rank (1 = Highest Risk)
    # method='min' assigns the same rank to ties, leaving gaps
    # ascending=False means higher A_t gets lower rank number (1)
    ranking_df["Rank"] = ranking_df["A_t"].rank(ascending=False, method='min').astype(int)

    # Sort by Rank; pre-sorting by year plus a stable sort keeps tied years in chronological order
    ranking_df = ranking_df.sort_index().sort_values(by="Rank", kind="mergesort")

    return ranking_df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 2: Identify the most and least risky years
# -------------------------------------------------------------------------------------------------------------------------------

def identify_extreme_years(ranking_df: pd.DataFrame) -> Dict[str, int]:
    """
    Identifies the fiscal years with the highest and lowest financial risk.

    Args:
        ranking_df (pd.DataFrame): The ranked DataFrame from Step 1.

    Returns:
        Dict[str, int]: Dictionary with keys 'most_risky_year' and 'least_risky_year'.
    """
    # Most risky is Rank 1 (first row after sort)
    most_risky = ranking_df.index[0]

    # Least risky is last Rank (last row)
    least_risky = ranking_df.index[-1]

    return {
        "most_risky_year": int(most_risky),
        "least_risky_year": int(least_risky)
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 3: Cross-check ranking against the study's reported ordering
# -------------------------------------------------------------------------------------------------------------------------------

def compare_with_study_results(ranking_df: pd.DataFrame) -> pd.DataFrame:
    """
    Compares the computed ranking with the reference ranking reported in the study.

    Reference (from Table 12 of the paper):
    2016: 1, 2010: 2, 2012: 3, 2015: 4, 2013: 5,
    2008: 6, 2014: 7, 2011: 8, 2017: 9, 2009: 10.

    Args:
        ranking_df (pd.DataFrame): The computed ranking.

    Returns:
        pd.DataFrame: A comparison table with Computed Rank, Reference Rank, and Difference.
    """
    # Hardcoded reference from the study context
    reference_ranks = {
        2016: 1, 2010: 2, 2012: 3, 2015: 4, 2013: 5,
        2008: 6, 2014: 7, 2011: 8, 2017: 9, 2009: 10
    }

    comparison = ranking_df.copy()
    comparison["Reference_Rank"] = comparison.index.map(reference_ranks)

    # Calculate difference
    comparison["Rank_Diff"] = comparison["Rank"] - comparison["Reference_Rank"]

    # Reorder columns
    comparison = comparison[["A_t", "Rank", "Reference_Rank", "Rank_Diff"]]

    return comparison

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def rank_and_compare_years(A_t: pd.Series) -> Tuple[pd.DataFrame, Dict[str, int], pd.DataFrame]:
    """
    Orchestrator function for Task 24: Ranks years and validates against study results.

    Pipeline:
    1. Rank years by A_t.
    2. Identify extremes (max/min risk).
    3. Compare with published ground truth.

    Args:
        A_t (pd.Series): Normalized risk indices.

    Returns:
        Tuple[pd.DataFrame, Dict, pd.DataFrame]:
            - Ranked DataFrame.
            - Extremes dictionary.
            - Comparison DataFrame.
    """
    # Step 1: Rank
    ranking_df = rank_years(A_t)

    # Step 2: Extremes
    extremes = identify_extreme_years(ranking_df)

    # Step 3: Compare
    comparison_df = compare_with_study_results(ranking_df)

    return ranking_df, extremes, comparison_df
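
The ranking and comparison steps can be traced on a toy series containing a tie. The indices and the reference ranks below are made up and are not the study's values. The sketch uses `method="min"` so tied years share a rank, and a stable sort after sorting by year, which is one way to make the order of tied years deterministic.

```python
import pandas as pd

# Made-up relative risk indices; 2009 and 2011 are tied
A_t = pd.Series([0.20, 0.35, 0.20, 0.25], index=[2011, 2008, 2009, 2010], name="A_t")

ranking = A_t.to_frame()
# Rank 1 = highest risk; tied values share the lowest rank number of their group
ranking["Rank"] = ranking["A_t"].rank(ascending=False, method="min").astype(int)

# Sort by year first, then stably by rank, so tied years appear chronologically
ranking = ranking.sort_index().sort_values("Rank", kind="mergesort")

# Hypothetical reference ranking to compare against
reference = {2008: 1, 2010: 2, 2009: 4, 2011: 3}
ranking["Reference_Rank"] = ranking.index.map(reference)
ranking["Rank_Diff"] = ranking["Rank"] - ranking["Reference_Rank"]

print(ranking)
```

A `Rank_Diff` of zero means the computed and reference ranks agree for that year; a negative value means the computed rank places the year as riskier than the reference does.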


In [None]:
# Task 25 – Design an orchestrator function for the full AHP+SAW pipeline

# ==============================================================================
# Task 25: Design an orchestrator function for the full AHP+SAW pipeline
# ==============================================================================

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 25, Steps 1-3: Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_hafri_pipeline(
    raw_expert_survey_df: pd.DataFrame,
    raw_financial_statement_df: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Orchestrator function for the Heuristic-Augmented Financial Risk Index (HAFRI) pipeline.

    This function executes the end-to-end workflow:
    1.  **Validation & Cleaning**: Validates configuration, schemas, and data integrity for both survey and financial inputs.
    2.  **AHP Phase**: Constructs the hierarchy, builds pairwise matrices, computes consistency metrics, filters experts, and aggregates global weights.
    3.  **Ratio Phase**: Computes the raw decision matrix (X) of financial ratios.
    4.  **SAW Phase**: Normalizes ratios, applies AHP weights, computes composite risk scores, and ranks years.

    Args:
        raw_expert_survey_df (pd.DataFrame): Raw expert pairwise comparison data.
        raw_financial_statement_df (pd.DataFrame): Raw financial statement data.
        study_configuration (Dict[str, Any]): Complete study configuration dictionary.

    Returns:
        Dict[str, Any]: A dictionary containing all key intermediate and final results:
            - 'global_weights': The aggregated AHP weights.
            - 'decision_matrix': The raw ratio matrix X.
            - 'normalized_matrix': The risk-coded matrix R.
            - 'weighted_matrix': The weighted matrix V.
            - 'composite_scores': The per-year risk scores V_t.
            - 'relative_indices': The normalized risk shares A_t.
            - 'ranking': The final ranking of years.
            - 'comparison': Comparison with study results.
            - 'ahp_diagnostics': Consistency metrics and acceptance rates.
            - 'data_diagnostics': Financial data quality report.
    """
    logger.info("Starting HAFRI Pipeline...")

    # =========================================================================
    # PHASE 1: VALIDATION & CLEANING
    # =========================================================================
    logger.info("Phase 1: Validation & Cleaning")

    # 1. Validate Configuration
    validate_study_configuration(study_configuration)
    logger.info("Configuration validated.")

    # 2. Validate & Clean Expert Survey
    validate_raw_expert_survey(raw_expert_survey_df)
    # Note: validate_combinatorial_completeness is called inside clean_and_standardize_survey
    cleaned_survey_df = clean_and_standardize_survey(raw_expert_survey_df)
    logger.info(f"Expert survey cleaned. Rows: {len(cleaned_survey_df)}")

    # 3. Validate & Clean Financial Statements
    validated_financials = validate_financial_statements(raw_financial_statement_df)
    enriched_financials = enforce_financial_identities(validated_financials)
    data_report, zero_mask = handle_missing_and_zero_values(enriched_financials, study_configuration)
    logger.info("Financial statements validated and enriched.")

    # =========================================================================
    # PHASE 2: ANALYTIC HIERARCHY PROCESS (AHP)
    # =========================================================================
    logger.info("Phase 2: AHP Execution")

    # 1. Initialize Hierarchy
    hierarchy = initialize_ahp_hierarchy()

    # 2. Build Matrices
    matrices = build_ahp_matrices(cleaned_survey_df, hierarchy)

    # 3. Compute Local Weights & Consistency
    local_weights, column_sums = compute_ahp_local_weights(matrices)
    lambda_max, ci, ri = compute_ahp_consistency_metrics(
        local_weights, column_sums, hierarchy, study_configuration["ahp_parameters"]
    )
    cr_values, accepted_experts = filter_consistent_matrices(
        ci, ri, study_configuration["ahp_parameters"]
    )

    # Log acceptance
    for level, experts in accepted_experts.items():
        logger.info(f"Level '{level}': {len(experts)} experts accepted.")

    # 4. Aggregate Weights
    main_weights = aggregate_main_criteria_weights(local_weights, accepted_experts)
    sub_weights = aggregate_all_sub_criteria_weights(local_weights, accepted_experts)

    # 5. Compute Global Weights
    global_weights = compute_global_weights(main_weights, sub_weights, hierarchy)
    logger.info("Global AHP weights computed.")

    # =========================================================================
    # PHASE 3: RATIO COMPUTATION
    # =========================================================================
    logger.info("Phase 3: Ratio Computation")

    # 1. Prepare Logic
    canonical_order, ratio_specs = prepare_ratio_computation_logic(hierarchy, study_configuration)

    # 2. Compute Decision Matrix X
    X, x_report = compute_decision_matrix(enriched_financials, canonical_order, ratio_specs, zero_mask)

    # 3. Validate X
    X_final, valid_mask, x_diagnostics = validate_and_freeze_decision_matrix(X)
    logger.info("Decision matrix X computed and validated.")

    # =========================================================================
    # PHASE 4: SIMPLE ADDITIVE WEIGHTING (SAW)
    # =========================================================================
    logger.info("Phase 4: SAW Execution")

    # 1. Configure Directionality
    directionality_map = configure_saw_directionality(study_configuration["saw_parameters"], canonical_order)

    # 2. Normalize (X -> R)
    R = normalize_decision_matrix(X_final, directionality_map)

    # 3. Weighting (R -> V)
    V, v_report = compute_weighted_risk_matrix(R, global_weights, canonical_order)

    # 4. Aggregation (V -> V_t -> A_t)
    V_t, vt_report = compute_composite_risk_scores(V)
    A_t, at_report = compute_relative_risk_indices(V_t)

    # 5. Ranking & Comparison
    ranking_df, extremes, comparison_df = rank_and_compare_years(A_t)

    logger.info(f"Most Risky Year: {extremes['most_risky_year']}")
    logger.info(f"Least Risky Year: {extremes['least_risky_year']}")

    # =========================================================================
    # RETURN RESULTS
    # =========================================================================
    results = {
        "global_weights": global_weights,
        "decision_matrix": X_final,
        "normalized_matrix": R,
        "weighted_matrix": V,
        "composite_scores": V_t,
        "relative_indices": A_t,
        "ranking": ranking_df,
        "comparison": comparison_df,
        "ahp_diagnostics": {
            "cr_values": cr_values,
            "accepted_experts": accepted_experts,
            "lambda_max": lambda_max,
            "ci": ci
        },
        "data_diagnostics": {
            "financial_report": data_report,
            "ratio_report": x_report,
            "outliers": x_diagnostics
        }
    }

    logger.info("Pipeline completed successfully.")
    return results


In [None]:
# Task 26 – Conduct robustness and sensitivity analyses using the orchestrator

# ==============================================================================
# Task 26: Conduct robustness and sensitivity analyses using the orchestrator
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Step 1: Define robustness scenarios
# -------------------------------------------------------------------------------------------------------------------------------

def generate_robustness_scenarios(
    base_configuration: Dict[str, Any]
) -> Dict[str, Dict[str, Any]]:
    """
    Generates a set of configuration variants to test the robustness of the HAFRI model.

    This function creates deep copies of the baseline configuration and applies specific
    modifications to simulate different methodological assumptions or sensitivity conditions.

    Scenarios defined:
    1. **Baseline**: The original study configuration.
    2. **Strict_Consistency**: Lowers the AHP Consistency Ratio (CR) threshold to 0.05.
       This tests sensitivity to expert inclusion/exclusion based on stricter consistency standards.
    3. **Relaxed_Consistency**: Raises the AHP CR threshold to 0.20.
       This tests sensitivity to including more marginal expert judgments.
    4. **Alternative_Normalization**: Sets a configuration flag requesting an alternative
       SAW normalization method. The pipeline's normalization logic is currently fixed,
       so this scenario is a placeholder until the pipeline reads the flag.

    Args:
        base_configuration (Dict[str, Any]): The validated baseline study configuration.

    Returns:
        Dict[str, Dict[str, Any]]: A dictionary mapping scenario names to their modified configurations.
    """
    scenarios: Dict[str, Dict[str, Any]] = {}

    # 1. Baseline Scenario
    scenarios["Baseline"] = copy.deepcopy(base_configuration)

    # 2. Strict Consistency Scenario (CR < 0.05)
    # This tests if the results hold when only the most consistent experts are used.
    config_strict = copy.deepcopy(base_configuration)
    # Navigate safely to the nested key
    if "ahp_parameters" in config_strict and "consistency_threshold" in config_strict["ahp_parameters"]:
        config_strict["ahp_parameters"]["consistency_threshold"]["max_allowed_value"] = 0.05
    scenarios["Strict_Consistency"] = config_strict

    # 3. Relaxed Consistency Scenario (CR < 0.20)
    # This tests if the results hold when we include experts with higher inconsistency.
    config_relaxed = copy.deepcopy(base_configuration)
    if "ahp_parameters" in config_relaxed and "consistency_threshold" in config_relaxed["ahp_parameters"]:
        config_relaxed["ahp_parameters"]["consistency_threshold"]["max_allowed_value"] = 0.20
    scenarios["Relaxed_Consistency"] = config_relaxed

    # 4. Alternative Normalization (Conceptual)
    # Set a metadata flag to record the intent. The pipeline does not yet act on it.
    config_alt_norm = copy.deepcopy(base_configuration)
    # setdefault avoids a KeyError when the nested keys are absent
    saw_params = config_alt_norm.setdefault("saw_parameters", {})
    saw_params.setdefault("normalization_logic", {})["method"] = "alternative"
    scenarios["Alternative_Normalization"] = config_alt_norm

    return scenarios
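
The scenario pattern above (deep-copy the baseline, then override one nested key) can be tried on a toy configuration. This is a minimal sketch only: `make_threshold_scenario` and `toy_base` are hypothetical names introduced for illustration, and the key path simply mirrors the one used in `generate_robustness_scenarios`.

```python
import copy
from typing import Any, Dict


def make_threshold_scenario(base: Dict[str, Any], threshold: float) -> Dict[str, Any]:
    """Return a deep copy of `base` with the AHP CR threshold overridden (hypothetical helper)."""
    config = copy.deepcopy(base)
    ahp = config.setdefault("ahp_parameters", {})
    ahp.setdefault("consistency_threshold", {})["max_allowed_value"] = threshold
    return config


# Toy baseline configuration (assumed structure, for illustration only).
toy_base = {"ahp_parameters": {"consistency_threshold": {"max_allowed_value": 0.10}}}

strict = make_threshold_scenario(toy_base, 0.05)
relaxed = make_threshold_scenario(toy_base, 0.20)

# Each scenario works on its own deep copy, so the baseline is left untouched.
print(toy_base["ahp_parameters"]["consistency_threshold"]["max_allowed_value"])
print(strict["ahp_parameters"]["consistency_threshold"]["max_allowed_value"])
print(relaxed["ahp_parameters"]["consistency_threshold"]["max_allowed_value"])
```

A shallow copy would not be enough here, because the nested `consistency_threshold` dictionary would then be shared between the baseline and every scenario.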

# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Step 2: Execute orchestrator for each scenario
# -------------------------------------------------------------------------------------------------------------------------------

def execute_robustness_scenarios(
    scenarios: Dict[str, Dict[str, Any]],
    raw_expert_survey_df: pd.DataFrame,
    raw_financial_statement_df: pd.DataFrame
) -> Dict[str, Dict[str, Any]]:
    """
    Executes the full HAFRI pipeline for each defined robustness scenario.

    This function iterates through the scenario configurations, runs the pipeline,
    and captures the results. It includes error handling to ensure that a failure
    in one scenario (e.g., no experts meeting strict consistency) does not crash
    the entire analysis.

    Args:
        scenarios (Dict): Dictionary of scenario configurations.
        raw_expert_survey_df (pd.DataFrame): The raw expert survey data.
        raw_financial_statement_df (pd.DataFrame): The raw financial statement data.

    Returns:
        Dict[str, Dict[str, Any]]: A dictionary mapping scenario names to their pipeline results.
                                   If a scenario fails, the result dict contains an 'error' key.
    """
    scenario_results: Dict[str, Dict[str, Any]] = {}

    logger = logging.getLogger(__name__)

    for name, config in scenarios.items():
        logger.info("Executing scenario: %s", name)
        try:
            # Execute the pipeline using the orchestrator from Task 25.
            # run_hafri_pipeline is assumed to be defined earlier in the notebook.
            result = run_hafri_pipeline(
                raw_expert_survey_df,
                raw_financial_statement_df,
                config
            )
            scenario_results[name] = result
        except Exception as e:
            # Record the error (with traceback in the log) and continue with the other scenarios
            logger.exception("Scenario %s failed", name)
            scenario_results[name] = {"error": str(e)}

    return scenario_results
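
The error-isolation behaviour described above can be checked without the real pipeline. In the sketch below, `run_all` and `toy_runner` are hypothetical stand-ins (the latter replaces `run_hafri_pipeline`), and the toy configurations are made up for illustration.

```python
from typing import Any, Callable, Dict


def run_all(
    scenarios: Dict[str, Dict[str, Any]],
    runner: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> Dict[str, Dict[str, Any]]:
    """Run `runner` on every scenario, storing an 'error' entry instead of raising."""
    results: Dict[str, Dict[str, Any]] = {}
    for name, config in scenarios.items():
        try:
            results[name] = runner(config)
        except Exception as exc:
            results[name] = {"error": str(exc)}
    return results


def toy_runner(config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical pipeline stand-in that fails when the threshold is too strict."""
    if config["threshold"] < 0.06:
        raise ValueError("no experts pass the consistency filter")
    return {"n_experts": 5}


toy_results = run_all(
    {"Baseline": {"threshold": 0.10}, "Strict": {"threshold": 0.05}},
    toy_runner,
)
print(toy_results)
```

The failing scenario is recorded alongside the successful one, which is what lets `quantify_rank_stability` skip it later by checking for the `'error'` key.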

# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Step 3: Quantify robustness of year rankings
# -------------------------------------------------------------------------------------------------------------------------------

def quantify_rank_stability(
    scenario_results: Dict[str, Dict[str, Any]]
) -> pd.DataFrame:
    """
    Aggregates and quantifies the stability of financial year rankings across all successful scenarios.

    Computes the following statistics for each fiscal year:
    - Mean Rank: The average ranking position across scenarios.
    - Std Dev Rank: The volatility of the ranking.
    - Min/Max Rank: The range of rankings observed.
    - Rank Count: Number of scenarios where the year was ranked (should match successful scenarios).

    Args:
        scenario_results (Dict): The dictionary of results from execute_robustness_scenarios.

    Returns:
        pd.DataFrame: A summary table indexed by 'fiscal_year' containing rank statistics,
                      sorted by Mean Rank (ascending).
    """
    rank_series_list: List[pd.Series] = []

    for name, result in scenario_results.items():
        # Skip failed scenarios
        if "error" in result:
            continue

        # Extract the 'Rank' column from the ranking DataFrame (indexed by fiscal_year).
        # rename() returns a new Series, so the pipeline's own result is not mutated.
        if "ranking" in result and isinstance(result["ranking"], pd.DataFrame):
            rank_series_list.append(result["ranking"]["Rank"].rename(name))

    if not rank_series_list:
        raise ValueError("No scenarios completed successfully. Cannot quantify robustness.")

    # Concatenate into a single DataFrame (rows=years, cols=scenarios)
    # axis=1 aligns by index (fiscal_year)
    all_ranks_df = pd.concat(rank_series_list, axis=1)

    # Compute row-wise statistics
    robustness_summary = pd.DataFrame(index=all_ranks_df.index)
    robustness_summary["Mean_Rank"] = all_ranks_df.mean(axis=1)
    robustness_summary["Std_Rank"] = all_ranks_df.std(axis=1)
    robustness_summary["Min_Rank"] = all_ranks_df.min(axis=1)
    robustness_summary["Max_Rank"] = all_ranks_df.max(axis=1)
    # Number of scenarios in which the year received a rank (documented as Rank Count)
    robustness_summary["Rank_Count"] = all_ranks_df.count(axis=1)

    # Add the individual scenario ranks for detailed inspection
    robustness_summary = pd.concat([robustness_summary, all_ranks_df], axis=1)

    # Sort by Mean Rank to show the consensus ranking
    robustness_summary.sort_values("Mean_Rank", inplace=True)

    return robustness_summary
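
To make the stability statistics concrete, the following sketch applies the same row-wise aggregation to a small table of made-up ranks. The years, scenario names, and rank values are illustrative assumptions, not results from the study.

```python
import pandas as pd

# Hypothetical ranks for three fiscal years under three scenarios.
ranks = pd.DataFrame(
    {
        "Baseline": [1, 2, 3],
        "Strict_Consistency": [1, 3, 2],
        "Relaxed_Consistency": [1, 2, 3],
    },
    index=pd.Index([2016, 2015, 2009], name="fiscal_year"),
)

summary = pd.DataFrame(
    {
        "Mean_Rank": ranks.mean(axis=1),
        "Std_Rank": ranks.std(axis=1),  # pandas uses the sample standard deviation (ddof=1)
        "Min_Rank": ranks.min(axis=1),
        "Max_Rank": ranks.max(axis=1),
    }
).sort_values("Mean_Rank")

print(summary)
```

A year whose rank never changes has a standard deviation of zero, while a year that swaps places in one scenario shows a non-zero spread and a Min/Max range wider than a single position.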

# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def conduct_robustness_analysis(
    base_configuration: Dict[str, Any],
    raw_expert_survey_df: pd.DataFrame,
    raw_financial_statement_df: pd.DataFrame
) -> Tuple[Dict[str, Dict[str, Any]], pd.DataFrame]:
    """
    Orchestrator function for Task 26: Performs a comprehensive robustness analysis of the HAFRI model.

    Pipeline:
    1. Generate a set of robustness scenarios (Baseline, Strict/Relaxed Consistency, etc.).
    2. Execute the full HAFRI pipeline for each scenario, capturing results and errors.
    3. Aggregate and quantify the stability of the resulting year rankings.

    Args:
        base_configuration (Dict): The baseline study configuration.
        raw_expert_survey_df (pd.DataFrame): The raw expert survey data.
        raw_financial_statement_df (pd.DataFrame): The raw financial statement data.

    Returns:
        Tuple[Dict, pd.DataFrame]:
            - scenario_results: A dictionary of raw results for each scenario.
            - robustness_summary: A DataFrame summarizing rank stability statistics per year.
    """
    # Step 1: Generate Scenarios
    scenarios = generate_robustness_scenarios(base_configuration)

    # Step 2: Execute Scenarios
    scenario_results = execute_robustness_scenarios(
        scenarios,
        raw_expert_survey_df,
        raw_financial_statement_df
    )

    # Step 3: Quantify Stability
    robustness_summary = quantify_rank_stability(scenario_results)

    return scenario_results, robustness_summary


In [None]:
# Task 27 – Cross-check final outputs against the study's reported results

# ==============================================================================
# Task 27: Cross-check final outputs against the study's reported results
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 27, Step 1: Compare global AHP weights
# -------------------------------------------------------------------------------------------------------------------------------

def compare_ahp_weights(
    global_weights: np.ndarray,
    hierarchy: 'AHPHierarchy'
) -> pd.DataFrame:
    """
    Aggregates global weights by main criteria group and compares them with the
    values reported in the study (Table 9/Figure 2).

    Reported values:
    - CFR: 45.9% (0.459)
    - LR: 24.3% (0.243)
    - IR: 15.2% (0.152)
    - CSR: 14.6% (0.146)

    Args:
        global_weights (np.ndarray): The computed global weight vector (length 34).
        hierarchy (AHPHierarchy): The hierarchy configuration.

    Returns:
        pd.DataFrame: Comparison table for main criteria weights.
    """
    # Expected values from the paper
    expected_main_weights = {
        "CFR": 0.459,
        "LR": 0.243,
        "IR": 0.152,
        "CSR": 0.146
    }

    # Aggregate computed weights by main group
    computed_main_weights = {}
    canonical_order = hierarchy.canonical_ratio_order

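    # Create a map of ratio -> weight. zip() silently truncates on a length
    # mismatch, so the alignment is checked explicitly first.
    if len(canonical_order) != len(global_weights):
        raise ValueError(
            f"Expected {len(canonical_order)} global weights, got {len(global_weights)}."
        )
    weight_map = dict(zip(canonical_order, global_weights))

    for main_crit in ["CFR", "LR", "IR", "CSR"]:
        # Sub-criteria levels are named after their main criterion, e.g. CSR -> Sub_CSR
        sub_level = f"Sub_{main_crit}"
        sub_criteria = hierarchy.get_criteria(sub_level)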

        # Sum global weights of sub-criteria
        total_weight = sum(weight_map[s] for s in sub_criteria)
        computed_main_weights[main_crit] = total_weight

    # Create comparison DataFrame
    df = pd.DataFrame([
        {"Criterion": k, "Computed": v, "Reported": expected_main_weights[k]}
        for k, v in computed_main_weights.items()
    ])

    df["Difference"] = df["Computed"] - df["Reported"]
    df["Abs_Diff"] = df["Difference"].abs()

    return df.set_index("Criterion")
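
The aggregation step works because each global weight is a main-criterion weight multiplied by a local weight, so summing the global weights within a group recovers that group's main weight. A minimal sketch with a made-up two-group hierarchy (all names and numbers below are illustrative assumptions):

```python
import numpy as np
import pandas as pd

# Hypothetical mini-hierarchy: two main criteria with two ratios each.
groups = {"CFR": ["cfr_1", "cfr_2"], "LR": ["lr_1", "lr_2"]}
ratio_order = ["cfr_1", "cfr_2", "lr_1", "lr_2"]
global_weights = np.array([0.40, 0.25, 0.20, 0.15])  # made-up values summing to 1

# Map each ratio to its weight, then sum within each main group.
weight_map = dict(zip(ratio_order, global_weights))
computed = {g: sum(weight_map[r] for r in members) for g, members in groups.items()}
reported = {"CFR": 0.65, "LR": 0.35}  # made-up reference values

table = pd.DataFrame({"Computed": computed, "Reported": reported})
table["Abs_Diff"] = (table["Computed"] - table["Reported"]).abs()
print(table)
```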

# -------------------------------------------------------------------------------------------------------------------------------
# Task 27, Step 2: Compare composite risk scores and ranking
# -------------------------------------------------------------------------------------------------------------------------------

def compare_risk_scores(
    ranking_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Compares the computed normalized risk indices (A_t) for key years with the
    values reported in the study.

    Reported values (approximate from text/tables):
    - 2016: 0.133
    - 2009: 0.080

    Args:
        ranking_df (pd.DataFrame): The computed ranking DataFrame containing 'A_t'.

    Returns:
        pd.DataFrame: Comparison table for key years.
    """
    # Key years mentioned with specific values in the text
    key_years = {
        2016: 0.133,
        2009: 0.080
    }

    comparison_data = []

    for year, reported_val in key_years.items():
        if year in ranking_df.index:
            computed_val = ranking_df.loc[year, "A_t"]
            comparison_data.append({
                "Fiscal_Year": year,
                "Computed_A_t": computed_val,
                "Reported_A_t": reported_val,
                "Difference": computed_val - reported_val
            })

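    # Passing the columns explicitly keeps set_index valid even when none of the key years is present
    columns = ["Fiscal_Year", "Computed_A_t", "Reported_A_t", "Difference"]
    return pd.DataFrame(comparison_data, columns=columns).set_index("Fiscal_Year")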

# -------------------------------------------------------------------------------------------------------------------------------
# Task 27, Step 3: Investigate and document any discrepancies
# -------------------------------------------------------------------------------------------------------------------------------

def generate_discrepancy_report(
    weight_comparison: pd.DataFrame,
    score_comparison: pd.DataFrame,
    rank_comparison: pd.DataFrame
) -> Dict[str, Any]:
    """
    Analyzes the comparison tables and generates a summary of discrepancies.

    Args:
        weight_comparison (pd.DataFrame): From Step 1.
        score_comparison (pd.DataFrame): From Step 2.
        rank_comparison (pd.DataFrame): From Task 24 (passed in via orchestrator).

    Returns:
        Dict[str, Any]: A structured report of findings.
    """
    # Weight analysis
    max_weight_diff = weight_comparison["Abs_Diff"].max()
    weight_status = "MATCH" if max_weight_diff < 0.01 else "MISMATCH"

    # Rank analysis
    # Check if 2016 is Rank 1 and 2009 is Rank 10
    # Initialising to None replaces the fragile locals() lookup used to detect missing years.
    rank_2016 = rank_2009 = None
    try:
        rank_2016 = int(rank_comparison.loc[2016, "Rank"])
        rank_2009 = int(rank_comparison.loc[2009, "Rank"])
        rank_status = "MATCH" if (rank_2016 == 1 and rank_2009 == 10) else "MISMATCH"
    except KeyError:
        rank_status = "ERROR (Missing Years)"

    return {
        "weight_consistency": {
            "status": weight_status,
            "max_diff": max_weight_diff
        },
        "rank_consistency": {
            "status": rank_status,
            "rank_2016": rank_2016,
            "rank_2009": rank_2009
        },
        "score_consistency": score_comparison.to_dict(orient="index")
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 27, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_results_against_study(
    global_weights: np.ndarray,
    hierarchy: 'AHPHierarchy',
    ranking_df: pd.DataFrame,
    comparison_df: pd.DataFrame
) -> Dict[str, Any]:
    """
    Orchestrator function for Task 27: Validates computed results against the study's reported figures.

    Pipeline:
    1. Compare aggregated main criteria weights.
    2. Compare specific A_t scores for key years.
    3. Generate a discrepancy report summarizing findings.

    Args:
        global_weights (np.ndarray): Computed global weights.
        hierarchy (AHPHierarchy): Hierarchy configuration.
        ranking_df (pd.DataFrame): Computed ranking.
        comparison_df (pd.DataFrame): Rank comparison from Task 24.

    Returns:
        Dict[str, Any]: Validation results including comparison tables and summary report.
    """
    # Step 1: Weights
    weight_comp = compare_ahp_weights(global_weights, hierarchy)

    # Step 2: Scores
    score_comp = compare_risk_scores(ranking_df)

    # Step 3: Report
    report = generate_discrepancy_report(weight_comp, score_comp, comparison_df)

    return {
        "weight_comparison": weight_comp,
        "score_comparison": score_comp,
        "summary_report": report
    }


In [None]:
# Task 28 – Document all implementation choices and package final outputs

# ==============================================================================
# Task 28: Document all implementation choices and package final outputs
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 28, Step 1: Create a comprehensive technical report
# -------------------------------------------------------------------------------------------------------------------------------

def generate_technical_report(
    results: Dict[str, Any],
    study_configuration: Dict[str, Any]
) -> str:
    """
    Generates a comprehensive technical report in Markdown format.

    The report documents:
    1. Study Metadata and Scope.
    2. Methodology (AHP and SAW equations).
    3. Implementation Details (Data cleaning, assumptions).
    4. Results (Global weights, Rankings).
    5. Validation (Comparison with original study).

    Args:
        results (Dict): The final results dictionary from the pipeline.
        study_configuration (Dict): The study configuration.

    Returns:
        str: The complete technical report as a Markdown string.
    """
    meta = study_configuration["metadata"]

    # The report is assembled from unindented lines: Markdown treats text indented
    # by four spaces as a code block, so headings and tables must start at column 0.
    lines = [
        f"# Technical Report: {meta['study_title']}",
        "",
        "## 1. Executive Summary",
        "This report documents the construction and validation of the "
        f"Heuristic-Augmented Financial Risk Index (HAFRI) for {meta['target_entity']}.",
        "",
        "The model integrates expert heuristics via the Analytic Hierarchy Process (AHP) "
        "with objective financial data using Simple Additive Weighting (SAW).",
        "",
        "## 2. Methodology",
        "",
        "### 2.1 Analytic Hierarchy Process (AHP)",
        f"Weights were derived from pairwise comparisons provided by {meta['expert_panel']['count']} experts.",
        "- **Consistency Check**: Matrices with Consistency Ratio (CR) > 0.10 were rejected.",
        "- **Aggregation**: Arithmetic mean of individual weight vectors.",
        r"- **Global Weights**: Computed via hierarchical composition: $w_{global} = w_{main} \times w_{local}$.",
        "",
        "### 2.2 Simple Additive Weighting (SAW)",
        "Financial ratios were normalized to a [0, 1] risk scale:",
        r"- **Benefit Criteria**: $r_{tj} = \frac{x_j^+ - x_{tj}}{x_j^+ - x_j^-}$",
        r"- **Cost Criteria**: $r_{tj} = \frac{x_{tj} - x_j^-}{x_j^+ - x_j^-}$",
        "",
        r"Composite Risk Score: $V_t = \sum_j w_j r_{tj}$",
        "",
        "## 3. Results",
        "",
        "### 3.1 Global Weights (Top 5)",
        "(See full CSV for details)",
        "",
    ]

    # Add top weights
    weights = pd.Series(results["global_weights"], index=results["decision_matrix"].columns)
    top_weights = weights.sort_values(ascending=False).head(5)
    lines += [f"- **{idx}**: {val:.4f}" for idx, val in top_weights.items()]

    # Add ranking table (DataFrame.to_markdown requires the optional 'tabulate' package)
    ranking = results["ranking"][["A_t", "Rank"]]
    lines += ["", "### 3.2 Financial Year Ranking", "", ranking.to_markdown(), ""]

    # Add comparison summary
    lines += [
        "## 4. Validation",
        "Comparison with original study results:",
        "",
        results["comparison"].to_markdown(),
    ]

    report = "\n".join(lines) + "\n"

    return report
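
The SAW formulas quoted in the report can be illustrated numerically. The sketch below applies the benefit and cost normalizations and the weighted sum to a made-up 3x2 decision matrix; the values, weights, and the benefit/cost assignment are assumptions for illustration, not data from the study.

```python
import numpy as np

# Hypothetical decision matrix: 3 fiscal years x 2 ratios.
# Column 0 is treated as a benefit criterion, column 1 as a cost criterion.
X = np.array([[2.0, 0.30],
              [1.0, 0.50],
              [1.5, 0.40]])
is_benefit = np.array([True, False])
w = np.array([0.6, 0.4])  # made-up weights summing to 1

x_max, x_min = X.max(axis=0), X.min(axis=0)
span = x_max - x_min  # assumed non-zero; a constant column would need special handling

# Risk coding: a high benefit value means low risk, a high cost value means high risk.
R = np.where(is_benefit, (x_max - X) / span, (X - x_min) / span)

# Composite risk score per year: V_t = sum_j w_j * r_tj
V = R @ w
print(R)
print(V)
```

In this toy case the first year has the best value on both ratios and therefore a composite risk of about 0, the second year has the worst on both and scores about 1, and the third sits halfway at about 0.5.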

# -------------------------------------------------------------------------------------------------------------------------------
# Task 28, Step 2: Persist all intermediate and final data artifacts
# -------------------------------------------------------------------------------------------------------------------------------

def serialize_artifacts(
    results: Dict[str, Any],
    study_configuration: Dict[str, Any]
) -> Dict[str, str]:
    """
    Serializes all key data artifacts into string formats (CSV/JSON) suitable for file storage.

    Args:
        results (Dict): Pipeline results.
        study_configuration (Dict): Configuration.

    Returns:
        Dict[str, str]: Mapping of filenames to content strings.
    """
    artifacts = {}

    # 1. Configuration (default=str guards against values that are not JSON-native, e.g. NumPy scalars)
    artifacts["study_configuration.json"] = json.dumps(study_configuration, indent=4, default=str)

    # 2. Global Weights
    weights_series = pd.Series(results["global_weights"], index=results["decision_matrix"].columns)
    artifacts["global_weights.csv"] = weights_series.to_csv(header=["Weight"])

    # 3. Decision Matrix (X)
    artifacts["decision_matrix_X.csv"] = results["decision_matrix"].to_csv()

    # 4. Normalized Matrix (R)
    artifacts["normalized_matrix_R.csv"] = results["normalized_matrix"].to_csv()

    # 5. Weighted Matrix (V)
    artifacts["weighted_matrix_V.csv"] = results["weighted_matrix"].to_csv()

    # 6. Final Ranking
    artifacts["final_ranking.csv"] = results["ranking"].to_csv()

    # 7. Validation Comparison
    artifacts["validation_comparison.csv"] = results["comparison"].to_csv()

    return artifacts

# -------------------------------------------------------------------------------------------------------------------------------
# Task 28, Step 3: Prepare reproducibility package for external validation
# -------------------------------------------------------------------------------------------------------------------------------

def generate_readme() -> str:
    """
    Generates a README.md file with instructions for reproducing the analysis.

    Returns:
        str: Content of README.md.
    """
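    # Lines are joined without indentation so that headings and lists render as
    # Markdown rather than as an indented code block.
    return "\n".join([
        "# HAFRI Reproducibility Package",
        "",
        "This package contains the configuration, computed matrices, and results "
        "for the Heuristic-Augmented Financial Risk Index.",
        "",
        "## Contents",
        "- `study_configuration.json`: Full parameter set.",
        "- `global_weights.csv`: Computed AHP weights.",
        "- `decision_matrix_X.csv`: Raw financial ratios.",
        "- `normalized_matrix_R.csv`: Risk-coded scores [0,1].",
        "- `weighted_matrix_V.csv`: Weighted risk scores.",
        "- `final_ranking.csv`: Risk ranking of fiscal years.",
        "- `validation_comparison.csv`: Comparison with the original study's results.",
        "- `technical_report.md`: Detailed methodology and analysis.",
        "",
        "## Reproduction Steps",
        "1. Load `study_configuration.json`.",
        "2. Ingest raw survey and financial data (schema defined in config).",
        "3. Run the `run_hafri_pipeline` orchestrator.",
        "4. Compare outputs with the provided CSVs.",
        "",
        "## Requirements",
        "- Python 3.9+",
        "- pandas",
        "- numpy",
        "- tabulate (required by `DataFrame.to_markdown`)",
        "",
    ])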

# -------------------------------------------------------------------------------------------------------------------------------
# Task 28, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def package_project_outputs(
    results: Dict[str, Any],
    study_configuration: Dict[str, Any]
) -> Dict[str, str]:
    """
    Orchestrator function for Task 28: Generates documentation and serializes artifacts.

    Pipeline:
    1. Generate Technical Report.
    2. Serialize Data Artifacts (CSV/JSON).
    3. Generate README.

    Args:
        results (Dict): Pipeline results.
        study_configuration (Dict): Study configuration.

    Returns:
        Dict[str, str]: A dictionary mapping filenames to their content, ready for writing to disk.
    """
    # Step 1: Report
    report_content = generate_technical_report(results, study_configuration)

    # Step 2: Artifacts
    package = serialize_artifacts(results, study_configuration)

    # Step 3: README
    readme_content = generate_readme()

    # Add documents to package
    package["technical_report.md"] = report_content
    package["README.md"] = readme_content

    return package
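
Since the returned dictionary maps filenames to content strings, writing it to disk is a short loop. The sketch below assumes a hypothetical `write_package` helper and a toy package; it writes into a temporary directory so that it can be run safely as-is.

```python
import tempfile
from pathlib import Path
from typing import Dict


def write_package(package: Dict[str, str], output_dir: Path) -> None:
    """Write each filename -> content pair into `output_dir` (hypothetical helper)."""
    output_dir.mkdir(parents=True, exist_ok=True)
    for filename, content in package.items():
        (output_dir / filename).write_text(content, encoding="utf-8")


# Toy package standing in for the output of package_project_outputs.
toy_package = {
    "README.md": "# HAFRI\n",
    "final_ranking.csv": "fiscal_year,Rank\n2016,1\n",
}

with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "hafri_outputs"
    write_package(toy_package, out)
    written = sorted(p.name for p in out.iterdir())
    readme_text = (out / "README.md").read_text(encoding="utf-8")

print(written)
```

Keeping serialization (strings in memory) separate from file I/O, as the orchestrator does, makes the packaging step easy to test without touching the filesystem.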


In [None]:
# Top-Level Orchestrator Function

# ==============================================================================
# Top-Level Orchestrator: HAFRI Master Pipeline
# ==============================================================================

def execute_hafri_master_pipeline(
    raw_expert_survey_df: pd.DataFrame,
    raw_financial_statement_df: pd.DataFrame,
    study_configuration: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Master orchestrator function for the Heuristic-Augmented Financial Risk Index (HAFRI) project.

    This function sequentially executes the four major phases of the project:
    1.  **Baseline Analysis (Task 25)**: Runs the full AHP+SAW pipeline to generate risk indices and rankings.
    2.  **Robustness Analysis (Task 26)**: Conducts sensitivity analysis across multiple scenarios.
    3.  **Validation (Task 27)**: Cross-checks the baseline results against the original study's reported figures.
    4.  **Packaging (Task 28)**: Generates a comprehensive technical report and serializes all data artifacts.

    Args:
        raw_expert_survey_df (pd.DataFrame): The raw expert pairwise comparison data.
        raw_financial_statement_df (pd.DataFrame): The raw financial statement data.
        study_configuration (Dict[str, Any]): The complete study configuration dictionary.

    Returns:
        Dict[str, Any]: A master dictionary containing the artifacts from all phases:
            - 'baseline_results': Output from Task 25 (weights, matrices, rankings).
            - 'robustness_results': Output from Task 26 (scenario results, stability summary).
            - 'validation_results': Output from Task 27 (comparison tables, discrepancy report).
            - 'final_package': Output from Task 28 (serialized files, report text).
    """
    logger = logging.getLogger(__name__)
    logger.info("Initializing HAFRI Master Pipeline...")

    # -------------------------------------------------------------------------
    # Phase 1: Baseline Analysis (Task 25)
    # -------------------------------------------------------------------------
    logger.info(">>> Executing Phase 1: Baseline Analysis (Task 25)")
    baseline_results = run_hafri_pipeline(
        raw_expert_survey_df,
        raw_financial_statement_df,
        study_configuration
    )

    # -------------------------------------------------------------------------
    # Phase 2: Robustness Analysis (Task 26)
    # -------------------------------------------------------------------------
    logger.info(">>> Executing Phase 2: Robustness Analysis (Task 26)")
    scenario_results, robustness_summary = conduct_robustness_analysis(
        study_configuration,
        raw_expert_survey_df,
        raw_financial_statement_df
    )

    robustness_results = {
        "scenario_details": scenario_results,
        "stability_summary": robustness_summary
    }

    # -------------------------------------------------------------------------
    # Phase 3: Validation (Task 27)
    # -------------------------------------------------------------------------
    logger.info(">>> Executing Phase 3: Validation against Study (Task 27)")

    # Reconstruct the hierarchy object to pass to the validator
    hierarchy = initialize_ahp_hierarchy()

    validation_results = validate_results_against_study(
        baseline_results["global_weights"],
        hierarchy,
        baseline_results["ranking"],
        baseline_results["comparison"]
    )

    # -------------------------------------------------------------------------
    # Phase 4: Packaging (Task 28)
    # -------------------------------------------------------------------------
    logger.info(">>> Executing Phase 4: Packaging Outputs (Task 28)")

    # The report generator reads the baseline results, including the Task 24 rank
    # comparison already stored under 'comparison'. The detailed Task 27 validation
    # output is attached under 'comparison_details' so it travels with the baseline results.
    baseline_results["comparison_details"] = validation_results

    final_package = package_project_outputs(
        baseline_results,
        study_configuration
    )

    # -------------------------------------------------------------------------
    # Final Assembly
    # -------------------------------------------------------------------------
    master_artifacts = {
        "baseline_results": baseline_results,
        "robustness_results": robustness_results,
        "validation_results": validation_results,
        "final_package": final_package
    }

    logger.info("HAFRI Master Pipeline completed successfully.")
    return master_artifacts
