# **`README.md`**

# AIMM-X: An Explainable Market Integrity Monitoring System with Multi-Source Attention Signals and Transparent Scoring

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2601.15304v1-b31b1b.svg)](https://arxiv.org/abs/2601.15304v1)
[![Journal](https://img.shields.io/badge/Journal-ArXiv%20Preprint-003366)](https://arxiv.org/abs/2601.15304v1)
[![Year](https://img.shields.io/badge/Year-2026-purple)](https://github.com/chirindaopensource/explainable_market_integrity_monitoring_system)
[![Discipline](https://img.shields.io/badge/Discipline-Market%20Microstructure%20%7C%20RegTech-00529B)](https://github.com/chirindaopensource/explainable_market_integrity_monitoring_system)
[![Data Sources](https://img.shields.io/badge/Data-Polygon.io%20%7C%20Reddit%20%7C%20Wikipedia-lightgrey)](https://polygon.io/)
[![Core Method](https://img.shields.io/badge/Method-Hysteresis%20Segmentation-orange)](https://github.com/chirindaopensource/explainable_market_integrity_monitoring_system)
[![Analysis](https://img.shields.io/badge/Analysis-Factor%20Decomposition-red)](https://github.com/chirindaopensource/explainable_market_integrity_monitoring_system)
[![Validation](https://img.shields.io/badge/Validation-Retrospective%20Case%20Studies-green)](https://github.com/chirindaopensource/explainable_market_integrity_monitoring_system)
[![Robustness](https://img.shields.io/badge/Robustness-Rolling%20Baseline%20Z--Scores-yellow)](https://github.com/chirindaopensource/explainable_market_integrity_monitoring_system)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%230C55A5.svg?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![YAML](https://img.shields.io/badge/YAML-%23CB171E.svg?style=flat&logo=yaml&logoColor=white)](https://yaml.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
[![Open Source](https://img.shields.io/badge/Open%20Source-%E2%9D%A4-brightgreen)](https://github.com/chirindaopensource/explainable_market_integrity_monitoring_system)

**Repository:** `https://github.com/chirindaopensource/explainable_market_integrity_monitoring_system`

**Owner:** 2026 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2026 paper entitled **"An Explainable Market Integrity Monitoring System with Multi-Source Attention Signals and Transparent Scoring"** by:

*   **Sandeep Neela** (Independent Researcher)

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from the ingestion and rigorous validation of market microstructure and attention data to the detection of suspicious trading windows via hysteresis segmentation, culminating in the generation of interpretable integrity scores and factor attributions.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `run_aimm_x_pipeline`](#key-callable-run_aimm_x_pipeline)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the analytical framework presented in Neela (2026). The core of this repository is the Jupyter notebook `explainable_market_integrity_monitoring_system_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings. The pipeline addresses the critical challenge of **market integrity monitoring** by moving away from opaque, proprietary "black-box" models toward a transparent, auditable "glass-box" approach.

The paper argues that effective surveillance requires explainability—analysts must understand *why* a window was flagged—and accessibility to public data sources. This codebase operationalizes the proposed solution: **AIMM-X**, a system that:
-   **Validates** data integrity using strict OHLC consistency checks ($H_t \ge \max(O_t, C_t)$) and precise missingness semantics (NaN vs. 0).
-   **Fuses** multi-source attention signals (Reddit, StockTwits, News, Wikipedia, Google Trends) into a unified metric of public interest.
-   **Detects** anomalies using a robust **Hysteresis State Machine** (Schmitt Trigger) that prevents alert fragmentation.
-   **Scores** windows using a linear **Integrity Score ($M$)** decomposed into six interpretable factors ($\phi_1 \dots \phi_6$), enabling clear attribution of alerts to price shocks, volatility anomalies, or attention spikes.

## Theoretical Background

The implemented methods combine techniques from Financial Econometrics, Signal Processing, and Explainable AI.

**1. Multi-Source Attention Fusion ($A_{i,t}$):**
A unified attention signal is constructed by aggregating normalized proxies from diverse sources, capturing the "hype" dimension of market activity.
$$ A_{i,t} = \sum_{s \in \mathcal{S}} w_s \cdot \tilde{a}_{s,i,t} $$
where $\tilde{a}_{s,i,t}$ represents the rolling z-score of source $s$ for ticker $i$ at time $t$.
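
A minimal sketch of the fusion step for a single ticker follows. It assumes that each source is normalized with a trailing rolling z-score that excludes the current bar. It also assumes that a source with no coverage (`NaN`) at time $t$ contributes nothing to $A_{i,t}$. The helper names `rolling_zscore` and `fuse_attention`, the window length, and the weights are illustrative choices, not the notebook's API.

```python
from typing import Dict

import pandas as pd


def rolling_zscore(x: pd.Series, window: int = 20, eps: float = 1e-8) -> pd.Series:
    """Z-score of x against the `window` bars strictly before t (no look-ahead)."""
    prior = x.shift(1).rolling(window, min_periods=window)
    return (x - prior.mean()) / (prior.std() + eps)


def fuse_attention(
    sources: Dict[str, pd.Series], weights: Dict[str, float], window: int = 20
) -> pd.Series:
    """A_t = sum_s w_s * z_{s,t}; sources that are NaN (no coverage) at t are skipped."""
    z = pd.DataFrame({name: rolling_zscore(s, window) for name, s in sources.items()})
    w = pd.Series(weights, dtype=float).reindex(z.columns)
    # min_count=1 leaves A_t as NaN when no source has coverage at t.
    return z.mul(w, axis=1).sum(axis=1, min_count=1)
```

Skipping uncovered sources, rather than treating them as zero, keeps "no data" from being read as "no interest". Re-normalizing the remaining weights would be an equally defensible alternative.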

**2. Statistical Deviation Detection:**
The system employs dynamic baselines to adapt to changing market regimes, computing standardized deviations (z-scores) for returns ($r$), volatility ($\sigma$), and attention ($A$).
$$ z_{i,t}^{(x)} = \frac{x_{i,t} - \mu_{i,t}^{(x)}}{\hat{\sigma}_{i,t}^{(x)} + \epsilon} $$
A composite strength score $s_{i,t}$ aggregates these deviations to drive detection.
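
The deviation step can be sketched as follows. The sketch assumes, as the notebook header's "lagged baseline construction" suggests, that $\mu$ and $\hat{\sigma}$ are computed over the $B$ bars strictly before $t$. It also assumes that $s_{i,t}$ is an $\alpha$-weighted sum of absolute z-scores across channels. `lagged_zscore`, `composite_strength`, and the channel keys are hypothetical names. The default $B = 20$ matches the baseline window reported in the paper summary.

```python
from typing import Dict

import pandas as pd


def lagged_zscore(x: pd.Series, B: int = 20, eps: float = 1e-8) -> pd.Series:
    """z_t = (x_t - mu_t) / (sigma_t + eps), with the baseline over bars t-B .. t-1."""
    prior = x.shift(1).rolling(B, min_periods=B)
    return (x - prior.mean()) / (prior.std() + eps)


def composite_strength(
    channels: Dict[str, pd.Series], alpha: Dict[str, float], B: int = 20
) -> pd.Series:
    """s_t = sum_c alpha_c * |z_t^(c)|; NaN unless every channel has a z-score."""
    z = pd.DataFrame({c: lagged_zscore(x, B) for c, x in channels.items()})
    weights = pd.Series(alpha, dtype=float).reindex(z.columns)
    return z.abs().mul(weights, axis=1).sum(axis=1, min_count=z.shape[1])
```

Shifting by one bar before rolling means a shock at $t$ is measured against a baseline that has not yet absorbed it.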

**3. Hysteresis-Based Segmentation:**
To avoid "chattering" (rapid on/off switching of alerts due to noise), the system uses dual-threshold hysteresis logic:
-   **Trigger:** A window opens when $s_{i,t} > \theta_{\text{high}}$.
-   **Sustain:** A window remains open while $s_{i,t} > \theta_{\text{low}}$.
-   **Exit:** A window closes only after $s_{i,t}$ has stayed at or below $\theta_{\text{low}}$ for longer than the gap tolerance of $g$ bars, so brief dips are bridged rather than splitting the window.
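
The three rules above amount to a small state machine. The sketch below makes three assumptions. First, a window ends at its last bar above $\theta_{\text{low}}$. Second, a window closes once $s_t$ has stayed at or below $\theta_{\text{low}}$ for more than $g$ consecutive bars. Third, `NaN` scores count as quiet bars, because comparisons with `NaN` are false. The defaults $\theta_{\text{high}} = 3.0$ and $\theta_{\text{low}} = 2.0$ follow the paper summary. `hysteresis_windows`, `gap_tolerance`, and `min_length` are hypothetical names.

```python
from typing import List, Optional, Sequence, Tuple


def hysteresis_windows(
    s: Sequence[float],
    theta_high: float = 3.0,
    theta_low: float = 2.0,
    gap_tolerance: int = 1,
    min_length: int = 1,
) -> List[Tuple[int, int]]:
    """Return inclusive (start, end) bar indices of detected windows."""
    windows: List[Tuple[int, int]] = []
    start: Optional[int] = None
    last_active = 0
    gap = 0
    for t, value in enumerate(s):
        if start is None:
            if value > theta_high:            # trigger: open a new window
                start, last_active, gap = t, t, 0
        elif value > theta_low:               # sustain: extend the window
            last_active, gap = t, 0
        else:
            gap += 1                          # quiet bar inside an open window
            if gap > gap_tolerance:           # exit: gap tolerance exhausted
                windows.append((start, last_active))
                start = None
    if start is not None:                     # window still open at series end
        windows.append((start, last_active))
    return [(a, b) for a, b in windows if b - a + 1 >= min_length]


s = [0.0, 3.5, 2.5, 1.0, 2.2, 0.5, 0.5, 0.0, 3.1, 0.0]
print(hysteresis_windows(s, gap_tolerance=1))  # [(1, 4), (8, 8)]
print(hysteresis_windows(s, gap_tolerance=0))  # [(1, 2), (8, 8)]
```

With `gap_tolerance=0`, the single dip at bar 3 splits the first episode in two. With `gap_tolerance=1`, the dip is bridged. This is exactly the chattering the dual threshold is meant to suppress.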

**4. Interpretable Integrity Score ($M$):**
Detected windows are ranked by a score $M(w)$ that is fully decomposable into additive evidence factors:
$$ M(w) = \sum_{k=1}^{6} \omega_k \cdot \phi_k(w) $$
Factors include Return Shock Intensity ($\phi_1$), Volatility Anomaly ($\phi_2$), Attention Spike Magnitude ($\phi_3$), Co-movement Alignment ($\phi_4$), Recurrence ($\phi_5$), and Source Disagreement ($\phi_6$).
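
Because $M(w)$ is linear, each factor's contribution $\omega_k \phi_k(w)$ can be reported alongside the total score. The sketch below takes precomputed factor values. The factor keys, the unit weights, and the example numbers are hypothetical placeholders, not the paper's calibrated $\omega_k$.

```python
from typing import Dict, Tuple

FACTORS = ("phi1", "phi2", "phi3", "phi4", "phi5", "phi6")


def integrity_score(
    phi: Dict[str, float], omega: Dict[str, float]
) -> Tuple[float, Dict[str, float]]:
    """Return M(w) = sum_k omega_k * phi_k(w) together with each additive contribution."""
    contributions = {k: omega[k] * phi[k] for k in FACTORS}
    return sum(contributions.values()), contributions


def top_driver(contributions: Dict[str, float]) -> str:
    """Factor with the largest absolute contribution to M(w)."""
    return max(contributions, key=lambda k: abs(contributions[k]))


phi = {"phi1": 10.0, "phi2": 2.0, "phi3": 4.0, "phi4": 0.5, "phi5": 1.0, "phi6": 0.0}
omega = {k: 1.0 for k in FACTORS}
M, parts = integrity_score(phi, omega)
print(M, top_driver(parts))  # 17.5 phi1
```

The contributions sum exactly to $M$, so an analyst can read off which factor drove a flag.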

## Features

The provided Jupyter notebook (`explainable_market_integrity_monitoring_system_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Task Architecture:** The pipeline is decomposed into 17 distinct, modular tasks, each with its own orchestrator function.
-   **Configuration-Driven Design:** All study parameters (thresholds, weights, lookback windows) are managed in an external `config.yaml` file.
-   **Rigorous Data Validation:** A multi-stage validation process checks schema integrity, OHLC consistency, and exchange calendar alignment.
-   **Deterministic Execution:** Enforces reproducibility through seed control, deterministic sorting, and rigorous logging of all stochastic outputs.
-   **Comprehensive Audit Logging:** Generates detailed logs of every processing step, including quarantine counts and filter statistics.
-   **Reproducible Artifacts:** Generates structured `PipelineResult` objects containing raw window lists, filtered top-N tables, and factor summary statistics.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Configuration & Validation (Task 1):** Loads and validates the study configuration, enforcing parameter constraints and determinism requirements.
2.  **Data Ingestion & Cleansing (Tasks 2-3):** Validates panel schema, enforces OHLC consistency, and strictly handles missingness semantics (NaN vs 0).
3.  **Calendar Enforcement (Task 4):** Aligns data to the canonical NYSE/Nasdaq trading session grid.
4.  **Attention Processing (Tasks 5-7):** Aligns, normalizes, and fuses multi-source attention signals into a unified metric.
5.  **Feature Engineering (Tasks 8-9):** Computes log returns and rolling realized volatility proxies.
6.  **Deviation Detection (Tasks 10-11):** Computes rolling baselines, z-scores, and the composite strength score.
7.  **Window Segmentation (Task 12):** Applies the hysteresis state machine to detect suspicious time intervals.
8.  **Scoring & Attribution (Tasks 13-14):** Computes $\phi$-factors and the composite Integrity Score $M$ with full decomposition.
9.  **Ranking & Filtering (Task 15):** Ranks windows by score and applies warmup/artifact filters.
10. **Artifact Generation (Task 16):** Produces final output tables and summary statistics.
11. **Orchestration (Task 17):** Unifies all components into a single `run_aimm_x_pipeline` function.

## Core Components (Notebook Structure)

The notebook is structured as a logical pipeline with modular orchestrator functions for each of the 17 major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `run_aimm_x_pipeline`

The project is designed around a single, top-level user-facing interface function:

-   **`run_aimm_x_pipeline`:** This master orchestrator function runs the entire automated research pipeline from end-to-end. A single call to this function reproduces the entire computational portion of the project, managing data flow between validation, cleansing, detection, scoring, and reporting modules.

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `scipy`, `pyyaml`.
-   Optional dependencies: `exchange_calendars` (for precise trading session generation).

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/explainable_market_integrity_monitoring_system.git
    cd explainable_market_integrity_monitoring_system
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy scipy pyyaml exchange_calendars
    ```

## Input Data Structure

The pipeline requires a primary DataFrame `df_raw_panel` with a MultiIndex `(date, ticker)` and the following columns:

**Market Microstructure:**
1.  **`open_price`**: Float.
2.  **`high_price`**: Float, $\ge \max(Open, Close)$.
3.  **`low_price`**: Float, $\le \min(Open, Close)$.
4.  **`close_price`**: Float, $>0$.
5.  **`volume`**: Float/Int, $>0$.

**Attention Signals (Nullable):**
1.  **`reddit_posts`**: Float (count).
2.  **`stocktwits_msgs`**: Float (count).
3.  **`wiki_views`**: Float (count).
4.  **`news_articles`**: Float (count).
5.  **`google_trends`**: Float (index).

*Note: `NaN` in attention columns represents "No Coverage", while `0.0` represents "No Activity".*
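
The toy panel below sketches a frame that matches this schema. Every number in it is made up for illustration and is not market data. The panel also shows why the `NaN` versus `0.0` distinction matters: coverage (the share of non-missing bars) and activity (the number of bars with a non-zero count) are different quantities.

```python
import numpy as np
import pandas as pd

dates = pd.to_datetime(["2024-01-08", "2024-01-09"])
index = pd.MultiIndex.from_product([dates, ["GME", "SPY"]], names=["date", "ticker"])

# Illustrative values only; row order is (d1, GME), (d1, SPY), (d2, GME), (d2, SPY).
df_raw_panel = pd.DataFrame(
    {
        "open_price": [20.0, 470.0, 21.0, 472.0],
        "high_price": [21.5, 474.0, 22.0, 474.5],
        "low_price": [19.5, 469.0, 20.5, 471.0],
        "close_price": [21.0, 473.0, 20.8, 473.5],
        "volume": [5.1e6, 6.0e7, 4.8e6, 5.9e7],
        "reddit_posts": [120.0, 0.0, np.nan, 3.0],   # NaN = no coverage
        "stocktwits_msgs": [300.0, 15.0, 280.0, 12.0],
        "wiki_views": [9000.0, 4000.0, 8500.0, np.nan],
        "news_articles": [4.0, 10.0, 0.0, 8.0],      # 0.0 = no activity
        "google_trends": [55.0, 20.0, 50.0, 21.0],
    },
    index=index,
)

reddit = df_raw_panel["reddit_posts"]
coverage = reddit.notna().groupby(level="ticker").mean()  # share of covered bars
activity = (reddit > 0).groupby(level="ticker").sum()    # bars with posts > 0
```

GME and SPY each have one active Reddit bar. However, GME is only half covered, which a zero-filled column would hide.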

## Usage

The notebook provides a complete, step-by-step guide. The primary workflow is to execute the final cell, which demonstrates how to use the top-level `run_aimm_x_pipeline` orchestrator:

```python
# Final cell of the notebook

# This block serves as the main entry point for the entire project.
if __name__ == '__main__':
    # 1. Load the master configuration from the YAML file.
    config = load_study_configuration("config.yaml")
    
    # 2. Load raw datasets (Example using synthetic generator provided in the notebook)
    # In production, load from CSV/Parquet: pd.read_parquet(...)
    df_raw_panel = generate_synthetic_panel(config)

    # 3. Execute the entire replication study.
    result = run_aimm_x_pipeline(df_raw_panel, config)
    
    # 4. Access results
    print(result.df_top_n.head())
    print(result.audit_log)
```

## Output Structure

The pipeline returns a `PipelineResult` object containing:
-   **`config_snapshot`**: The resolved configuration dictionary used for the run.
-   **`audit_log`**: A structured log of execution metadata, validation stats, and step completion.
-   **`df_windows_raw`**: The complete set of detected windows with all scores and factors.
-   **`df_windows_filtered`**: The subset of windows passing quality filters (warmup, artifacts).
-   **`df_top_n`**: The top-ranked suspicious windows formatted for reporting.
-   **`df_phi_summary`**: Summary statistics for factor contributions.
-   **`intermediate_series`**: Dictionary containing computed time-series ($r$, $\sigma$, $A$, $s$, z-scores) for debugging.

## Project Structure

```
explainable_market_integrity_monitoring_system/
│
├── explainable_market_integrity_monitoring_system_draft.ipynb   # Main implementation notebook
├── config.yaml                                                  # Master configuration file
├── requirements.txt                                             # Python package dependencies
│
├── LICENSE                                                      # MIT Project License File
└── README.md                                                    # This file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can modify study parameters such as:
-   **Universe:** `universe_tickers` list.
-   **Detection Logic:** `baseline_window_B`, `theta_high`, `theta_low`, `gap_tolerance_g`.
-   **Scoring Weights:** `alpha` weights for composite score, `omega` weights for integrity score.
-   **Filters:** `exclude_warmup`, `max_z_score_cutoff`.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **High-Frequency Data:** Adapting the pipeline for 5-minute or 1-minute bars.
-   **Real-Time API Integration:** Connecting to live feeds for Reddit/Twitter data.
-   **Advanced Normalization:** Implementing robust scalers (e.g., Median Absolute Deviation) for fat-tailed distributions.
-   **Causal Inference:** Integrating Granger causality tests to determine lead-lag relationships between attention and price.
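
The robust-normalization extension is not part of the current pipeline, but it could be sketched as a trailing median/MAD z-score that replaces the mean/standard-deviation baseline. The constant 1.4826 rescales the MAD so that it estimates the standard deviation for normally distributed data. The function name and window length are illustrative.

```python
import numpy as np
import pandas as pd


def robust_rolling_zscore(x: pd.Series, window: int = 20, eps: float = 1e-8) -> pd.Series:
    """(x_t - median) / (1.4826 * MAD), computed over the `window` bars strictly before t."""
    prior = x.shift(1).rolling(window, min_periods=window)
    median = prior.median()
    mad = prior.apply(lambda v: np.median(np.abs(v - np.median(v))), raw=True)
    return (x - median) / (1.4826 * mad + eps)


x = pd.Series([10, 11, 9, 10, 11, 9, 10, 11, 9, 1000, 12], dtype=float)
z = robust_rolling_zscore(x, window=10)
# z.iloc[10] is about 1.349: the outlier at bar 9 barely moves the median or the MAD.
```

A mean/standard-deviation baseline over the same window would be dragged toward the 1000 outlier and would misstate how unusual the final bar is.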

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{neela2026aimmx,
  title={AIMM-X: An Explainable Market Integrity Monitoring System with Multi-Source Attention Signals and Transparent Scoring},
  author={Neela, Sandeep},
  journal={arXiv preprint arXiv:2601.15304v1},
  year={2026}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2026). Explainable Market Integrity Monitoring System: An Open Source Implementation.
GitHub repository: https://github.com/chirindaopensource/explainable_market_integrity_monitoring_system
```

## Acknowledgments

-   Credit to **Sandeep Neela** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, SciPy, and PyYAML**.

---

*This README was generated based on the structure and content of the `explainable_market_integrity_monitoring_system_draft.ipynb` notebook and follows best practices for research software documentation.*

# Paper

Title: "*An Explainable Market Integrity Monitoring System with Multi-Source Attention Signals and Transparent Scoring*"

Authors: Sandeep Neela

E-Journal Submission Date: 10 January 2026

Link: https://arxiv.org/abs/2601.15304v1

Abstract:

Market integrity monitoring is difficult because suspicious price/volume behavior can arise from many benign mechanisms, while modern detection systems often rely on opaque models that are hard to audit and communicate. We present AIMM-X, an explainable monitoring pipeline that combines market microstructure-style signals derived from OHLCV time series with multi-source public attention signals (e.g., news and online discussion proxies) to surface time windows that merit analyst review. The system detects candidate anomalous windows using transparent thresholding and aggregation, then assigns an interpretable integrity score decomposed into a small set of additive components, allowing practitioners to trace why a window was flagged and which factors drove the score. We provide an end-to-end, reproducible implementation that downloads data, constructs attention features, builds unified panels, detects windows, computes component signals, and generates summary figures/tables. Our goal is not to label manipulation, but to provide a practical, auditable screening tool that supports downstream investigation by compliance teams, exchanges, or researchers.

# Summary

### **Executive Summary**

The paper introduces **AIMM-X**, an open-source, explainable framework for detecting market manipulation and structural anomalies. Addressing the "black-box" nature and data exclusivity of proprietary surveillance systems (e.g., NASDAQ Smarts), the author proposes a methodology relying solely on **publicly accessible data** (OHLCV and social attention signals). The system identifies "suspicious windows"—intervals where price, volatility, and public attention exhibit statistically significant co-movement—and ranks them using a decomposable **Integrity Score ($M$)**. The framework is positioned not as an accusation engine, but as a **triage tool** for analysts, prioritizing transparency and reproducibility over opaque efficacy.

### **Motivation and Problem Statement**

The research identifies a critical gap in current market surveillance capabilities:
*   **The Reproducibility Crisis:** Existing detection literature relies on proprietary order-book data (Level 3) and trader IDs, making independent validation impossible for academic researchers.
*   **Black-Box Opacity:** Modern ML anomaly detection often lacks interpretability, failing to explain *why* a specific trading window was flagged—a requirement for regulatory enforcement.
*   **The "GameStop" Paradigm:** Traditional volatility alerts fail to capture the coordinated, attention-driven dynamics characteristic of modern retail-driven market events (e.g., meme stock rallies).

**Objective:** To build an auditable pipeline that surfaces anomalies using only public data, providing a "white-box" alternative to exchange-grade surveillance.

### **Methodology and Architecture**

The AIMM-X pipeline operates in four distinct stages:

#### **A. Panel Construction & Feature Engineering**
The system integrates financial time series with multi-source attention signals.
*   **Market Data:** Daily OHLCV (Open, High, Low, Close, Volume) from Polygon.io.
*   **Attention Signals ($A_{i,t}$):** A fused signal derived from five sources: Reddit, StockTwits, Wikipedia, News, and Google Trends. (Note: For this preprint, stylized proxies calibrated to literature were used).
*   **Derived Metrics:** Log returns ($r_{i,t}$) and rolling volatility ($\sigma_{i,t}$).
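
A sketch of the derived metrics follows. It assumes $r_{i,t} = \ln(C_{i,t}/C_{i,t-1})$ and a volatility proxy equal to the trailing standard deviation of log returns. The paper summary does not pin down the exact volatility estimator or its window, so `vol_window`, `add_return_features`, and the column names `log_return` and `rolling_vol` are assumptions. Computing both quantities within each ticker keeps one ticker's last close from leaking into the next ticker's first return.

```python
import numpy as np
import pandas as pd


def add_return_features(panel: pd.DataFrame, vol_window: int = 20) -> pd.DataFrame:
    """Per-ticker log returns and trailing rolling volatility on a (date, ticker) panel."""
    out = panel.sort_index().copy()
    # Differences of log closes within each ticker: r_t = ln(C_t) - ln(C_{t-1}).
    out["log_return"] = np.log(out["close_price"]).groupby(level="ticker").diff()
    # Volatility proxy: rolling sample std of log returns, again within each ticker.
    out["rolling_vol"] = out.groupby(level="ticker")["log_return"].transform(
        lambda r: r.rolling(vol_window, min_periods=vol_window).std()
    )
    return out
```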

#### **B. Statistical Deviation Detection**
The system employs a rolling baseline approach to adapt to changing market regimes.
*   **Baselines:** Rolling mean ($\mu$) and standard deviation ($\hat{\sigma}$) calculated over a window $B=20$ days.
*   **Z-Scores:** Standardized deviations are computed for returns ($z^{(r)}$), volatility ($z^{(\sigma)}$), and attention ($z^{(A)}$).
*   **Composite Strength Score ($s_{i,t}$):** A weighted sum of the absolute Z-scores across channels.

#### **C. Hysteresis-Based Segmentation**
To avoid fragmented alerts, the system uses **hysteresis thresholding** (similar to Canny edge detection in computer vision):
*   **$\theta_{high}$ (3.0):** The threshold required to *initiate* a suspicious window.
*   **$\theta_{low}$ (2.0):** The lower threshold allowing a window to *continue*.
*   **Constraints:** Windows must meet minimum length requirements and bridge small gaps (gap tolerance).

#### **D. The Interpretable Integrity Score ($M$)**
This is the core econometric contribution. Detected windows are ranked by a score $M(w)$, which is a linear combination of six interpretable factors ($\phi$):
$$M(w) = \sum_{k=1}^{6} \omega_k \cdot \phi_k(w)$$

*   **$\phi_1$ (Return Shock):** Magnitude of abnormal returns (squared Z-scores).
*   **$\phi_2$ (Volatility Anomaly):** Unusual volatility independent of direction.
*   **$\phi_3$ (Attention Spike):** Magnitude of social/news attention surge.
*   **$\phi_4$ (Co-movement):** Correlation between price/volatility and attention (detecting coordination).
*   **$\phi_5$ (Recurrence):** Frequency of similar windows in short succession.
*   **$\phi_6$ (Disagreement):** Penalty for divergence between attention sources (e.g., Reddit spikes while News is silent).
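
The bullet definitions leave the exact formulas open, so the following is only one plausible reading of $\phi_1$ to $\phi_4$ for a single window. It uses sums of squared z-scores for the return and volatility channels, the peak attention z-score, and the Pearson correlation between absolute returns and attention. All of these formula choices, and the name `window_factors`, are assumptions. $\phi_5$ and $\phi_6$ are omitted because they require cross-window history and per-source attention inputs.

```python
from typing import Dict, Sequence

import numpy as np


def window_factors(
    z_r: Sequence[float],
    z_sigma: Sequence[float],
    z_attn: Sequence[float],
    abs_r: Sequence[float],
    attn: Sequence[float],
) -> Dict[str, float]:
    """Illustrative phi_1..phi_4 for one window; every formula here is an assumption."""
    phi1 = float(np.nansum(np.square(z_r)))      # return shock: sum of squared z
    phi2 = float(np.nansum(np.square(z_sigma)))  # volatility anomaly, sign-free
    phi3 = float(np.nanmax(z_attn))              # peak attention surge
    a = np.asarray(abs_r, dtype=float)
    b = np.asarray(attn, dtype=float)
    ok = ~(np.isnan(a) | np.isnan(b))
    if ok.sum() >= 3 and a[ok].std() > 0 and b[ok].std() > 0:
        phi4 = float(np.corrcoef(a[ok], b[ok])[0, 1])  # price-attention co-movement
    else:
        phi4 = 0.0                               # too few bars or a flat series
    return {"phi1": phi1, "phi2": phi2, "phi3": phi3, "phi4": phi4}
```

Under this reading, $\phi_1$ grows quadratically with return shocks. This is consistent with the summary's later observation that, without normalization, $\phi_1$ dominates the score.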


### **Experimental Design**

*   **Universe:** 24 tickers selected to stress-test the system, including Meme stocks (GME, AMC), Large-cap Tech (META, NVDA), Crypto-exposed (MSTR, COIN), and ETFs (SPY, TLT).
*   **Period:** January 8, 2024 – December 31, 2024.
*   **Data Resolution:** Daily bars (a limitation acknowledged for future high-frequency integration).


### **Key Results and Findings**

The system detected **233 suspicious windows** across the 24 tickers.

*   **Factor Dominance:** In the current configuration, $\phi_1$ (Return Shock) dominated the scoring variance due to a lack of normalization, leading to a heavy-tailed score distribution.
*   **Top Detection (META, Jan 10-12, 2024):** The highest-scoring window ($M \approx 7.1$ million).
    *   *Analysis:* Coincided with market-wide inflation data volatility.
    *   *Verdict:* False positive for manipulation, but true positive for "market stress."
*   **Meme Stock Dynamics (GME, May 2-17, 2024):**
    *   Detected a 12-bar window characterized by sustained price-attention co-movement.
    *   Demonstrated the system's ability to capture "pump" dynamics distinct from sudden news shocks.
*   **Crypto Correlation (MSTR):**
    *   Windows aligned with Bitcoin volatility, demonstrating the system's sensitivity to sector-specific drivers.


### **Critical Analysis & Limitations**

From a reviewer's perspective, several points regarding rigor and validity stand out:

*   **The Ground Truth Problem:** The paper rightly acknowledges that "ground truth" in manipulation is unobservable without regulatory enforcement data. The author proposes a "triangulation" validation strategy (consistency, face validity, and retrospective enforcement checks).
*   **Data Granularity:** The use of daily bars is a significant constraint. Intraday manipulation (e.g., momentum ignition) requires minute-level or tick-level data, which is planned for Phase 2.
*   **Proxy Data:** The use of "stylized attention proxies" rather than live API feeds in this preprint limits the immediate empirical applicability, though it validates the *pipeline logic*.
*   **Triage vs. Verdict:** The author deliberately frames AIMM-X as a triage system: it produces statistical evidence for human review, not definitive proof of wrongdoing, which mitigates the risk of false accusations.


### **Conclusion and Future Roadmap**

AIMM-X represents a step toward **democratizing market surveillance**. By decoupling detection from proprietary data, it enables academic scrutiny of surveillance logic.

**Future Work Proposed:**
1.  Integration of 5-minute OHLCV bars to localize anomalies.
2.  Implementation of authenticated APIs for real-time attention data.
3.  **Factor Normalization:** Standardizing $\phi$ factors to prevent return shocks from masking attention anomalies.
4.  **Causal Inference:** Moving beyond correlation to Granger causality tests between attention and price.

**Final Assessment:** The paper provides a robust, mathematically sound framework for *explainable* financial anomaly detection. While currently limited by data resolution, the architecture's transparency offers a significant contribution to the field of computational finance and regulatory technology (RegTech).

# Import Essential Modules

```python
#!/usr/bin/env python3
# ==============================================================================#
#
#  AIMM-X: An Explainable Market Integrity Monitoring System
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "An Explainable Market Integrity Monitoring
#  System with Multi-Source Attention Signals and Transparent Scoring" by
#  Sandeep Neela (2026). It delivers a computationally tractable system for
#  surveillance triage, enabling the detection of suspicious trading windows
#  characterized by the joint deviation of price, volatility, and public attention.
#
#  Core Methodological Components:
#  • Multi-source attention signal fusion via weighted aggregation
#  • Robust rolling baseline estimation for regime-adaptive anomaly detection
#  • Hysteresis-based segmentation (Schmitt Trigger) for window identification
#  • Interpretable Integrity Score (M) decomposition into additive phi-factors
#  • Rank-based filtering to surface high-priority alerts for analyst review
#
#  Technical Implementation Features:
#  • Vectorized pandas/numpy operations for efficient panel data processing
#  • Strict handling of look-ahead bias via lagged baseline construction
#  • Precise missingness semantics (NaN vs Zero) for sparse attention data
#  • Comprehensive audit logging and artifact generation for reproducibility
#  • Modular architecture supporting independent validation of each pipeline stage
#
#  Paper Reference:
#  Neela, S. (2026). An Explainable Market Integrity Monitoring System with
#  Multi-Source Attention Signals and Transparent Scoring.
#  arXiv preprint arXiv:2601.15304v1.
#  https://arxiv.org/abs/2601.15304v1
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================#

import logging
import sys
from typing import (
    Any,
    Dict,
    List,
    NamedTuple,
    Optional,
    Tuple,
    Union
)

import numpy as np
import pandas as pd
from pandas.tseries.holiday import USFederalHolidayCalendar
from pandas.tseries.offsets import CustomBusinessDay

# Optional dependency for precise exchange calendar alignment
try:
    import exchange_calendars as xcals
except ImportError:
    xcals = None

# Configure logging for the pipeline
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
```


# Implementation

## Draft 1

## **Discussion of Inputs, Processes, Outputs, and Research Role of Key Callables**

Here is the granular analysis of the Inputs, Processes, Outputs, and Research Role for each of the 17 final callables in the AIMM-X pipeline:

### 1. `validate_study_config` (Task 1 Orchestrator)

*   **Inputs:**
    *   `config` (Dictionary): The raw JSON-like configuration object containing metadata, schemas, algorithm parameters, and reproducibility settings.
*   **Processes:**
    *   **Structural Verification:** Checks for the presence of all 11 required top-level keys (e.g., `meta`, `deviation_detection`, `scoring_model`).
    *   **Constraint Validation:** Enforces methodological constraints such as universe size ($N=24$), hysteresis thresholds ($\theta_{\text{high}} > \theta_{\text{low}}$), and weight non-negativity ($\alpha \ge 0, \omega \ge 0$).
    *   **Determinism Check:** Verifies the existence of reproducibility artifacts like explicit session lists and random seeds.
*   **Outputs:**
    *   `validated_config` (Dictionary): The validated configuration object, guaranteed to be structurally sound and methodologically consistent.
*   **Research Role:**
    *   Implements the **Configuration Management** requirement (Section 3.5), ensuring that all parameters governing the pipeline are "specified in `config.json`, enabling reproducible runs and systematic parameter sweeps." It enforces the logical consistency of the detection thresholds defined in Section 4.3.
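
A minimal sketch of the constraint checks follows. It assumes a flattened parameter dictionary keyed by the names listed under Customization (`theta_high`, `theta_low`, `gap_tolerance_g`, `baseline_window_B`, `alpha`, `omega`, `universe_tickers`). The notebook's actual configuration is nested under top-level sections, so the lookup paths here are hypothetical. The minimum baseline length of 2 is also an assumption; it is the smallest window for which a sample standard deviation is defined.

```python
from typing import Any, Dict


def check_detection_constraints(params: Dict[str, Any]) -> None:
    """Raise ValueError if a threshold, window, or weight constraint is violated."""
    if not params["theta_high"] > params["theta_low"]:
        raise ValueError("theta_high must be strictly greater than theta_low")
    if params["gap_tolerance_g"] < 0:
        raise ValueError("gap_tolerance_g must be non-negative")
    if params["baseline_window_B"] < 2:
        raise ValueError("baseline_window_B must be at least 2")
    for name in ("alpha", "omega"):
        negative = {k: w for k, w in params[name].items() if w < 0}
        if negative:
            raise ValueError(f"{name} weights must be non-negative: {negative}")
    if len(set(params["universe_tickers"])) != 24:
        raise ValueError("universe_tickers must contain 24 distinct tickers")
```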

### 2. `validate_panel_schema` (Task 2 Orchestrator)

*   **Inputs:**
    *   `df_raw_panel` (DataFrame): The ingested raw data containing market microstructure and attention signals.
    *   `config` (Dictionary): The validated study configuration.
*   **Processes:**
    *   **Index Inspection:** Verifies the DataFrame possesses a MultiIndex with levels `['date', 'ticker']` that is unique and monotonic.
    *   **Column Audit:** Confirms the presence and correct data types of all required columns (e.g., `close_price`, `reddit_posts`), specifically ensuring attention columns support `NaN` for missing coverage.
    *   **Universe Verification:** Checks that the set of tickers in the data exactly matches the 24-ticker universe defined in the configuration.
*   **Outputs:**
    *   `is_valid` (Boolean): A flag indicating whether the schema is compliant. Raises an exception if false.
*   **Research Role:**
    *   Enforces the **Panel Construction** prerequisites (Section 4.1), ensuring the data structure supports the "unified time-series panel" required for vectorized operations. It validates the **Ticker Universe Selection** (Section 3.1) to ensure the study operates on the specific set of 24 high-attention securities.
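
The three checks can be sketched as below. `check_panel_schema` and the column lists are illustrative; the lists mirror the Input Data Structure section. The float-dtype test on attention columns is one way to guarantee that those columns can hold `NaN`.

```python
from typing import Iterable

import pandas as pd

MARKET = ["open_price", "high_price", "low_price", "close_price", "volume"]
ATTENTION = ["reddit_posts", "stocktwits_msgs", "wiki_views", "news_articles", "google_trends"]


def check_panel_schema(df: pd.DataFrame, universe: Iterable[str]) -> bool:
    """Raise ValueError on the first schema violation; return True otherwise."""
    idx = df.index
    if not isinstance(idx, pd.MultiIndex) or list(idx.names) != ["date", "ticker"]:
        raise ValueError("index must be a MultiIndex named ['date', 'ticker']")
    if not (idx.is_unique and idx.is_monotonic_increasing):
        raise ValueError("index must be unique and sorted")
    missing = [c for c in MARKET + ATTENTION if c not in df.columns]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    not_float = [c for c in ATTENTION if not pd.api.types.is_float_dtype(df[c])]
    if not_float:
        raise ValueError(f"attention columns must be float to hold NaN: {not_float}")
    expected, found = set(universe), set(idx.get_level_values("ticker"))
    if found != expected:
        raise ValueError(f"ticker mismatch: {sorted(found ^ expected)}")
    return True
```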

### 3. `cleanse_panel` (Task 3 Orchestrator)

*   **Inputs:**
    *   `df_raw_panel` (DataFrame): The raw input panel.
    *   `config` (Dictionary): Configuration defining column names and types.
*   **Processes:**
    *   **OHLC Consistency Check:** Identifies rows where $H_{i,t} < \max(O_{i,t}, C_{i,t})$ or $L_{i,t} > \min(O_{i,t}, C_{i,t})$.
    *   **Liquidity Validation:** Flags rows with non-positive close prices ($C_{i,t} \le 0$) or non-positive volume ($V_{i,t} \le 0$).
    *   **Quarantine:** Segregates invalid rows into a separate DataFrame.
    *   **Type Enforcement:** Casts attention columns to `float64` to strictly distinguish between zero activity ($0.0$) and missing coverage (`NaN`).
*   **Outputs:**
    *   `df_clean` (DataFrame): The subset of methodologically valid rows.
    *   `df_quarantine` (DataFrame): The subset of invalid rows for audit.
*   **Research Role:**
    *   Implements the **Data Quality and Preprocessing** steps (Section 3.2), specifically the checks for "price discontinuities," "plausible volume figures," and "logical constraints" on OHLC relationships. It ensures the "Missingness handling" logic (Section 3.3) is supported by the data types.
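
A vectorized sketch of this step follows. Treating rows with missing OHLCV values as invalid is an assumption added for the example, as are the names `cleanse`, `OHLCV`, and `ATTENTION`.

```python
from typing import Tuple

import numpy as np
import pandas as pd

OHLCV = ["open_price", "high_price", "low_price", "close_price", "volume"]
ATTENTION = ["reddit_posts", "stocktwits_msgs", "wiki_views", "news_articles", "google_trends"]


def cleanse(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Split rows into (clean, quarantine) and cast attention columns to float64."""
    op, hi, lo, cl, vol = (df[col] for col in OHLCV)
    invalid = (
        (hi < np.maximum(op, cl))          # high below open/close
        | (lo > np.minimum(op, cl))        # low above open/close
        | (cl <= 0)                        # non-positive close
        | (vol <= 0)                       # non-positive volume
        | df[OHLCV].isna().any(axis=1)     # missing market data (assumed invalid)
    )
    clean, quarantine = df[~invalid].copy(), df[invalid].copy()
    present = [col for col in ATTENTION if col in clean.columns]
    clean[present] = clean[present].astype("float64")  # keeps NaN distinct from 0.0
    return clean, quarantine
```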

### 4. `enforce_trading_calendar` (Task 4 Orchestrator)

*   **Inputs:**
    *   `df_clean` (DataFrame): The cleansed panel data.
    *   `config` (Dictionary): Configuration containing the canonical session list or generation rules.
*   **Processes:**
    *   **Calendar Generation:** Constructs the authoritative list of NYSE/Nasdaq trading sessions for the study period (excluding weekends and holidays).
    *   **Alignment Check:** Iterates through every ticker to verify exact one-to-one mapping between data rows and canonical sessions.
    *   **Audit Reporting:** Identifies and logs any missing sessions (gaps) or extra sessions (calendar mismatches).
*   **Outputs:**
    *   `report` (Dictionary): A summary of alignment status and specific mismatches per ticker.
*   **Research Role:**
    *   Ensures the **Temporal Coverage** fidelity (Section 3.2), guaranteeing the analysis covers the exact "248 trading days per ticker" specified in the experimental design. This prevents artifacts from non-trading days affecting rolling baseline calculations.
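
Per-ticker alignment against a canonical session list reduces to two set differences plus a duplicate check. Below is a minimal sketch that assumes a MultiIndex with `date` and `ticker` levels and a session list supplied by the caller. The helper name and report keys are hypothetical.

```python
from typing import Any, Dict, Sequence

import pandas as pd


def calendar_alignment_sketch(df: pd.DataFrame, sessions: Sequence) -> Dict[str, Dict[str, Any]]:
    """Report missing, extra, and duplicated sessions for each ticker."""
    canonical = pd.DatetimeIndex(pd.to_datetime(list(sessions)))
    report: Dict[str, Dict[str, Any]] = {}
    for ticker, group in df.groupby(level="ticker"):
        dates = pd.DatetimeIndex(group.index.get_level_values("date"))
        missing = canonical.difference(dates)   # gaps in the ticker's history
        extra = dates.difference(canonical)     # rows on non-session dates
        duplicated = dates[dates.duplicated()]  # breaks the one-to-one mapping
        report[ticker] = {
            "aligned": len(missing) == 0 and len(extra) == 0 and len(duplicated) == 0,
            "missing": list(missing),
            "extra": list(extra),
            "duplicated": list(duplicated),
        }
    return report
```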

### 5. `align_attention_sources` (Task 5 Orchestrator)

*   **Inputs:**
    *   `df_clean` (DataFrame): The cleansed panel data.
    *   `config` (Dictionary): Configuration defining source mappings and alignment rules.
*   **Processes:**
    *   **Extraction:** Isolates raw attention columns mapped to canonical names (e.g., `reddit`, `wiki`).
    *   **Rule Application:** Applies source-specific filling logic. Crucially, it applies forward-filling (`ffill`) *only* to sources where it is methodologically appropriate (e.g., step-function data like Wikipedia views), while preserving raw sparsity for event-driven sources (e.g., Reddit posts).
    *   **Semantic Validation:** Confirms that `NaN` (no coverage) and `0.0` (zero activity) remain distinct.
*   **Outputs:**
    *   `aligned_series` (Dictionary of Series): The aligned attention time-series ready for normalization.
*   **Research Role:**
    *   Implements the **Resampling and Alignment** logic (Section 3.3), ensuring that "all sources align temporally with OHLCV data" while respecting the specific "natural cadences" of different attention proxies.
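
Source-specific filling can be sketched as a per-ticker forward fill applied only to selected sources. This assumes a MultiIndex panel sorted by date within each ticker. `source_map` (canonical name to raw column) and `ffill_sources` are hypothetical parameter names standing in for the configuration's alignment rules.

```python
from typing import Dict, Set

import pandas as pd


def align_attention_sketch(
    df: pd.DataFrame, source_map: Dict[str, str], ffill_sources: Set[str]
) -> Dict[str, pd.Series]:
    """Return float64 attention series, forward-filled only where configured."""
    aligned: Dict[str, pd.Series] = {}
    for canonical, raw_col in source_map.items():
        series = df[raw_col].astype("float64")
        if canonical in ffill_sources:
            # Forward-fill within each ticker only, so values never leak across tickers.
            series = series.groupby(level="ticker").ffill()
        # Event-driven sources keep 0.0 (no activity) and NaN (no coverage) untouched.
        aligned[canonical] = series
    return aligned
```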

### 6. `normalize_attention_sources` (Task 6 Orchestrator)

*   **Inputs:**
    *   `aligned_series` (Dictionary of Series): The aligned raw attention signals.
    *   `config` (Dictionary): Configuration defining the baseline window $B$ and stability constant $\epsilon$.
*   **Processes:**
    *   **Rolling Normalization:** For each source $s$, computes a rolling z-score using a strictly lagged baseline to prevent look-ahead bias.
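    *   **Transformation:** Applies the formula $\tilde{a}_{s,i,t} = (a_{s,i,t} - \mu_{s,i,t}) / (\sigma_{s,i,t} + \epsilon)$, where $\mu_{s,i,t}$ and $\sigma_{s,i,t}$ are the mean and standard deviation of source $s$ for ticker $i$ over the lagged baseline window of length $B$.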
*   **Outputs:**
    *   `normalized_dict` (Dictionary of Series): The normalized signals $\tilde{a}_{s,i,t}$.
*   **Research Role:**
    *   Implements the **Attention Signal Construction** requirement (Section 3.3) to produce "resampled and normalized source signals" $\tilde{a}_{s,t}$ suitable for the weighted sum fusion in Equation (1).
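
The lagged rolling z-score can be sketched per ticker with pandas. This sketch assumes a MultiIndex Series with a `ticker` level sorted by date within each ticker, a lag of one session, and pandas' default sample standard deviation (`ddof=1`). Whether the study uses the sample or the population estimator is not stated here, so treat that choice as an assumption. The helper name is hypothetical.

```python
import pandas as pd


def lagged_rolling_zscore_sketch(
    series: pd.Series, window: int, eps: float = 1e-8, lag: int = 1
) -> pd.Series:
    """Z-score each value against the preceding `window` values of the same ticker."""

    def _baseline(x: pd.Series, stat: str) -> pd.Series:
        # shift(lag) excludes the current observation from its own baseline (no look-ahead).
        rolling = x.shift(lag).rolling(window, min_periods=window)
        return rolling.mean() if stat == "mean" else rolling.std()

    grouped = series.groupby(level="ticker")
    mu = grouped.transform(lambda x: _baseline(x, "mean"))
    sd = grouped.transform(lambda x: _baseline(x, "std"))
    return (series - mu) / (sd + eps)
```

The first `window` observations of each ticker are `NaN` because their baselines are incomplete. This is the warm-up period that is filtered out later.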

### 7. `fuse_attention` (Task 7 Orchestrator)

*   **Inputs:**
    *   `normalized_dict` (Dictionary of Series): The normalized attention signals.
    *   `config` (Dictionary): Configuration containing fusion weights $w_s$.
*   **Processes:**
    *   **Weighted Aggregation:** Computes the linear combination of normalized sources.
    *   **Strict Propagation:** If *any* source is `NaN` (missing coverage) for an observation, the fused value is `NaN`; missing sources are never silently treated as zero.
*   **Outputs:**
    *   `A_series` (Series): The fused attention signal $A_{i,t}$.
*   **Research Role:**
    *   Accurately implements **Equation (1)** from Section 3.3:
        $$ A_t = \sum_{s \in \mathcal{S}} w_s \cdot \tilde{a}_{s,t} $$
        This creates the unified "Attention" channel used in the multi-source detection logic.
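
Equation (1) with strict `NaN` propagation can be sketched with `DataFrame.sum(skipna=False)`. The sketch assumes the normalized sources share an index and the weights are keyed by source name. `fuse_attention_sketch` is a hypothetical name.

```python
from typing import Dict

import pandas as pd


def fuse_attention_sketch(normalized: Dict[str, pd.Series], weights: Dict[str, float]) -> pd.Series:
    """Weighted sum of normalized sources; NaN in any source yields NaN."""
    frame = pd.DataFrame(normalized)
    w = pd.Series(weights, dtype="float64").reindex(frame.columns)
    if w.isna().any():
        raise ValueError(f"Missing fusion weights for: {list(w[w.isna()].index)}")
    # skipna=False: a NaN in any source makes the fused value NaN (strict propagation).
    return frame.mul(w, axis=1).sum(axis=1, skipna=False)
```

`skipna=False` is what distinguishes strict propagation from pandas' default, which would silently treat a missing source as contributing zero.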

### 8. `compute_log_returns` (Task 8 Orchestrator)

*   **Inputs:**
    *   `df_clean` (DataFrame): The cleansed panel data containing `close_price`.
    *   `config` (Dictionary): Configuration (passed for consistency).
*   **Processes:**
    *   **Log Transformation:** Computes the natural logarithm of the ratio of consecutive close prices.
    *   **Grouping:** Ensures calculations are isolated per ticker.
*   **Outputs:**
    *   `r_series` (Series): The log return series $r_{i,t}$.
*   **Research Role:**
    *   Accurately implements **Equation (2)** from Section 4.1:
        $$ r_{i,t} = \log \left( \frac{C_{i,t}}{C_{i,t-1}} \right) $$
        This transforms non-stationary prices into approximately stationary returns for statistical analysis.
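
Equation (2) can be sketched with a per-ticker shift, so the first observation of each ticker never uses another ticker's last close. This assumes a MultiIndex Series with a `ticker` level, sorted by date within each ticker. The function name is hypothetical.

```python
import numpy as np
import pandas as pd


def log_returns_sketch(close: pd.Series) -> pd.Series:
    """r_{i,t} = log(C_{i,t} / C_{i,t-1}), computed within each ticker."""
    # Grouped shift: the first row of every ticker gets NaN instead of a cross-ticker value.
    prev_close = close.groupby(level="ticker").shift(1)
    return np.log(close / prev_close)
```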

### 9. `compute_volatility` (Task 9 Orchestrator)

*   **Inputs:**
    *   `r_series` (Series): The log return series.
    *   `config` (Dictionary): Configuration defining lookback $L$ and $\epsilon$.
*   **Processes:**
    *   **Lagging:** Shifts returns by 1 to ensure the volatility estimate for time $t$ uses only past data ($t-1 \dots t-L$).
    *   **RMS Calculation:** Computes the rolling root-mean-square of the lagged returns.
*   **Outputs:**
    *   `sigma_series` (Series): The realized volatility proxy $\sigma_{i,t}$.
*   **Research Role:**
    *   Accurately implements **Equation (3)** from Section 4.1:
        $$ \sigma_{i,t} = \sqrt{\frac{1}{L} \sum_{j=1}^{L} r_{i,t-j}^2 + \epsilon} $$
        This provides the "Volatility proxy" used as a distinct channel in the detection model.
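
Equation (3) can be sketched as a lagged rolling root-mean-square per ticker. This is a minimal illustration assuming a MultiIndex Series with a `ticker` level. The name and the default `eps` value are hypothetical.

```python
import numpy as np
import pandas as pd


def realized_vol_sketch(r: pd.Series, L: int, eps: float = 1e-12) -> pd.Series:
    """sigma_{i,t} = sqrt(mean(r_{t-1}^2, ..., r_{t-L}^2) + eps), per ticker."""

    def _per_ticker(x: pd.Series) -> pd.Series:
        # shift(1): the estimate at t uses r_{t-1}, ..., r_{t-L} only.
        lagged_sq = x.shift(1) ** 2
        return np.sqrt(lagged_sq.rolling(L, min_periods=L).mean() + eps)

    return r.groupby(level="ticker").transform(_per_ticker)
```

Because of the shift, a large return on day $t$ first affects $\sigma_{i,t+1}$, so the volatility channel cannot echo the same-day return shock.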

### 10. `compute_baselines_and_zscores` (Task 10 Orchestrator)

*   **Inputs:**
    *   `r_series`, `sigma_series`, `A_series` (Series): The three data channels.
    *   `config` (Dictionary): Configuration defining baseline window $B$.
*   **Processes:**
    *   **Baseline Estimation:** Computes rolling mean $\mu^{(x)}$ and standard deviation $\hat{\sigma}^{(x)}$ over a lagged window of length $B$.
    *   **Standardization:** Computes the z-score for each observation relative to its baseline.
*   **Outputs:**
    *   `z_r`, `z_sigma`, `z_A` (Series): The standardized deviation series.
*   **Research Role:**
    *   Accurately implements **Equations (4), (5), and (6)** from Section 4.2:
        $$ z_{i,t}^{(x)} = \frac{x_{i,t} - \mu_{i,t}^{(x)}}{\hat{\sigma}_{i,t}^{(x)} + \epsilon} $$
        This is the core "Statistical Deviation Metric" that normalizes diverse signals into a common unitless scale for aggregation.
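
As a worked illustration of the return-channel z-score, the following uses hypothetical numbers rather than values from the study. Suppose $r_{i,t} = 0.030$, a lagged baseline mean $\mu_{i,t}^{(r)} = 0.001$, a baseline standard deviation $\hat{\sigma}_{i,t}^{(r)} = 0.015$, and $\epsilon = 10^{-8}$:

$$ z_{i,t}^{(r)} = \frac{r_{i,t} - \mu_{i,t}^{(r)}}{\hat{\sigma}_{i,t}^{(r)} + \epsilon} = \frac{0.030 - 0.001}{0.015 + 10^{-8}} \approx 1.933 $$

The return therefore sits roughly 1.9 baseline standard deviations above its recent mean. At this scale $\epsilon$ is negligible; it only matters when the baseline is nearly flat.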

### 11. `compute_composite_strength` (Task 11 Orchestrator)

*   **Inputs:**
    *   `z_r`, `z_sigma`, `z_A` (Series): The z-score series.
    *   `config` (Dictionary): Configuration defining weights $\alpha_r, \alpha_\sigma, \alpha_A$.
*   **Processes:**
    *   **Magnitude Extraction:** Takes the absolute value of return z-scores (if configured for two-sided detection).
    *   **Weighted Summation:** Aggregates the z-scores into a single scalar strength value.
*   **Outputs:**
    *   `s_series` (Series): The composite strength score $s_{i,t}$.
*   **Research Role:**
    *   Accurately implements **Equation (7)** from Section 4.3:
        $$ s_{i,t} = \alpha_r|z_{i,t}^{(r)}| + \alpha_\sigma z_{i,t}^{(\sigma)} + \alpha_A z_{i,t}^{(A)} $$
        This "Multi-channel fusion" step creates the primary signal used for window segmentation.
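
Equation (7) can be sketched as a direct vectorized sum. The `two_sided` flag mirrors the configurable absolute value described above; the function name and the argument layout are hypothetical.

```python
import pandas as pd


def composite_strength_sketch(
    z_r: pd.Series,
    z_sigma: pd.Series,
    z_A: pd.Series,
    alpha_r: float,
    alpha_sigma: float,
    alpha_A: float,
    two_sided: bool = True,
) -> pd.Series:
    """s = alpha_r * |z_r| + alpha_sigma * z_sigma + alpha_A * z_A."""
    # Two-sided detection: large negative and positive return shocks count equally.
    zr = z_r.abs() if two_sided else z_r
    # Volatility and attention enter signed, as in Equation (7).
    return alpha_r * zr + alpha_sigma * z_sigma + alpha_A * z_A
```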

### 12. `segment_windows_hysteresis` (Task 12 Orchestrator)

*   **Inputs:**
    *   `s_series` (Series): The composite strength score.
    *   `config` (Dictionary): Configuration defining thresholds $\theta_{\text{high}}, \theta_{\text{low}}$ and gap tolerance $g$.
*   **Processes:**
    *   **Schmitt Trigger Logic:** Iterates through the time series, triggering a window when $s > \theta_{\text{high}}$ and sustaining it while $s > \theta_{\text{low}}$, allowing for short gaps of length $\le g$.
    *   **Aggregation:** Collects all detected windows across all tickers.
*   **Outputs:**
    *   `df_windows_raw` (DataFrame): One row per detected suspicious window, each defined by its start and end timestamps.
*   **Research Role:**
    *   Implements the **Hysteresis-based segmentation** algorithm described in Section 4.3. This converts the continuous strength signal into discrete "Suspicious Windows" for analysis.
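
A minimal sketch of the Schmitt-trigger segmentation for a single ticker's strength series follows. It assumes one reading of the gap rule: up to $g$ consecutive observations at or below $\theta_{\text{low}}$ are bridged, a window closes once that run exceeds $g$, and the recorded end is the last observation above $\theta_{\text{low}}$. `NaN` is treated as below threshold. The function name and the integer-position output are illustrative assumptions.

```python
from typing import List, Optional, Sequence, Tuple


def hysteresis_windows_sketch(
    s: Sequence[float], theta_high: float, theta_low: float, g: int
) -> List[Tuple[int, int]]:
    """Return (start, end) positions of windows found by hysteresis with gap tolerance g."""
    windows: List[Tuple[int, int]] = []
    start: Optional[int] = None        # position where the open window was triggered
    last_active: Optional[int] = None  # last position inside the window with s > theta_low
    gap = 0                            # consecutive positions at or below theta_low
    for t, value in enumerate(s):
        # NaN compares False, so missing strength never opens or sustains a window.
        if start is None:
            # Only a strong reading (s > theta_high) can open a window.
            if value > theta_high:
                start, last_active, gap = t, t, 0
            continue
        if value > theta_low:
            last_active, gap = t, 0
        else:
            gap += 1
            if gap > g:
                # Gap too long: close the window at its last active position.
                windows.append((start, last_active))
                start, last_active, gap = None, None, 0
    if start is not None:
        windows.append((start, last_active))
    return windows
```

To apply it to the panel, one option is to iterate over `s_series.groupby(level="ticker")` and map the returned positions back to dates through each group's index.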

### 13. `compute_phi_factors` (Task 13 Orchestrator)

*   **Inputs:**
    *   `df_windows_raw` (DataFrame): The detected windows.
    *   `z_r`, `z_sigma`, `z_A` (Series): The underlying z-scores.
    *   `config` (Dictionary): Configuration.
*   **Processes:**
    *   **Slicing:** Extracts the z-score data corresponding to the time interval of each window.
    *   **Factor Computation:** Calculates the six interpretable factors ($\phi_1 \dots \phi_6$) based on the sliced data (e.g., sum of squares, correlation).
*   **Outputs:**
    *   `df_phi` (DataFrame): The windows DataFrame enriched with factor values.
*   **Research Role:**
    *   Accurately implements the **Phi-Signal Definitions** (Equations 8–13) from Section 4.4.1. This step generates the "evidence" (e.g., return shock, volatility anomaly, attention spike, alignment) that explains *why* a window was flagged.
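
The slice-then-compute pattern can be illustrated for a single ticker's date-indexed z-scores. The two statistics below are placeholders that echo the examples named in the Processes list (a sum of squares and a correlation). They are **not** the paper's Equations 8 to 13. The helper name and output keys are hypothetical.

```python
from typing import Dict

import pandas as pd


def window_factor_sketch(z_r: pd.Series, z_A: pd.Series, start, end) -> Dict[str, float]:
    """Slice one window's z-scores and compute two placeholder factor statistics."""
    # Label-based slicing is inclusive of both endpoints for a sorted DatetimeIndex.
    zr = z_r.loc[start:end]
    za = z_A.loc[start:end]
    # Placeholder: sum of squared return z-scores (a magnitude-style statistic).
    sum_sq = float((zr ** 2).sum())
    # Placeholder: Pearson correlation of |z_r| with z_A (an alignment-style statistic).
    corr = float(zr.abs().corr(za)) if len(zr) >= 3 else float("nan")
    return {"sum_sq_z_r": sum_sq, "corr_abs_zr_zA": corr}
```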

### 14. `compute_integrity_scores` (Task 14 Orchestrator)

*   **Inputs:**
    *   `df_phi` (DataFrame): Windows with computed factors.
    *   `config` (Dictionary): Configuration defining weights $\omega_k$.
*   **Processes:**
    *   **Linear Combination:** Computes the weighted sum of phi factors.
    *   **Attribution:** Stores the individual contribution of each factor to the total score.
*   **Outputs:**
    *   `df_scored` (DataFrame): Windows with the final Integrity Score $M$ and decomposition columns.
*   **Research Role:**
    *   Accurately implements **Equation (14)** and the **Factor Decomposition** logic (Equation 16) from Section 4.4.2:
        $$ M(w) = \sum_{k=1}^{6} \omega_k \cdot \phi_k(w) $$
        This produces the "Interpretable Integrity Score" used to rank anomalies.
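
Equation (14) and the additive attribution can be sketched together: each contribution $\omega_k \cdot \phi_k(w)$ is stored as its own column, and $M$ is their sum. The `phi_k` and `contrib_*` column names are hypothetical.

```python
from typing import Dict, List

import pandas as pd


def integrity_score_sketch(df_phi: pd.DataFrame, omega: Dict[str, float]) -> pd.DataFrame:
    """Add per-factor contribution columns and the total integrity score M."""
    out = df_phi.copy()
    contrib_cols: List[str] = []
    for factor, weight in omega.items():
        col = f"contrib_{factor}"
        out[col] = weight * out[factor]  # omega_k * phi_k(w)
        contrib_cols.append(col)
    # M(w) is exactly the sum of the stored contributions, so the decomposition is additive.
    out["M"] = out[contrib_cols].sum(axis=1)
    return out
```

Because the score is linear, the contribution columns sum exactly to $M$, which is what allows an analyst to read off which factors drove a given window's rank.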

### 15. `rank_and_filter_windows` (Task 15 Orchestrator)

*   **Inputs:**
    *   `df_scored` (DataFrame): Scored windows.
    *   `z_r`, `z_sigma`, `z_A` (Series): Z-scores for artifact checking.
    *   `config` (Dictionary): Configuration defining filters.
*   **Processes:**
    *   **Ranking:** Computes the percentile rank of each window's score $M$.
    *   **Warmup Filtering:** Excludes windows occurring before baselines are fully established.
    *   **Artifact Filtering:** Excludes windows containing extreme z-score outliers (e.g., $> 20\sigma$).
*   **Outputs:**
    *   `df_ranked` (DataFrame): The full set of ranked windows.
    *   `df_filtered` (DataFrame): The subset of windows passing quality filters.
*   **Research Role:**
    *   Accurately implements **Equation (15)** for **Rank percentiles** (Section 4.4.2) and applies the "warm-up periods" and "artifact filtering" described in Section 5.3 to produce a clean list for reporting.
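
Ranking and filtering can be sketched with `Series.rank(pct=True)` and two boolean masks. This sketch assumes each window row already carries a `start` timestamp and a precomputed `peak_abs_z` (the largest absolute z-score inside the window), and that the warm-up boundary is a single date. All of these names are hypothetical, and so is the strict `>` comparison against the warm-up end.

```python
from typing import Tuple

import pandas as pd


def rank_and_filter_sketch(
    df_scored: pd.DataFrame, warmup_end: pd.Timestamp, max_abs_z: float
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Return (all windows with percentile ranks, windows passing quality filters)."""
    ranked = df_scored.copy()
    # Percentile rank of M within the full window set (ties share the average rank).
    ranked["M_pct_rank"] = ranked["M"].rank(pct=True)
    after_warmup = ranked["start"] > warmup_end
    no_artifact = ranked["peak_abs_z"] <= max_abs_z
    filtered = ranked.loc[after_warmup & no_artifact].sort_values("M", ascending=False)
    return ranked, filtered
```

Percentiles are computed before filtering here, so a surviving window's rank still reflects its position among all detected windows.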

### 16. `export_artifacts` (Task 16 Orchestrator)

*   **Inputs:**
    *   `df_ranked`, `df_filtered` (DataFrame): The processed window sets.
    *   `config` (Dictionary): Configuration.
*   **Processes:**
    *   **Formatting:** Selects and orders columns for the final output tables.
    *   **Top-N Selection:** Extracts the highest-scoring windows for the summary report.
    *   **Statistics:** Computes summary statistics (mean, maximum, and percentage of nonzero values) for the phi factors.
*   **Outputs:**
    *   `artifacts` (Dictionary): A collection of DataFrames representing the study's final deliverables.
*   **Research Role:**
    *   Generates the **Empirical Contributions** (Section 1.4), specifically the "window lists, rankings, factor attributions, and case study reports" (Table 4, Table 5) required to communicate findings to analysts.
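
The top-N selection and phi summary statistics can be sketched with `nlargest` and column-wise reductions. The column names `M` and `phi_*`, as well as the artifact keys, are hypothetical.

```python
from typing import Dict, List

import pandas as pd


def export_sketch(
    df_filtered: pd.DataFrame, phi_cols: List[str], top_n: int
) -> Dict[str, pd.DataFrame]:
    """Build a top-N triage table and per-factor summary statistics."""
    # Highest-scoring windows first.
    top = df_filtered.nlargest(top_n, "M")
    phi = df_filtered[phi_cols]
    summary = pd.DataFrame({
        "mean": phi.mean(),
        "max": phi.max(),
        "nonzero_pct": (phi != 0).mean() * 100.0,  # share of windows with a nonzero value
    })
    return {"top_n": top, "phi_summary": summary}
```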

### 17. `run_aimm_x_pipeline` (Task 17 Top-Level Orchestrator)

*   **Inputs:**
    *   `df_raw_panel` (DataFrame): The raw input data.
    *   `config` (Dictionary): The study configuration.
*   **Processes:**
    *   **Sequential Execution:** Calls callables 1 through 16 in the strict dependency order defined by the pipeline architecture.
    *   **State Management:** Passes intermediate results (e.g., `df_clean`, `z_scores`) between stages.
    *   **Audit Logging:** Records the success/failure and metadata of each step.
*   **Outputs:**
    *   `PipelineResult` (NamedTuple): A comprehensive container holding the config snapshot, audit log, all data artifacts, and intermediate series.
*   **Research Role:**
    *   Implements the **End-to-End Pipeline** (Algorithm 1 in Section 4.5), providing the "reproducible end-to-end pipeline" that integrates data ingestion, detection, scoring, and reporting into a single executable unit.
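
The sequential-execution and audit-logging pattern can be sketched generically. The stage names, `PipelineResultSketch`, and the audit-log layout below are hypothetical. Only the `steps_completed` key mirrors a field read in the usage example.

```python
from typing import Any, Callable, Dict, List, NamedTuple, Tuple

Stage = Tuple[str, Callable[[Dict[str, Any]], Any]]


class PipelineResultSketch(NamedTuple):
    outputs: Dict[str, Any]
    audit_log: Dict[str, Any]


def run_stages_sketch(stages: List[Stage], initial_state: Dict[str, Any]) -> PipelineResultSketch:
    """Run stages in order, storing each output in a shared state dict."""
    state = dict(initial_state)
    audit: Dict[str, Any] = {"steps_completed": 0, "steps": []}
    for name, fn in stages:
        try:
            # Each stage reads earlier results from the shared state.
            state[name] = fn(state)
        except Exception as exc:
            audit["steps"].append({"step": name, "status": "failed", "error": repr(exc)})
            raise RuntimeError(f"Stage '{name}' failed; audit log: {audit}") from exc
        audit["steps"].append({"step": name, "status": "ok"})
        audit["steps_completed"] += 1
    return PipelineResultSketch(outputs=state, audit_log=audit)
```

Failing fast on the first broken stage keeps later stages from running on partial inputs, and the error message carries the audit trail up to the failure.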

<br><br>

## **Usage Example**

The following Python snippet uses synthetically generated data to illustrate how the AIMM-X pipeline callables are used together:

```python
#!/usr/bin/env python3
# ==============================================================================
# AIMM-X Pipeline Usage Example
# Context: "The Democratization of Market Integrity Verification"
# ==============================================================================

import logging
import sys
import yaml
import numpy as np
import pandas as pd
from pandas.tseries.holiday import USFederalHolidayCalendar
from pandas.tseries.offsets import CustomBusinessDay
from typing import Any, Dict
# Import all essential Python modules (e.g., the AIMM-X pipeline callables)

# Configure logging to stdout for demonstration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger("AIMM-X_Demo")

# ==============================================================================
# Step 1: Configuration Management
# ==============================================================================

# ------------------------------------------------------------------------------
# Step 1 Implementation: Read config.yaml into memory
# ------------------------------------------------------------------------------
def load_study_configuration(filepath: str = "config.yaml") -> Dict[str, Any]:
    """
    Reads the YAML configuration file from disk and returns it as a Python dictionary.

    This function serves as the entry point for the configuration management system,
    loading the hierarchical parameter set that governs the entire AIMM-X pipeline.
    It ensures that the configuration is accessible as a standard Python dictionary
    for downstream validation and usage.

    Parameters
    ----------
    filepath : str, optional
        The path to the YAML configuration file. Defaults to "config.yaml".

    Returns
    -------
    Dict[str, Any]
        The parsed configuration dictionary containing metadata, schemas, and parameters.

    Raises
    ------
    FileNotFoundError
        If the specified configuration file does not exist at the given path.
    yaml.YAMLError
        If the file contains invalid YAML syntax that cannot be parsed.
    """
    try:
        # Open the configuration file in read mode using a context manager for safety
        with open(filepath, 'r') as f:
            # Parse the YAML content into a Python dictionary using safe_load to prevent code execution
            config = yaml.safe_load(f)
        
        # Log successful loading of the configuration for audit purposes
        logger.info(f"Successfully loaded configuration from {filepath}")
        
        # Return the parsed configuration dictionary
        return config

    except FileNotFoundError:
        # Log an error if the file is missing, which is a critical failure for the pipeline
        logger.error(f"Configuration file not found at {filepath}")
        # Re-raise the exception to halt execution
        raise

    except yaml.YAMLError as e:
        # Log an error if the YAML syntax is invalid
        logger.error(f"Error parsing YAML file: {e}")
        # Re-raise the exception to halt execution
        raise

# ==============================================================================
# Step 2: Synthetic Data Generation (df_raw_panel)
# ==============================================================================

def generate_synthetic_panel(config: Dict[str, Any]) -> pd.DataFrame:
    """
    Generates a professional-grade synthetic DataFrame matching the AIMM-X schema.

    This function creates a realistic synthetic dataset for testing and demonstration
    purposes. It strictly enforces the methodological constraints defined in the
    AIMM-X paper, including exchange trading calendar alignment, OHLC consistency,
    and specific missingness semantics for attention signals.

    Processes:
    1.  **Calendar Generation:** Constructs a date index approximating the NYSE calendar,
        excluding weekends and US Federal holidays.
    2.  **Microstructure Simulation:** Generates price paths using Geometric Brownian Motion (GBM)
        and derives Open, High, Low, Close (OHLC) values that satisfy logical constraints
        (e.g., High >= max(Open, Close)). Generates strictly positive Volume.
    3.  **Attention Signal Simulation:** Generates synthetic attention data using Poisson
        processes for sparse sources (Reddit, News) and log-normal distributions for
        continuous sources (Wikipedia).
    4.  **Missingness Injection:** Randomly injects NaNs into attention columns to simulate
        "No Coverage" (API downtime), distinct from "No Activity" (0).
    5.  **Assembly:** Concatenates per-ticker data into a single MultiIndex DataFrame.

    Parameters
    ----------
    config : Dict[str, Any]
        The study configuration dictionary containing universe definition, date range,
        and reproducibility seeds.

    Returns
    -------
    pd.DataFrame
        The synthetic raw panel DataFrame (`df_raw_panel`) with MultiIndex ['date', 'ticker'].

    Raises
    ------
    KeyError
        If required configuration keys (e.g., 'meta', 'universe_tickers') are missing.
    """
    # Log the start of the synthetic data generation process
    logger.info("Generating synthetic panel data...")
    
    # 1. Calendar Generation (Approximating NYSE)
    # Extract start and end dates from the configuration
    start_date = config["meta"]["date_range"]["start"]
    end_date = config["meta"]["date_range"]["end"]
    
    # Create a CustomBusinessDay offset using the US Federal Holiday Calendar
    # This ensures the generated dates align with a realistic trading calendar (Constraint A)
    us_bd = CustomBusinessDay(calendar=USFederalHolidayCalendar())
    
    # Generate the range of dates using the custom offset
    dates = pd.date_range(start=start_date, end=end_date, freq=us_bd)
    
    # Calculate the number of dates for array sizing
    n_dates = len(dates)
    
    # Extract the list of tickers from the configuration
    tickers = config["meta"]["universe_tickers"]
    
    # Initialize a list to hold per-ticker DataFrames
    frames = []
    
    # Set the random seed for reproducibility as specified in the config
    np.random.seed(config["preprint_attention_proxy_reproducibility"]["random_seed"])

    # Iterate through each ticker to generate its specific data
    for ticker in tickers:
        # --- Market Microstructure Generation ---
        # Parameters for Geometric Brownian Motion (GBM)
        dt = 1/252  # Time step (1 trading day)
        mu = 0.10   # Annualized drift
        sigma = 0.30 # Annualized volatility
        
        # Generate an initial price uniformly between $10 and $200
        s0 = np.random.uniform(10, 200)
        
        # Generate daily log returns using a normal distribution
        # Equation: r_t ~ N((mu - 0.5 * sigma^2) * dt, sigma * sqrt(dt))
        returns = np.random.normal((mu - 0.5 * sigma**2) * dt, sigma * np.sqrt(dt), n_dates)
        
        # Calculate the price path using the cumulative sum of log returns
        # Equation: P_t = P_0 * exp(sum(r_1...r_t))
        price_path = s0 * np.exp(np.cumsum(returns))
        
        # Derive OHLC values to strictly satisfy constraints
        # Open price is modeled as the current price-path value perturbed by up to +/-1% noise
        open_prices = price_path * np.random.uniform(0.99, 1.01, n_dates)
        
        # Close price is set to the generated price path
        close_prices = price_path
        
        # High price must be >= max(Open, Close) to satisfy QC constraints
        # Calculate the maximum of Open and Close for each day
        max_oc = np.maximum(open_prices, close_prices)
        # Generate High prices slightly above this maximum
        high_prices = max_oc * np.random.uniform(1.001, 1.02, n_dates)
        
        # Low price must be <= min(Open, Close) to satisfy QC constraints
        # Calculate the minimum of Open and Close for each day
        min_oc = np.minimum(open_prices, close_prices)
        # Generate Low prices slightly below this minimum
        low_prices = min_oc * np.random.uniform(0.98, 0.999, n_dates)
        
        # Generate Volume using a log-normal distribution to simulate realistic trading volume
        volume = np.random.lognormal(mean=14, sigma=0.5, size=n_dates).astype(int)
        # Enforce strictly positive volume (Constraint B)
        volume = np.maximum(volume, 100)
        
        # Calculate Volume Weighted Average Price (VWAP) approximation
        # Note: This is a simplified approximation (Typical Price) for synthetic purposes
        vwap = (high_prices + low_prices + close_prices) / 3
        
        # Generate Number of Transactions as a fraction of volume
        txs = (volume / np.random.uniform(50, 200, n_dates)).astype(int)

        # --- Attention Signal Generation ---
        # Generate sparse, Poisson-like data for Reddit, StockTwits, and News
        # 0 means "no activity" (valid observation of zero events)
        reddit = np.random.poisson(lam=5, size=n_dates).astype(float)
        stocktwits = np.random.poisson(lam=20, size=n_dates).astype(float)
        news = np.random.poisson(lam=1, size=n_dates).astype(float)
        
        # Generate continuous-valued data for Wikipedia (log-normal) and Google Trends (uniform in [0, 100))
        wiki = np.random.lognormal(mean=8, sigma=1, size=n_dates)
        trends = np.random.uniform(0, 100, size=n_dates)
        
        # Inject Missingness (NaN) to simulate API downtime (Constraint C)
        # This distinguishes "No Coverage" (NaN) from "No Activity" (0)
        # Mask 5% of data as NaN for each attention source
        for arr in [reddit, stocktwits, news, wiki, trends]:
            # Create a boolean mask where True indicates a missing value (5% probability)
            mask = np.random.choice([True, False], size=n_dates, p=[0.05, 0.95])
            # Apply the mask to set values to NaN
            arr[mask] = np.nan

        # --- Assembly ---
        # Create a DataFrame for the current ticker with all generated columns
        df_ticker = pd.DataFrame({
            "open_price": open_prices,
            "high_price": high_prices,
            "low_price": low_prices,
            "close_price": close_prices,
            "volume": volume,
            "volume_weighted_average_price": vwap,
            "number_of_transactions": txs,
            "reddit_posts": reddit,
            "stocktwits_msgs": stocktwits,
            "wiki_views": wiki,
            "news_articles": news,
            "google_trends": trends
        }, index=dates)
        
        # Add the ticker column to the DataFrame
        df_ticker["ticker"] = ticker
        
        # Reset the index to make 'date' a column, rename it, and then set the MultiIndex
        df_ticker = df_ticker.reset_index().rename(columns={"index": "date"})
        df_ticker = df_ticker.set_index(["date", "ticker"])
        
        # Append the ticker's DataFrame to the list
        frames.append(df_ticker)

    # Concatenate all ticker DataFrames into a single panel
    df_raw_panel = pd.concat(frames)
    
    # Sort the MultiIndex by ticker then date to ensure monotonicity (Constraint D)
    df_raw_panel = df_raw_panel.sort_index(level=["ticker", "date"])
    
    # Log the shape of the generated panel for verification
    logger.info(f"Synthetic panel generated. Shape: {df_raw_panel.shape}")
    
    # Return the final synthetic DataFrame
    return df_raw_panel

# ==============================================================================
# Step 3: Pipeline Execution
# ==============================================================================

if __name__ == "__main__":
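    # 1. Setup Environment
    # Note: Assuming 'create_dummy_config_file' (not defined in this snippet) is available in the
    # namespace and writes a valid config.yaml to the working directory
    create_dummy_config_file()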
    
    # 2. Load Configuration
    config = load_study_configuration("config.yaml")
    
    # 3. Generate Data
    df_raw_panel = generate_synthetic_panel(config)
    
    # 4. Execute Pipeline
    # Note: Assuming that the required Python modules have been imported
    # Note: Assuming that all the orchestrator callables (and their utilities) are available in the namespace
    # Note: Assuming 'run_aimm_x_pipeline' is available in the namespace
    # (In a real script, this would be imported from the module)
    try:
        logger.info("Initiating AIMM-X Pipeline...")
        result = run_aimm_x_pipeline(df_raw_panel, config)
        
        # 5. Inspect Results
        print("\n" + "="*80)
        print("AIMM-X PIPELINE EXECUTION REPORT")
        print("="*80)
        
        print(f"\n[Audit Log Summary]")
        print(f"Steps Completed: {result.audit_log['steps_completed']}")
        print(f"Raw Windows Detected: {result.audit_log.get('raw_windows_count', 0)}")
        print(f"Filtered Windows: {result.audit_log.get('filtered_windows_count', 0)}")
        
        print(f"\n[Top 5 Suspicious Windows (Triage List)]")
        if not result.df_top_n.empty:
            print(result.df_top_n.head(5).to_string(index=False))
        else:
            print("No windows met the reporting criteria.")
            
        print(f"\n[Factor Contribution Stats]")
        if not result.df_phi_summary.empty:
            print(result.df_phi_summary.to_string())
            
        print("\n" + "="*80)
        logger.info("Pipeline execution successful.")
        
    except Exception as e:
        logger.critical(f"Pipeline execution failed: {e}")
```

<br><br>
## **Implemented Callables**

# Task 1 — Validate the methodological configuration dictionary

# ==============================================================================
# Task 1: Validate and parse the study configuration dictionary
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 1: Load the `study_parameters` dictionary and verify structural completeness.
# -------------------------------------------------------------------------------------------------------------------------------
def validate_config_structure(config: Dict[str, Any]) -> bool:
    """
    Verifies that the configuration dictionary contains all required top-level keys
    and that they are of the correct type (dictionaries).

    This implements the structural validation required to ensure the configuration
    schema matches the AIMM-X blueprint.

    Parameters
    ----------
    config : Dict[str, Any]
        The master configuration dictionary to validate.

    Returns
    -------
    bool
        True if structure is valid. Raises ValueError otherwise.

    Raises
    ------
    ValueError
        If any required key is missing or is not a dictionary.
    """
    # Define the required top-level keys as specified in the blueprint
    required_keys: List[str] = [
        "meta",
        "trading_calendar",
        "input_schemas",
        "attention_processing",
        "feature_engineering",
        "deviation_detection",
        "composite_strength_score",
        "segmentation_algorithm",
        "scoring_model",
        "reporting_and_ranking",
        "preprint_attention_proxy_reproducibility"
    ]

    # Check for missing keys
    # Set difference: required - present
    missing_keys = set(required_keys) - set(config.keys())

    if missing_keys:
        error_msg = f"Configuration is missing required top-level keys: {missing_keys}"
        logger.error(error_msg)
        raise ValueError(error_msg)

    # Check for type correctness (all top-level values must be dicts)
    for key in required_keys:
        if not isinstance(config[key], dict):
            error_msg = f"Top-level key '{key}' must be a dictionary, got {type(config[key])}."
            logger.error(error_msg)
            raise ValueError(error_msg)

    logger.info("Task 1, Step 1: Configuration structural validation passed.")
    return True


# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 2: Validate numerical parameter ranges and types.
# -------------------------------------------------------------------------------------------------------------------------------
def validate_parameter_constraints(config: Dict[str, Any]) -> bool:
    """
    Validates critical nested parameters and cross-field coherence constraints
    derived from the AIMM-X methodology (Sections 4.3, 4.4).

    Checks:
    1. Universe integrity (24 tickers).
    2. Hysteresis threshold logic (theta_high > theta_low > 0).
    3. Baseline estimation consistency (B == min_periods).
    4. Weight positivity for composite score and integrity score.

    Parameters
    ----------
    config : Dict[str, Any]
        The master configuration dictionary.

    Returns
    -------
    bool
        True if all constraints are satisfied. Raises ValueError otherwise.

    Raises
    ------
    ValueError
        If any methodological constraint is violated.
    """
    # 1. Universe Integrity
    # Extract tickers
    tickers = config["meta"]["universe_tickers"]
    if not isinstance(tickers, list):
        raise ValueError("meta.universe_tickers must be a list.")

    # Constraint: len == 24
    if len(tickers) != 24:
        raise ValueError(f"Universe must contain exactly 24 tickers, found {len(tickers)}.")

    # Constraint: Unique and Uppercase
    if len(set(tickers)) != len(tickers):
        raise ValueError("Universe tickers must be unique.")

    for t in tickers:
        if not isinstance(t, str) or not t.isupper():
            raise ValueError(f"Ticker '{t}' must be an uppercase string.")

    # 2. Threshold Coherence (Hysteresis)
    # Extract parameters
    seg_params = config["segmentation_algorithm"]["parameters"]
    theta_high = seg_params["theta_high"]
    theta_low = seg_params["theta_low"]

    # Constraint: theta_high > theta_low > 0
    if not (isinstance(theta_high, (int, float)) and isinstance(theta_low, (int, float))):
        raise ValueError("Segmentation thresholds must be numeric.")

    if not (theta_high > theta_low):
        raise ValueError(f"theta_high ({theta_high}) must be strictly greater than theta_low ({theta_low}).")

    if theta_low <= 0:
        raise ValueError(f"theta_low ({theta_low}) must be positive.")

    # 3. Baseline Coherence
    # Extract parameters
    dev_params = config["deviation_detection"]
    B = dev_params["baseline_window_B"]
    min_periods = dev_params["min_periods"]
    use_lagged = dev_params["use_lagged_baseline_only"]
    lag = dev_params["lag"]

    # Constraint: B == min_periods (for this specific implementation)
    if B != min_periods:
        raise ValueError(f"baseline_window_B ({B}) must equal min_periods ({min_periods}) for consistency.")

    # Constraint: Lagged baseline logic
    if use_lagged and lag < 1:
        raise ValueError("If use_lagged_baseline_only is True, lag must be >= 1.")

    # 4. Weight Positivity
    # Composite Score Weights (Eq. 7)
    alpha_weights = config["composite_strength_score"]["weights_alpha"]
    for key, val in alpha_weights.items():
        if val < 0:
            raise ValueError(f"Composite weight {key} ({val}) must be non-negative.")

    # Integrity Score Weights (Eq. 14)
    omega_weights = config["scoring_model"]["integrity_score_M"]["phi_weights_omega"]
    for key, val in omega_weights.items():
        if val < 0:
            raise ValueError(f"Integrity score weight {key} ({val}) must be non-negative.")

    logger.info("Task 1, Step 2: Parameter constraint validation passed.")
    return True
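
# Why validate_parameter_constraints insists on theta_high > theta_low > 0: a minimal
# sketch of hysteresis segmentation. ASSUMPTION: a window opens when a per-day score
# reaches theta_high and stays open until the score falls below theta_low. The helper
# `hysteresis_segments_sketch` and this exact open/close rule are hypothetical
# illustrations, not the study's segmentation implementation.
from typing import List, Optional, Sequence, Tuple


def hysteresis_segments_sketch(
    scores: Sequence[float], theta_high: float, theta_low: float
) -> List[Tuple[int, int]]:
    """Return inclusive (start, end) index pairs of hypothetical hysteresis windows."""
    if not theta_high > theta_low > 0:
        raise ValueError("Expected theta_high > theta_low > 0.")
    segments: List[Tuple[int, int]] = []
    start: Optional[int] = None
    for i, s in enumerate(scores):
        if start is None and s >= theta_high:
            start = i  # open only on a strong signal
        elif start is not None and s < theta_low:
            segments.append((start, i - 1))  # close once the signal has decayed
            start = None
    if start is not None:
        segments.append((start, len(scores) - 1))  # window still open at the end
    return segments


# With theta_high=3.0 and theta_low=1.0, the dip to 2.0 does not split the window,
# which is the point of keeping the two thresholds apart.
example_segments = hysteresis_segments_sketch([0.5, 3.2, 2.0, 3.1, 0.8, 0.2], 3.0, 1.0)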


# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 3: Validate string-based model identifiers and create a configuration snapshot.
# -------------------------------------------------------------------------------------------------------------------------------
def check_determinism_fields(config: Dict[str, Any]) -> bool:
    """
    Checks for unresolved fields that are critical for deterministic reproduction.
    Logs a warning when a critical field is missing (None) and raises if a field is present with the wrong type.

    Fields checked:
    1. trading_calendar.explicit_sessions
    2. attention_processing.normalization_definition_for_tilde_a.explicit_formula_or_reference
    3. preprint_attention_proxy_reproducibility.random_seed (if synthetic mode)

    Parameters
    ----------
    config : Dict[str, Any]
        The master configuration dictionary.

    Returns
    -------
    bool
        True if checks complete (warnings may be logged).

    Raises
    ------
    ValueError
        If 'trading_calendar.explicit_sessions' is present but is not a list.
    """
    # 1. Explicit Sessions
    explicit_sessions = config["trading_calendar"].get("explicit_sessions")
    if explicit_sessions is None:
        logger.warning(
            "CRITICAL: 'trading_calendar.explicit_sessions' is None. "
            "Exact reproduction requires a persisted list of session dates. "
            "The pipeline will attempt to generate this dynamically in Task 4."
        )
    elif not isinstance(explicit_sessions, list):
        raise ValueError("'explicit_sessions' must be a list of date strings or None.")

    # 2. Normalization Formula
    norm_def = config["attention_processing"]["normalization_definition_for_tilde_a"]
    explicit_formula = norm_def.get("explicit_formula_or_reference")
    if explicit_formula is None:
        logger.warning(
            "CRITICAL: 'normalization_definition_for_tilde_a.explicit_formula_or_reference' is None. "
            "Eq. (1) calculation may vary if normalization logic is not explicitly frozen."
        )

    # 3. Random Seed (if synthetic)
    proxy_config = config["preprint_attention_proxy_reproducibility"]
    if proxy_config.get("mode") == "synthetic_proxies":
        seed = proxy_config.get("random_seed")
        if seed is None:
            logger.warning(
                "CRITICAL: Synthetic proxy mode is active but 'random_seed' is None. "
                "Results will not be deterministic."
            )

    logger.info("Task 1, Step 3: Determinism field check completed.")
    return True
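
# The Task 1, Step 3 header also mentions a configuration snapshot, which
# check_determinism_fields does not produce. One possible approach, sketched here as an
# ASSUMPTION rather than the study's method: serialize the validated config to canonical
# JSON (sorted keys, fixed separators) and hash it with SHA-256, so two runs can be
# compared by fingerprint. `config_fingerprint_sketch` is a hypothetical helper.
import hashlib
import json
from typing import Any, Dict


def config_fingerprint_sketch(config: Dict[str, Any]) -> str:
    """Return the hex SHA-256 digest of a canonical JSON dump of `config`."""
    # default=str makes non-JSON values (e.g. dates) serializable; this is lossy but
    # deterministic, which is assumed to be acceptable for fingerprinting only.
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Key order does not change the fingerprint; a changed value does.
fp_a = config_fingerprint_sketch({"lag": 1, "random_seed": 42})
fp_b = config_fingerprint_sketch({"random_seed": 42, "lag": 1})
fp_c = config_fingerprint_sketch({"random_seed": 43, "lag": 1})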


# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def validate_study_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Orchestrates the validation of the AIMM-X study configuration.

    Executes a sequence of validation steps to ensure structural integrity,
    parameter coherence, and reproducibility readiness before any data processing begins.

    Sequence:
    1. validate_config_structure: Checks top-level keys and types.
    2. validate_parameter_constraints: Checks numerical logic and universe definitions.
    3. check_determinism_fields: Flags missing reproducibility artifacts.

    Parameters
    ----------
    config : Dict[str, Any]
        The raw configuration dictionary.

    Returns
    -------
    Dict[str, Any]
        The validated configuration dictionary (passed through).

    Raises
    ------
    ValueError
        If any validation step fails.
    """
    logger.info("Starting Task 1: Configuration Validation")

    # Step 1: Structure
    validate_config_structure(config)

    # Step 2: Constraints
    validate_parameter_constraints(config)

    # Step 3: Determinism
    check_determinism_fields(config)

    logger.info("Task 1 Completed: Configuration is valid.")
    return config



# ==============================================================================
# Task 2: Validate the appropriateness of ingested panel data (`df_raw_panel`)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 1: Validate MultiIndex structure and ordering.
# -------------------------------------------------------------------------------------------------------------------------------
def validate_multiindex_structure(df: pd.DataFrame, config: Dict[str, Any]) -> bool:
    """
    Verifies that the DataFrame has a strictly compliant MultiIndex structure:
    Levels must be ['date', 'ticker'], unique, and sorted.

    This ensures that subsequent vectorized operations (rolling windows, grouping)
    perform correctly and deterministically.

    Parameters
    ----------
    df : pd.DataFrame
        The raw panel DataFrame.
    config : Dict[str, Any]
        The study configuration dictionary (used for schema verification).

    Returns
    -------
    bool
        True if structure is valid. Raises ValueError otherwise.

    Raises
    ------
    ValueError
        If index levels, uniqueness, or sorting constraints are violated.
    """
    # 1. Check Index Type
    if not isinstance(df.index, pd.MultiIndex):
        raise ValueError(f"DataFrame must have a MultiIndex, got {type(df.index)}.")

    # 2. Check Level Names and Order
    expected_levels = config["input_schemas"]["panel_index"]["index_levels"]
    if df.index.names != expected_levels:
        raise ValueError(f"Index levels must be {expected_levels}, got {df.index.names}.")

    # 3. Check Level Dtypes
    # Level 0: date (datetime64[ns])
    if not pd.api.types.is_datetime64_any_dtype(df.index.levels[0]):
        raise ValueError(f"Level 'date' must be datetime64, got {df.index.levels[0].dtype}.")

    # Level 1: ticker (object/string)
    if not (pd.api.types.is_object_dtype(df.index.levels[1]) or pd.api.types.is_string_dtype(df.index.levels[1])):
        raise ValueError(f"Level 'ticker' must be object or string, got {df.index.levels[1].dtype}.")

    # 4. Check Uniqueness
    if not df.index.is_unique:
        duplicates = df[df.index.duplicated()].index.tolist()[:5]
        raise ValueError(f"Index must be unique. Found duplicates: {duplicates}...")

    # 5. Check Sortedness (Monotonicity)
    # Uniqueness was checked above, so a monotonic increasing index is strictly sorted by (date, ticker).
    if not df.index.is_monotonic_increasing:
        raise ValueError("Index must be strictly sorted (monotonic increasing). Please sort before ingestion.")

    logger.info("Task 2, Step 1: MultiIndex structure validation passed.")
    return True
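
# A minimal sketch of preparing a long-format frame so that it satisfies
# validate_multiindex_structure: a datetime 'date' level, a string 'ticker' level,
# unique (date, ticker) pairs, and lexicographic sorting. The tickers and prices are
# made-up toy data, not study inputs.
import pandas as pd

toy_long = pd.DataFrame(
    {
        "date": ["2024-01-03", "2024-01-02", "2024-01-02"],
        "ticker": ["AAA", "BBB", "AAA"],
        "close_price": [10.5, 20.0, 10.0],
    }
)
toy_long["date"] = pd.to_datetime(toy_long["date"])  # level 0 must be datetime64
toy_panel = toy_long.set_index(["date", "ticker"]).sort_index()  # sort by (date, ticker)

# The same properties the validator checks.
toy_ok = (
    list(toy_panel.index.names) == ["date", "ticker"]
    and toy_panel.index.is_unique
    and toy_panel.index.is_monotonic_increasing
)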


# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 2: Validate presence and dtype of all required columns.
# -------------------------------------------------------------------------------------------------------------------------------
def validate_column_schema(df: pd.DataFrame, config: Dict[str, Any]) -> bool:
    """
    Verifies that all required market microstructure and attention columns are present
    and possess compatible data types.

    Parameters
    ----------
    df : pd.DataFrame
        The raw panel DataFrame.
    config : Dict[str, Any]
        The study configuration dictionary.

    Returns
    -------
    bool
        True if schema is valid. Raises ValueError otherwise.

    Raises
    ------
    ValueError
        If columns are missing or have incompatible types.
    """
    # Extract expected columns from config
    market_cols = config["input_schemas"]["market_microstructure_columns"]["columns"]
    attention_cols = config["input_schemas"]["attention_source_columns"]["columns"]

    all_required_cols = {**market_cols, **attention_cols}

    # 1. Check for Missing Columns
    missing_cols = set(all_required_cols.keys()) - set(df.columns)
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")

    # 2. Check Dtypes
    for col_name, specs in all_required_cols.items():
        actual_dtype = df[col_name].dtype
        # Lower-case the declared dtype so that both 'Int64' and 'int64' trigger the int-like check
        expected_dtype_desc = str(specs["dtype"]).lower()

        # Loose check for float/int compatibility
        is_float = pd.api.types.is_float_dtype(actual_dtype)
        is_int = pd.api.types.is_integer_dtype(actual_dtype)
        is_nullable_int = isinstance(actual_dtype, pd.Int64Dtype)

        if "float" in expected_dtype_desc and not (is_float or is_int):  # Int often acceptable for float cols
            raise ValueError(f"Column '{col_name}' expected float-like, got {actual_dtype}.")

        if "int" in expected_dtype_desc and not (is_int or is_nullable_int or is_float):  # Float acceptable if holding NaNs
            raise ValueError(f"Column '{col_name}' expected int-like, got {actual_dtype}.")

        # Specific check for nullable attention columns
        if col_name in attention_cols:
            # Must support NaN (float or Int64)
            if not (is_float or is_nullable_int):
                raise ValueError(f"Attention column '{col_name}' must support NaNs (float or Int64), got {actual_dtype}.")

    logger.info("Task 2, Step 2: Column schema validation passed.")
    return True
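
# validate_column_schema accepts float or nullable Int64 attention columns but rejects
# plain int64, because a NumPy int64 column cannot hold a missing value. A toy check
# with pandas' type predicates; `supports_missing_sketch` is a hypothetical helper that
# mirrors the attention-column rule above.
import numpy as np
import pandas as pd

att_float = pd.Series([3.0, np.nan, 0.0])              # NaN = no coverage, 0.0 = no activity
att_nullable = pd.Series([3, None, 0], dtype="Int64")   # pd.NA marks missing coverage
att_plain_int = pd.Series([3, 1, 0], dtype="int64")     # no way to encode missing coverage


def supports_missing_sketch(s: pd.Series) -> bool:
    """Return True if the Series dtype can represent missing attention coverage."""
    return pd.api.types.is_float_dtype(s.dtype) or isinstance(s.dtype, pd.Int64Dtype)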


# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 3: Validate ticker universe membership and session coverage.
# -------------------------------------------------------------------------------------------------------------------------------
def validate_universe_coverage(df: pd.DataFrame, config: Dict[str, Any]) -> bool:
    """
    Verifies that the DataFrame contains exactly the tickers specified in the universe
    and checks for session coverage consistency if required.

    Parameters
    ----------
    df : pd.DataFrame
        The raw panel DataFrame.
    config : Dict[str, Any]
        The study configuration dictionary.

    Returns
    -------
    bool
        True if universe is valid. Raises ValueError otherwise.

    Raises
    ------
    ValueError
        If universe set does not match config or if critical session gaps exist.
    """
    # 1. Validate Universe Membership
    expected_tickers = set(config["meta"]["universe_tickers"])
    actual_tickers = set(df.index.get_level_values("ticker").unique())

    # Check for missing tickers
    missing_tickers = expected_tickers - actual_tickers
    if missing_tickers:
        raise ValueError(f"Panel is missing required tickers: {missing_tickers}")

    # Check for extra tickers
    extra_tickers = actual_tickers - expected_tickers
    if extra_tickers:
        raise ValueError(f"Panel contains extra tickers not in universe: {extra_tickers}")

    # 2. Validate Session Coverage (Optional but recommended)
    if config["trading_calendar"].get("require_exact_session_coverage_per_ticker"):
        explicit_sessions = config["trading_calendar"].get("explicit_sessions")

        if explicit_sessions:
            expected_dates = pd.to_datetime(explicit_sessions)
            expected_count = len(expected_dates)

            # Group by ticker and count dates
            # Note: This is a scalar check. Task 4 does the granular date-by-date check.
            # Here we just ensure gross counts are reasonable to fail fast.
            counts = df.groupby("ticker").size()

            mismatched_counts = counts[counts != expected_count]
            if not mismatched_counts.empty:
                logger.warning(
                    f"Session count mismatch for tickers: {mismatched_counts.to_dict()}. "
                    f"Expected {expected_count}. Detailed alignment check will occur in Task 4."
                )
                # No hard error is raised here, so that Task 4 can produce the detailed audit report.

    logger.info("Task 2, Step 3: Universe coverage validation passed.")
    return True


# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def validate_panel_schema(df_raw_panel: pd.DataFrame, config: Dict[str, Any]) -> bool:
    """
    Orchestrates the validation of the ingested panel data structure.

    Ensures the DataFrame is structurally sound, contains all required columns with
    correct types, and covers the specified ticker universe before processing begins.

    Sequence:
    1. validate_multiindex_structure
    2. validate_column_schema
    3. validate_universe_coverage

    Parameters
    ----------
    df_raw_panel : pd.DataFrame
        The raw input DataFrame.
    config : Dict[str, Any]
        The validated configuration dictionary.

    Returns
    -------
    bool
        True if all validations pass.

    Raises
    ------
    ValueError
        If any validation step fails.
    """
    logger.info("Starting Task 2: Panel Schema Validation")

    # Step 1: Index Structure
    validate_multiindex_structure(df_raw_panel, config)

    # Step 2: Columns and Dtypes
    validate_column_schema(df_raw_panel, config)

    # Step 3: Universe Content
    validate_universe_coverage(df_raw_panel, config)

    logger.info("Task 2 Completed: Panel schema is valid.")
    return True



# ==============================================================================
# Task 3: Cleanse the panel data (row-level quality enforcement)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 1: Enforce OHLC consistency constraints per row.
# -------------------------------------------------------------------------------------------------------------------------------
def identify_ohlc_violations(df: pd.DataFrame) -> pd.Series:
    """
    Identifies rows where High/Low prices are inconsistent with Open/Close prices.

    Logic:
    1. High must be >= max(Open, Close)
    2. Low must be <= min(Open, Close)

    Parameters
    ----------
    df : pd.DataFrame
        The raw panel DataFrame containing 'open_price', 'high_price', 'low_price', 'close_price'.

    Returns
    -------
    pd.Series
        A boolean Series (indexed matching df) where True indicates a violation.
    """
    # Extract series for vectorized comparison.
    # Direct column access is safe because Task 2 has already validated that these columns exist.
    O = df["open_price"]
    H = df["high_price"]
    L = df["low_price"]
    C = df["close_price"]

    # Constraint 1: High >= max(Open, Close)
    # Comparison is exact (no epsilon), which assumes OHLC prices are recorded without floating-point noise.
    # Strict inequality violation: H < max(O, C)
    max_oc = np.maximum(O, C)
    violation_high = H < max_oc

    # Constraint 2: Low <= min(Open, Close)
    # Strict inequality violation: L > min(O, C)
    min_oc = np.minimum(O, C)
    violation_low = L > min_oc

    # Combine violations
    is_invalid = violation_high | violation_low

    # Log statistics
    num_violations = is_invalid.sum()
    if num_violations > 0:
        logger.warning(f"Task 3, Step 1: Found {num_violations} rows with OHLC inconsistencies.")

    return is_invalid
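
# A toy check of the OHLC rule H >= max(O, C) and L <= min(O, C), using the same
# vectorized comparisons as identify_ohlc_violations. Values are invented: 'bad_high'
# has a high below its close, 'bad_low' has a low above its open. The 'nan_open' row
# shows a side effect of this rule: np.maximum/np.minimum propagate NaN and comparisons
# with NaN are False, so a missing open is not flagged by this check.
import numpy as np
import pandas as pd

ohlc_toy = pd.DataFrame(
    {
        "open_price": [10.0, 10.0, 10.0, np.nan],
        "high_price": [11.0, 10.5, 12.0, 11.0],
        "low_price": [9.0, 9.5, 10.2, 9.0],
        "close_price": [10.8, 10.9, 11.0, 10.0],
    },
    index=["ok", "bad_high", "bad_low", "nan_open"],
)
ohlc_bad = (
    ohlc_toy["high_price"] < np.maximum(ohlc_toy["open_price"], ohlc_toy["close_price"])
) | (
    ohlc_toy["low_price"] > np.minimum(ohlc_toy["open_price"], ohlc_toy["close_price"])
)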


# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 2: Enforce positive price and volume constraints.
# -------------------------------------------------------------------------------------------------------------------------------
def identify_price_volume_violations(df: pd.DataFrame) -> pd.Series:
    """
    Identifies rows with non-positive Close prices or non-positive Volume.

    Logic:
    1. Close > 0 (Required for log returns)
    2. Volume > 0 (Required for liquid universe assumption)

    Parameters
    ----------
    df : pd.DataFrame
        The raw panel DataFrame.

    Returns
    -------
    pd.Series
        A boolean Series where True indicates a violation.
    """
    C = df["close_price"]
    V = df["volume"]

    # Constraint 1: Close > 0
    # Violation: C <= 0 or NaN
    violation_close = (C <= 0) | C.isna()

    # Constraint 2: Volume > 0
    # Violation: V <= 0 or NaN
    violation_volume = (V <= 0) | V.isna()

    # Combine violations
    is_invalid = violation_close | violation_volume

    # Log statistics
    num_violations = is_invalid.sum()
    if num_violations > 0:
        logger.warning(f"Task 3, Step 2: Found {num_violations} rows with invalid Price/Volume.")

    return is_invalid
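
# A toy illustration of the rule in identify_price_volume_violations: a row is rejected
# if close <= 0, volume <= 0, or either value is missing. The explicit isna() terms are
# needed because comparisons with NaN evaluate to False. Values are invented.
import numpy as np
import pandas as pd

pv_toy = pd.DataFrame(
    {
        "close_price": [10.0, 0.0, 12.0, np.nan],
        "volume": [1000.0, 500.0, np.nan, 800.0],
    }
)
pv_bad = (
    (pv_toy["close_price"] <= 0)
    | pv_toy["close_price"].isna()
    | (pv_toy["volume"] <= 0)
    | pv_toy["volume"].isna()
)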


# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 3: Normalize attention encoding and validate missingness semantics.
# -------------------------------------------------------------------------------------------------------------------------------
def enforce_attention_dtypes(df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame:
    """
    Ensures attention columns are cast to float64 to support NaN (missing coverage)
    vs 0.0 (no activity).

    Parameters
    ----------
    df : pd.DataFrame
        The cleansed panel DataFrame (valid rows only).
    config : Dict[str, Any]
        Configuration dictionary defining attention columns.

    Returns
    -------
    pd.DataFrame
        The DataFrame with attention columns cast to float64.
    """
    df_out = df.copy()
    attention_cols = list(config["input_schemas"]["attention_source_columns"]["columns"].keys())

    for col in attention_cols:
        if col in df_out.columns:
            # Force float64 to handle NaNs and subsequent z-score math
            # This preserves NaN as NaN and 0 as 0.0
            df_out[col] = df_out[col].astype("float64")

    logger.info("Task 3, Step 3: Attention columns cast to float64.")
    return df_out
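
# enforce_attention_dtypes relies on astype("float64") mapping missing values to NaN
# while keeping genuine zeros as 0.0. A toy check with a nullable Int64 series (the
# values are invented):
import numpy as np
import pandas as pd

raw_mentions = pd.Series([5, pd.NA, 0], dtype="Int64")
as_float = raw_mentions.astype("float64")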


# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def cleanse_panel(df_raw_panel: pd.DataFrame, config: Dict[str, Any]) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Orchestrates the row-level cleansing of the panel data.

    1. Identifies OHLC inconsistencies.
    2. Identifies invalid Price/Volume entries.
    3. Quarantines invalid rows.
    4. Enforces float dtypes for attention columns in the clean set.

    Parameters
    ----------
    df_raw_panel : pd.DataFrame
        The raw input DataFrame.
    config : Dict[str, Any]
        The study configuration.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame]
        (df_clean, df_quarantine)
        df_clean: The subset of rows passing all QC checks, with correct dtypes.
        df_quarantine: The subset of rows that failed QC.
    """
    logger.info("Starting Task 3: Panel Cleansing")

    # Step 1: OHLC Checks
    mask_ohlc = identify_ohlc_violations(df_raw_panel)

    # Step 2: Price/Volume Checks
    mask_pv = identify_price_volume_violations(df_raw_panel)

    # Combine masks (True = Invalid)
    mask_quarantine = mask_ohlc | mask_pv

    # Split DataFrames
    df_quarantine = df_raw_panel[mask_quarantine].copy()
    df_clean_raw = df_raw_panel[~mask_quarantine].copy()

    # Log results
    total_rows = len(df_raw_panel)
    quarantined_rows = len(df_quarantine)
    clean_rows = len(df_clean_raw)

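    logger.info(f"Total rows: {total_rows}")
    if total_rows > 0:
        logger.info(f"Quarantined rows: {quarantined_rows} ({quarantined_rows/total_rows:.2%})")
    else:
        logger.warning("Task 3: Input panel is empty; there are no rows to cleanse.")
    logger.info(f"Clean rows: {clean_rows}")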

    # Step 3: Enforce Dtypes on Clean Data
    df_clean = enforce_attention_dtypes(df_clean_raw, config)

    logger.info("Task 3 Completed: Panel cleansed.")
    return df_clean, df_quarantine



# ==============================================================================
# Task 4: Enforce the canonical trading-session grid
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 1: Construct or load the canonical session list.
# -------------------------------------------------------------------------------------------------------------------------------
def get_canonical_sessions(config: Dict[str, Any]) -> pd.DatetimeIndex:
    """
    Retrieves or generates the canonical list of exchange trading sessions.

    Priority:
    1. Use 'explicit_sessions' from config if present.
    2. Generate using 'exchange_calendars' (XNYS) if available.
    3. Fallback to pandas USFederalHolidayCalendar (approximation, logged as warning).

    Parameters
    ----------
    config : Dict[str, Any]
        The study configuration.

    Returns
    -------
    pd.DatetimeIndex
        The sorted index of valid trading dates.

    Raises
    ------
    RuntimeError
        If sessions cannot be determined.
    """
    # 1. Try explicit list from config
    explicit = config["trading_calendar"].get("explicit_sessions")
    if explicit is not None:
        logger.info("Task 4, Step 1: Using explicit session list from config.")
        return pd.to_datetime(explicit).sort_values()

    # 2. Try generating via exchange_calendars
    start_date = config["meta"]["date_range"]["start"]
    end_date = config["meta"]["date_range"]["end"]
    calendar_name = config["trading_calendar"]["calendar_name"]  # e.g., "XNYS"

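    try:
        # Import inside the try block so that a missing optional dependency reaches the ImportError branch.
        import exchange_calendars as xcals

        logger.info(f"Task 4, Step 1: Generating {calendar_name} calendar via exchange_calendars.")
        exchange_calendar = xcals.get_calendar(calendar_name)
        sessions = exchange_calendar.sessions_in_range(start_date, end_date)
        return sessions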
    except ImportError:
        logger.warning("Task 4, Step 1: 'exchange_calendars' not found. Attempting pandas fallback.")
    except Exception as e:
        logger.warning(f"Task 4, Step 1: Exchange calendar generation failed: {e}")

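    # 3. Fallback: pandas USFederalHolidayCalendar (approximation for NYSE)
    # Note: the federal holiday calendar differs from the NYSE schedule. It closes on Columbus Day and
    # Veterans Day (NYSE trades) and stays open on Good Friday (NYSE closes); it also omits special
    # exchange closures (e.g., national days of mourning).
    logger.warning(
        "Task 4, Step 1: Using pandas USFederalHolidayCalendar fallback. It does not match the NYSE schedule "
        "(e.g., Good Friday, Columbus Day, Veterans Day); verify sessions before use."
    )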
    us_bd = CustomBusinessDay(calendar=USFederalHolidayCalendar())
    sessions = pd.date_range(start=start_date, end=end_date, freq=us_bd)
    return sessions
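
# Why the USFederalHolidayCalendar fallback is only an approximation of XNYS: the
# federal calendar treats Columbus Day as a holiday (NYSE trades) and Good Friday as a
# business day (NYSE closes). A toy check for 2024, when Good Friday fell on 2024-03-29
# and Columbus Day on 2024-10-14:
import pandas as pd
from pandas.tseries.holiday import USFederalHolidayCalendar
from pandas.tseries.offsets import CustomBusinessDay

fed_bd = CustomBusinessDay(calendar=USFederalHolidayCalendar())
fallback_2024 = pd.date_range(start="2024-01-01", end="2024-12-31", freq=fed_bd)

good_friday_in_fallback = pd.Timestamp("2024-03-29") in fallback_2024   # NYSE was closed
columbus_day_in_fallback = pd.Timestamp("2024-10-14") in fallback_2024  # NYSE was open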


# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 2: Validate per-ticker alignment to the canonical grid.
# -------------------------------------------------------------------------------------------------------------------------------
def check_ticker_alignment(
    df: pd.DataFrame,
    canonical_sessions: pd.DatetimeIndex
) -> Dict[str, Dict[str, List[pd.Timestamp]]]:
    """
    Checks every ticker in the DataFrame against the canonical session list.
    Identifies missing sessions (gaps) and extra sessions (calendar mismatches).

    Parameters
    ----------
    df : pd.DataFrame
        The cleansed panel DataFrame.
    canonical_sessions : pd.DatetimeIndex
        The authoritative list of trading dates.

    Returns
    -------
    Dict[str, Dict[str, List[pd.Timestamp]]]
        A dictionary keyed by ticker, containing 'missing' and 'extra' lists of dates.
    """
    alignment_report = {}
    expected_set = set(canonical_sessions)

    # Get unique tickers
    tickers = df.index.get_level_values("ticker").unique()

    for ticker in tickers:
        # Extract actual dates for this ticker
        # Note: We assume index is (date, ticker) based on Task 2 validation
        # Slice this ticker's rows with xs; the remaining index is the date level.
        actual_dates = df.xs(ticker, level="ticker").index
        actual_set = set(actual_dates)

        # Calculate differences
        missing = sorted(list(expected_set - actual_set))
        extra = sorted(list(actual_set - expected_set))

        if missing or extra:
            alignment_report[ticker] = {
                "missing": missing,
                "extra": extra
            }

    return alignment_report
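
# A toy run of the set arithmetic that check_ticker_alignment applies per ticker:
# 'missing' holds canonical sessions the ticker lacks, and 'extra' holds rows on dates
# outside the canonical grid. The three-session grid and prices are invented;
# 2024-01-06 is a Saturday.
import pandas as pd

toy_sessions = pd.DatetimeIndex(["2024-01-02", "2024-01-03", "2024-01-04"])
toy_aligned = pd.DataFrame(
    {"close_price": [10.0, 10.1, 10.2]},
    index=pd.MultiIndex.from_arrays(
        [pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-06"]), ["AAA", "AAA", "AAA"]],
        names=["date", "ticker"],
    ),
)
aaa_dates = set(toy_aligned.xs("AAA", level="ticker").index)
aaa_missing = sorted(set(toy_sessions) - aaa_dates)
aaa_extra = sorted(aaa_dates - set(toy_sessions))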


# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 3: Produce a session coverage audit report.
# -------------------------------------------------------------------------------------------------------------------------------
def generate_audit_report(
    alignment_data: Dict[str, Dict[str, List[pd.Timestamp]]],
    config: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Summarizes the alignment check results. Raises error if strict coverage is required
    and gaps are found.

    Parameters
    ----------
    alignment_data : Dict
        Output from check_ticker_alignment.
    config : Dict
        Study configuration.

    Returns
    -------
    Dict[str, Any]
        Summary statistics of the audit.

    Raises
    ------
    ValueError
        If require_exact_session_coverage_per_ticker is True and mismatches exist.
    """
    total_mismatched_tickers = len(alignment_data)

    summary = {
        "status": "PASS" if total_mismatched_tickers == 0 else "FAIL",
        "mismatched_tickers_count": total_mismatched_tickers,
        "details": {}
    }

    # Iterate through tickers
    for ticker, diffs in alignment_data.items():
        n_missing = len(diffs["missing"])
        n_extra = len(diffs["extra"])

        # Format dates for readability
        missing_str = [d.strftime("%Y-%m-%d") for d in diffs["missing"][:5]]
        if n_missing > 5:
            missing_str.append("...")

        extra_str = [d.strftime("%Y-%m-%d") for d in diffs["extra"][:5]]
        if n_extra > 5:
            extra_str.append("...")

        summary["details"][ticker] = {
            "missing_count": n_missing,
            "missing_examples": missing_str,
            "extra_count": n_extra,
            "extra_examples": extra_str
        }

        logger.warning(
            f"Ticker {ticker}: Missing {n_missing} sessions, Extra {n_extra} sessions."
        )

    # Enforce strictness
    if config["trading_calendar"]["require_exact_session_coverage_per_ticker"]:
        if total_mismatched_tickers > 0:
            error_msg = (
                f"Strict session coverage required. Found {total_mismatched_tickers} tickers "
                "with calendar mismatches. See log for details."
            )
            logger.error(error_msg)
            raise ValueError(error_msg)

    logger.info("Task 4, Step 3: Audit report generated.")
    return summary


# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def enforce_trading_calendar(df_clean: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Orchestrates the validation of the trading calendar alignment.

    1. Generates/Loads the canonical session list.
    2. Checks every ticker for exact alignment.
    3. Generates an audit report and enforces strictness if configured.

    Parameters
    ----------
    df_clean : pd.DataFrame
        The cleansed panel DataFrame.
    config : Dict[str, Any]
        The study configuration.

    Returns
    -------
    Dict[str, Any]
        The audit report summary.
    """
    logger.info("Starting Task 4: Trading Calendar Enforcement")

    # Step 1: Get Canonical Sessions
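    sessions = get_canonical_sessions(config)
    if len(sessions) == 0:
        raise RuntimeError(
            "Task 4: The canonical session list is empty. Check meta.date_range and the trading_calendar settings."
        )
    logger.info(f"Canonical calendar established: {len(sessions)} sessions from {sessions[0].date()} to {sessions[-1].date()}.")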

    # Step 2: Check Alignment
    alignment_data = check_ticker_alignment(df_clean, sessions)

    # Step 3: Audit and Enforce
    report = generate_audit_report(alignment_data, config)

    logger.info("Task 4 Completed: Calendar alignment verified.")
    return report



# ==============================================================================
# Task 5: Prepare per-source attention series for fusion (alignment rules)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 1: Extract raw attention columns and map to canonical source names.
# -------------------------------------------------------------------------------------------------------------------------------
def extract_raw_attention_series(
    df: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, pd.Series]:
    """
    Extracts raw attention columns from the cleansed DataFrame and maps them
    to their canonical source names (e.g., 'reddit', 'stocktwits') as defined
    in the configuration.

    Parameters
    ----------
    df : pd.DataFrame
        The cleansed panel DataFrame.
    config : Dict[str, Any]
        The study configuration containing 'source_to_raw_column' mapping.

    Returns
    -------
    Dict[str, pd.Series]
        A dictionary where keys are canonical source names and values are
        the corresponding raw pandas Series (indexed by date, ticker).
    """
    source_map = config["attention_processing"]["source_to_raw_column"]
    extracted_series = {}

    for canonical_name, raw_col_name in source_map.items():
        if raw_col_name not in df.columns:
            # This should have been caught in Task 2, but defensive coding is best practice.
            raise ValueError(f"Expected column '{raw_col_name}' for source '{canonical_name}' not found in DataFrame.")

        # Cast to float64 so that NaN (no coverage) stays distinct from 0.0 (no activity).
        # .copy() yields an independent Series, so later transformations cannot write back into df.
        series = df[raw_col_name].copy().astype("float64")
        extracted_series[canonical_name] = series

    logger.info(f"Task 5, Step 1: Extracted {len(extracted_series)} raw attention series.")
    return extracted_series


# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 2: Apply source-specific resampling rules (within-ticker).
# -------------------------------------------------------------------------------------------------------------------------------
def apply_alignment_rules(
    raw_series_dict: Dict[str, pd.Series],
    config: Dict[str, Any]
) -> Dict[str, pd.Series]:
    """
    Applies source-specific alignment and filling rules.

    Crucially, this handles the 'step-function' nature of some sources (Wikipedia, Trends)
    by allowing forward-filling, while preserving the 'event-driven' nature of others
    (Reddit, News) by strictly forbidding it.

    Logic:
    1. Group by ticker (to prevent cross-ticker contamination).
    2. If 'ffill_allowed' is True: Apply forward fill.
    3. If 'ffill_allowed' is False: Do nothing (keep raw NaNs/0s).

    Parameters
    ----------
    raw_series_dict : Dict[str, pd.Series]
        Dictionary of raw attention series.
    config : Dict[str, Any]
        Configuration containing 'alignment_rules_per_source'.

    Returns
    -------
    Dict[str, pd.Series]
        Dictionary of aligned/filled attention series.
    """
    rules = config["attention_processing"]["alignment_rules_per_source"]
    aligned_series_dict = {}

    for source, series in raw_series_dict.items():
        rule = rules.get(source)
        if not rule:
            raise ValueError(f"No alignment rule defined for source '{source}'.")

        # Group by ticker to ensure operations are isolated per asset
        # We assume index level 1 is 'ticker' based on Task 2 validation
        grouped = series.groupby(level="ticker", group_keys=False)

        if rule["ffill_allowed"]:
            # Apply forward fill within each ticker group
            # This propagates the last valid observation forward
            # Leading NaNs remain NaN (correctly representing 'no coverage yet')
            aligned_series = grouped.ffill()
            logger.debug(f"Applied ffill to source '{source}'.")
        else:
            # No fill; keep original series
            aligned_series = series
            logger.debug(f"No ffill applied to source '{source}'.")

        aligned_series_dict[source] = aligned_series

    logger.info("Task 5, Step 2: Alignment rules applied.")
    return aligned_series_dict
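

# Illustrative sketch (hypothetical toy data; not part of the pipeline): why the
# forward fill above is grouped by ticker. With a (date, ticker) index sorted by
# date, consecutive rows belong to different tickers, so an ungrouped ffill()
# copies one ticker's last value into another ticker's missing row. The grouped
# version fills only from the same ticker's history and leaves leading NaNs
# (no coverage yet) untouched.
import numpy as np
import pandas as pd

_ff_index = pd.MultiIndex.from_product(
    [pd.to_datetime(["2021-01-04", "2021-01-05", "2021-01-06"]), ["AAA", "BBB"]],
    names=["date", "ticker"],
)
# Row order: (d1, AAA), (d1, BBB), (d2, AAA), (d2, BBB), (d3, AAA), (d3, BBB)
_ff_views = pd.Series([10.0, np.nan, np.nan, 7.0, 12.0, np.nan], index=_ff_index)

# Ungrouped: (d1, BBB) inherits AAA's 10.0, a cross-ticker leak.
_ff_naive = _ff_views.ffill()

# Grouped: AAA -> [10, 10, 12], BBB -> [NaN, 7, 7]; BBB's first day stays NaN.
_ff_grouped = _ff_views.groupby(level="ticker", group_keys=False).ffill()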


# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 3: Preserve missingness semantics post-alignment.
# -------------------------------------------------------------------------------------------------------------------------------
def validate_missingness_semantics(
    aligned_series_dict: Dict[str, pd.Series]
) -> bool:
    """
    Validates that the distinction between 0.0 (zero activity) and NaN (no coverage)
    has been preserved after alignment.

    Parameters
    ----------
    aligned_series_dict : Dict[str, pd.Series]
        The processed attention series.

    Returns
    -------
    bool
        True once every series has passed the float-dtype check (a TypeError is
        raised otherwise). NaN/zero/positive counts are logged for audit.
    """
    for source, series in aligned_series_dict.items():
        # Check dtype
        if not pd.api.types.is_float_dtype(series.dtype):
            raise TypeError(f"Source '{source}' must be float64, got {series.dtype}.")

        # Log statistics for audit (guard the denominator for empty series)
        n_total = len(series)
        denom = max(n_total, 1)
        n_nan = series.isna().sum()
        n_zero = (series == 0.0).sum()
        n_positive = (series > 0.0).sum()

        logger.info(
            f"Source '{source}' Stats: "
            f"Total={n_total}, NaN={n_nan} ({n_nan/denom:.1%}), "
            f"Zero={n_zero} ({n_zero/denom:.1%}), "
            f"Pos={n_positive} ({n_positive/denom:.1%})"
        )

        # Sanity check: We shouldn't have lost all data
        if n_total > 0 and n_nan == n_total:
            logger.warning(f"Source '{source}' is entirely NaN after alignment. Check input data.")

    return True
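

# Illustrative sketch (hypothetical toy data; not part of the pipeline): what is
# lost if NaN (no coverage) is collapsed into 0.0 (zero activity). Filling NaN
# with 0 turns "unknown" days into "very quiet" days and pulls the mean, and
# therefore any rolling baseline built on it, downward.
import numpy as np
import pandas as pd


def sketch_missingness_counts(s: pd.Series) -> dict:
    """Count NaN (no coverage), zero (no activity) and positive observations."""
    return {
        "nan": int(s.isna().sum()),
        "zero": int((s == 0.0).sum()),
        "positive": int((s > 0.0).sum()),
    }


_miss_obs = pd.Series([0.0, np.nan, 4.0, 0.0, np.nan])
_miss_counts_raw = sketch_missingness_counts(_miss_obs)                 # nan=2, zero=2, positive=1
_miss_counts_filled = sketch_missingness_counts(_miss_obs.fillna(0.0))  # nan=0, zero=4, positive=1
# Mean over covered days is 4/3 (~1.33); after fillna(0.0) it drops to 4/5 = 0.8.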


# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def align_attention_sources(
    df_clean: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, pd.Series]:
    """
    Orchestrates the preparation of attention series for fusion.

    1. Extracts raw columns mapped to canonical names.
    2. Applies per-source alignment rules (specifically forward-filling where permitted).
    3. Validates that missingness semantics (NaN vs 0) are preserved.

    Parameters
    ----------
    df_clean : pd.DataFrame
        The cleansed panel DataFrame.
    config : Dict[str, Any]
        The study configuration.

    Returns
    -------
    Dict[str, pd.Series]
        A dictionary of aligned, float64 attention series ready for normalization.
    """
    logger.info("Starting Task 5: Attention Alignment")

    # Step 1: Extract
    raw_series = extract_raw_attention_series(df_clean, config)

    # Step 2: Align/Fill
    aligned_series = apply_alignment_rules(raw_series, config)

    # Step 3: Validate
    validate_missingness_semantics(aligned_series)

    logger.info("Task 5 Completed: Attention sources aligned.")
    return aligned_series


# ==============================================================================
# Task 6: Normalize attention sources into \(\tilde{a}_{s,i,t}\)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 1 & 2: Define and apply rolling normalization per source.
# -------------------------------------------------------------------------------------------------------------------------------
def compute_rolling_zscore_normalization(
    series: pd.Series,
    window: int,
    min_periods: int,
    epsilon: float
) -> pd.Series:
    r"""
    Computes the rolling z-score for a single series, within each ticker, to serve
    as the normalized signal \(\tilde{a}_{s,i,t}\).

    Formula:
    \[
    \begin{aligned}
    \mu_{t} &= \text{Mean}(x_{t-1}, \dots, x_{t-B}) \\
    \sigma_{t} &= \text{Std}(x_{t-1}, \dots, x_{t-B}) \\
    \tilde{a}_{t} &= \frac{x_{t} - \mu_{t}}{\sigma_{t} + \epsilon}
    \end{aligned}
    \]

    This ensures:
    1. No look-ahead bias (the baseline uses only lagged data).
    2. Local stationarity in level and scale (rolling mean/std adjustment).
    3. Comparability across sources (unitless scale).

    Parameters
    ----------
    series : pd.Series
        Input attention series (must be indexed by date, ticker).
    window : int
        Rolling window size (B).
    min_periods : int
        Minimum observations required.
    epsilon : float
        Small constant to prevent division by zero.

    Returns
    -------
    pd.Series
        Normalized series \(\tilde{a}\).
    """
    # Group by ticker so that both the lag and the rolling window stay within a
    # single asset; an ungrouped rolling window over a (date, ticker) index
    # would mix values from different tickers
    lagged = series.groupby(level="ticker", group_keys=False).shift(1)
    lagged_grouped = lagged.groupby(level="ticker", group_keys=False)

    # Compute Rolling Baseline (Lagged)
    # shift(1) ensures we use t-1...t-B to calculate stats for t
    # This prevents the anomaly at t from polluting the baseline at t
    mu = lagged_grouped.transform(
        lambda s: s.rolling(window=window, min_periods=min_periods).mean()
    )
    sigma = lagged_grouped.transform(
        lambda s: s.rolling(window=window, min_periods=min_periods).std()
    )

    # Compute Z-Score
    # Note: x_t is the current value, mu/sigma are from history
    z_score = (series - mu) / (sigma + epsilon)

    return z_score
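

# Illustrative sketch (hypothetical toy data; not part of the pipeline): a lagged
# rolling z-score whose baseline uses only the same ticker's previous `window`
# values, compared against an ungrouped rolling mean that mixes tickers. Uses the
# pandas default sample standard deviation (ddof=1).
import numpy as np
import pandas as pd


def sketch_lagged_zscore(x: pd.Series, window: int, min_periods: int, epsilon: float) -> pd.Series:
    """(x_t - mean) / (std + eps), with mean/std over the ticker's t-1 ... t-window."""
    lagged = x.groupby(level="ticker").shift(1)
    lagged_grouped = lagged.groupby(level="ticker", group_keys=False)
    mu = lagged_grouped.transform(lambda s: s.rolling(window, min_periods=min_periods).mean())
    sd = lagged_grouped.transform(lambda s: s.rolling(window, min_periods=min_periods).std())
    return (x - mu) / (sd + epsilon)


_nz_index = pd.MultiIndex.from_product(
    [pd.date_range("2021-01-04", periods=4, freq="D"), ["AAA", "BBB"]],
    names=["date", "ticker"],
)
# Rows are ordered by date, so consecutive rows alternate between AAA and BBB.
_nz_x = pd.Series([1.0, 100.0, 2.0, 200.0, 3.0, 300.0, 10.0, 400.0], index=_nz_index)
_nz_z = sketch_lagged_zscore(_nz_x, window=3, min_periods=3, epsilon=1e-9)
# AAA on day 4: baseline {1, 2, 3} -> mean 2, sample std 1 -> z ~ 8
# BBB on day 4: baseline {100, 200, 300} -> mean 200, sample std 100 -> z ~ 2

# An ungrouped rolling mean over the same lagged values mixes the two tickers:
_nz_leaky_mu = _nz_x.groupby(level="ticker").shift(1).rolling(3, min_periods=3).mean()
# AAA's "baseline" on day 4 becomes mean(2, 200, 3) ~ 68.33 instead of 2.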


# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 3: Validate normalized series properties.
# -------------------------------------------------------------------------------------------------------------------------------
def validate_normalized_series(
    normalized_dict: Dict[str, pd.Series]
) -> bool:
    """
    Checks the normalized series for infinite values (replacing any with NaN and
    logging an error) and logs their mean and standard deviation, which should be
    roughly 0 and 1 for a well-behaved rolling z-score.

    Parameters
    ----------
    normalized_dict : Dict[str, pd.Series]
        Dictionary of normalized attention series.

    Returns
    -------
    bool
        True if valid.
    """
    for source, series in normalized_dict.items():
        # Check for Inf
        if np.isinf(series).any():
            n_inf = np.isinf(series).sum()
            logger.error(f"Source '{source}' contains {n_inf} infinite values after normalization.")

            # Replace inf with NaN to prevent downstream crashes, but flag it
            # In a strict pipeline, we might raise, but here we patch for robustness
            series.replace([np.inf, -np.inf], np.nan, inplace=True)

        # Check stats (ignoring NaNs from warmup)
        mean_val = series.mean()
        std_val = series.std()

        logger.info(
            f"Normalized '{source}': Mean={mean_val:.4f}, Std={std_val:.4f} "
            f"(Expected ~0.0, ~1.0)"
        )

    return True


# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def normalize_attention_sources(
    aligned_series_dict: Dict[str, pd.Series],
    config: Dict[str, Any]
) -> Dict[str, pd.Series]:
    r"""
    Orchestrates the normalization of attention series.

    1. Loads baseline parameters (B, epsilon).
    2. Applies lagged rolling z-score normalization to each source.
    3. Validates output statistics.

    Parameters
    ----------
    aligned_series_dict : Dict[str, pd.Series]
        Dictionary of aligned attention series.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Dict[str, pd.Series]
        Dictionary of normalized \(\tilde{a}_{s,i,t}\) series.
    """
    logger.info("Starting Task 6: Attention Normalization")

    # Load parameters
    # We use the same window B as the main detection engine for consistency
    B = config["deviation_detection"]["baseline_window_B"]
    min_periods = config["deviation_detection"]["min_periods"]
    epsilon = config["deviation_detection"]["epsilon"]

    normalized_dict = {}

    for source, series in aligned_series_dict.items():
        logger.debug(f"Normalizing source '{source}' with window={B}...")

        tilde_a = compute_rolling_zscore_normalization(
            series,
            window=B,
            min_periods=min_periods,
            epsilon=epsilon
        )

        normalized_dict[source] = tilde_a

    # Validate
    validate_normalized_series(normalized_dict)

    logger.info("Task 6 Completed: Attention sources normalized.")
    return normalized_dict


# ==============================================================================
# Task 7: Fuse normalized attention into \(A_{i,t}\) (Eq. 1)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 1 & 2: Compute weighted sum with strict NaN propagation.
# -------------------------------------------------------------------------------------------------------------------------------
def compute_fused_signal(
    normalized_dict: Dict[str, pd.Series],
    weights: Dict[str, float]
) -> pd.Series:
    r"""
    Computes the fused attention signal \(A_{i,t}\) using a weighted sum.

    Equation:
    \[
    A_{i,t} = \sum_{s \in \mathcal{S}} w_s \cdot \tilde{a}_{s,i,t}
    \]

    Constraints:
    - Strict NaN propagation: If any source is NaN (missing coverage), the fused
      signal is NaN. This prevents partial signals from being misinterpreted as
      low attention.

    Parameters
    ----------
    normalized_dict : Dict[str, pd.Series]
        Dictionary of normalized attention series.
    weights : Dict[str, float]
        Dictionary of fusion weights.

    Returns
    -------
    pd.Series
        The fused attention signal \(A_{i,t}\), indexed by (date, ticker).
    """
    # 1. Align all series into a DataFrame
    # Series are aligned on the union of their indices; index entries missing
    # from a source become NaN for that source
    df_components = pd.DataFrame(normalized_dict)

    # 2. Check that every weighted source is present
    missing = [source for source in weights if source not in df_components.columns]
    if missing:
        # Should be caught by validation, but defensive check
        raise ValueError(f"Weights provided for sources {missing} but source series are missing.")

    # 3. Apply weights, keeping only the weighted sources so that a source
    # without a weight is not silently summed with an implicit weight of 1
    weight_vector = pd.Series(weights, dtype="float64")
    df_weighted = df_components[list(weight_vector.index)].mul(weight_vector, axis=1)

    # 4. Compute Sum with Strict NaN Propagation
    # skipna=False ensures that if any weighted component is NaN, the sum is NaN
    A_series = df_weighted.sum(axis=1, skipna=False)

    return A_series
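

# Illustrative sketch (hypothetical weights and toy values; not part of the
# pipeline): a weighted sum over normalized sources with and without strict NaN
# propagation. With skipna=True, a missing source silently contributes 0, so a
# row with partial coverage looks like lower attention; skipna=False returns NaN
# for that row instead.
import numpy as np
import pandas as pd

_fu_components = pd.DataFrame(
    {"reddit": [1.0, 2.0, np.nan], "wikipedia": [0.5, np.nan, 1.0]}
)
_fu_weights = pd.Series({"reddit": 0.6, "wikipedia": 0.4})  # hypothetical w_s

# Multiply each column by its weight (aligned on column names).
_fu_weighted = _fu_components[list(_fu_weights.index)].mul(_fu_weights, axis=1)

_fu_strict = _fu_weighted.sum(axis=1, skipna=False)  # ~[0.8, NaN, NaN]
_fu_lenient = _fu_weighted.sum(axis=1)               # ~[0.8, 1.2, 0.4]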


# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 3: Validate fused signal properties.
# -------------------------------------------------------------------------------------------------------------------------------
def validate_fused_signal(A_series: pd.Series) -> bool:
    """
    Validates the fused attention signal.

    Parameters
    ----------
    A_series : pd.Series
        The fused signal.

    Returns
    -------
    bool
        True if valid.
    """
    n_total = len(A_series)
    n_nan = A_series.isna().sum()

    logger.info(
        f"Fused Signal Stats: Total={n_total}, Valid={n_total - n_nan}, "
        f"NaN={n_nan} ({n_nan/max(n_total, 1):.1%})"
    )

    if n_total > 0 and n_nan == n_total:
        logger.warning("Fused signal is entirely NaN. Check source coverage overlap.")

    return True


# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def fuse_attention(
    normalized_dict: Dict[str, pd.Series],
    config: Dict[str, Any]
) -> pd.Series:
    r"""
    Orchestrates the fusion of normalized attention signals.

    1. Loads fusion weights.
    2. Computes weighted sum with strict NaN propagation (Eq. 1).
    3. Validates the result.

    Parameters
    ----------
    normalized_dict : Dict[str, pd.Series]
        Dictionary of normalized attention series.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.Series
        The fused attention signal \(A_{i,t}\).
    """
    logger.info("Starting Task 7: Attention Fusion")

    # Load weights
    weights = config["attention_processing"]["attention_fusion"]["weights_w_s"]

    # Validate weight keys match source keys
    source_keys = set(normalized_dict.keys())
    weight_keys = set(weights.keys())

    if not weight_keys.issubset(source_keys):
        missing = weight_keys - source_keys
        raise ValueError(f"Weights defined for missing sources: {missing}")

    # Compute
    A_series = compute_fused_signal(normalized_dict, weights)

    # Validate
    validate_fused_signal(A_series)

    logger.info("Task 7 Completed: Attention fused.")
    return A_series


# ==============================================================================
# Task 8: Compute log returns \(r_{i,t}\) (Eq. 2)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 1 & 2: Compute log returns per ticker.
# -------------------------------------------------------------------------------------------------------------------------------
def calculate_log_returns(df: pd.DataFrame) -> pd.Series:
    r"""
    Computes the daily log returns for each ticker.

    Equation:
    \[
    r_{i,t} = \log\left(\frac{C_{i,t}}{C_{i,t-1}}\right)
    \]

    Parameters
    ----------
    df : pd.DataFrame
        The cleansed panel DataFrame containing 'close_price'.

    Returns
    -------
    pd.Series
        Log returns indexed by (date, ticker). First observation per ticker is NaN.
    """
    # Extract close prices
    # Ensure float64 for precision
    close_prices = df["close_price"].astype("float64")

    # Group by ticker to isolate assets
    # We assume level 1 is ticker based on Task 2 validation
    grouped = close_prices.groupby(level="ticker", group_keys=False)

    # Compute Log Returns
    # r_t = ln(P_t / P_{t-1}), equivalently ln(P_t) - ln(P_{t-1})
    # shift(1) within each ticker aligns P_{t-1} with P_t
    prev_close = grouped.shift(1)

    # Calculate
    # Note: Task 3 enforced close_price > 0, so log is safe
    r_series = np.log(close_prices / prev_close)

    return r_series
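

# Illustrative sketch (hypothetical prices; not part of the pipeline): per-ticker
# log returns via a grouped shift. The first observation of each ticker is NaN,
# and because log returns add over time, their sum over a ticker's history equals
# ln(P_last / P_first).
import numpy as np
import pandas as pd

_lr_index = pd.MultiIndex.from_product(
    [pd.date_range("2021-01-04", periods=3, freq="D"), ["AAA", "BBB"]],
    names=["date", "ticker"],
)
# AAA: 100 -> 110 -> 121 (two +10% days); BBB: 50 -> 25 -> 50 (halves, then doubles)
_lr_close = pd.Series([100.0, 50.0, 110.0, 25.0, 121.0, 50.0], index=_lr_index)
_lr_prev = _lr_close.groupby(level="ticker").shift(1)
_lr_r = np.log(_lr_close / _lr_prev)

# AAA sums to ln(1.21); BBB sums to ln(0.5) + ln(2) = 0, even though its
# simple returns (-50%, +100%) average +25%.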


# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 3: Validate return series properties.
# -------------------------------------------------------------------------------------------------------------------------------
def validate_returns(r_series: pd.Series) -> bool:
    """
    Validates the computed log returns.

    Checks:
    1. NaN count matches number of tickers (one lost day per ticker).
    2. No infinite values.
    3. Flags extreme outliers (> 50% daily move) for audit.

    Parameters
    ----------
    r_series : pd.Series
        The log return series.

    Returns
    -------
    bool
        True if valid.
    """
    # 1. Check NaNs
    n_tickers = r_series.index.get_level_values("ticker").nunique()
    n_nans = r_series.isna().sum()

    # We expect exactly 1 NaN per ticker (the first day)
    if n_nans != n_tickers:
        logger.debug(
            f"Return NaNs: {n_nans} (Expected {n_tickers} if continuous). "
            "Discrepancy implies gaps or short history."
        )

    # 2. Check Inf
    if np.isinf(r_series).any():
        n_inf = np.isinf(r_series).sum()
        logger.error(f"Found {n_inf} infinite returns. Check for zero prices.")
        # Patch infs
        r_series.replace([np.inf, -np.inf], np.nan, inplace=True)

    # 3. Check Outliers
    # A +50% simple move is a log return of ln(1.5) ~ 0.405; a -50% move is ln(0.5) ~ -0.693
    outliers = r_series[(r_series > np.log(1.5)) | (r_series < np.log(0.5))]
    if not outliers.empty:
        logger.warning(
            f"Found {len(outliers)} extreme returns (>50% magnitude). "
            f"Examples: {outliers.head().to_dict()}"
        )

    logger.info(
        f"Returns Computed: Mean={r_series.mean():.5f}, Std={r_series.std():.5f}"
    )
    return True
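

# Illustrative sketch (not part of the pipeline): converting between simple and
# log returns, which is what a "50% daily move" outlier threshold depends on.
# A symmetric cut on |r| is asymmetric in simple-return terms, so +/-50% moves map
# to ln(1.5) and ln(0.5) rather than to +/-0.5.
import numpy as np


def simple_to_log(simple_return: float) -> float:
    """Convert a simple return R to a log return ln(1 + R)."""
    return float(np.log1p(simple_return))


def log_to_simple(log_return: float) -> float:
    """Convert a log return r to a simple return exp(r) - 1."""
    return float(np.expm1(log_return))


# simple_to_log(0.5)   ~  0.405  (a +50% day)
# simple_to_log(-0.5)  ~ -0.693  (a -50% day)
# log_to_simple(0.5)   ~  0.649  (a log return of +0.5 is a +64.9% day)
# log_to_simple(-0.5)  ~ -0.393  (a log return of -0.5 is a -39.3% day)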


# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def compute_log_returns(
    df_clean: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.Series:
    r"""
    Orchestrates the computation of log returns.

    1. Computes daily log returns per ticker (Eq. 2).
    2. Validates the resulting series.

    Parameters
    ----------
    df_clean : pd.DataFrame
        The cleansed panel DataFrame.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.Series
        Log returns \(r_{i,t}\).
    """
    logger.info("Starting Task 8: Log Return Computation")

    # Step 1 & 2: Compute
    r_series = calculate_log_returns(df_clean)

    # Step 3: Validate
    validate_returns(r_series)

    logger.info("Task 8 Completed: Log returns computed.")
    return r_series


# ==============================================================================
# Task 9: Compute volatility proxy \(\sigma_{i,t}\) (Eq. 3)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 1 & 2: Compute rolling realized volatility.
# -------------------------------------------------------------------------------------------------------------------------------
def calculate_realized_volatility(
    r_series: pd.Series,
    lookback: int,
    epsilon: float
) -> pd.Series:
    r"""
    Computes the rolling realized volatility proxy based on lagged squared returns.

    Equation:
    \[
    \sigma_{i,t} = \sqrt{\frac{1}{L} \sum_{j=1}^{L} r_{i,t-j}^2 + \epsilon}
    \]

    Key Implementation Details:
    - Strictly Lagged: The volatility estimate for time t uses returns from t-1 to t-L.
      This ensures no look-ahead bias and that \(\sigma_t\) represents the volatility
      regime *entering* the trading session.
    - Squared Returns: Uses simple mean of squared returns (RMS) as the estimator.

    Parameters
    ----------
    r_series : pd.Series
        Log returns indexed by (date, ticker).
    lookback : int
        Window size L.
    epsilon : float
        Small constant for numerical stability.

    Returns
    -------
    pd.Series
        Volatility proxy \(\sigma_{i,t}\).
    """
    # Group by ticker so that the lag and the rolling window never cross assets
    grouped = r_series.groupby(level="ticker", group_keys=False)

    # 1. Shift by 1 within each ticker to enforce strict lag (t-1 ... t-L)
    lagged_r = grouped.shift(1)

    # 2. Square the returns
    r_squared = lagged_r.pow(2)

    # 3. Compute Rolling Mean of Squared Returns within each ticker
    # min_periods=lookback ensures we don't produce values until full window is available
    rolling_variance_proxy = r_squared.groupby(level="ticker", group_keys=False).transform(
        lambda s: s.rolling(window=lookback, min_periods=lookback).mean()
    )

    # 4. Add epsilon and take square root
    sigma_series = (rolling_variance_proxy + epsilon).pow(0.5)

    return sigma_series
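

# Illustrative sketch (hypothetical returns; not part of the pipeline): a numeric
# check of Eq. 3 with L = 3. The volatility on day t uses only r_{t-1}, r_{t-2},
# r_{t-3}; the return on day t itself is excluded, and no value is produced until
# L lagged returns exist.
import numpy as np
import pandas as pd


def sketch_realized_vol(r: pd.Series, lookback: int, epsilon: float) -> pd.Series:
    """sqrt(mean of the previous `lookback` squared returns + epsilon), per ticker."""
    lagged_sq = r.groupby(level="ticker").shift(1).pow(2)
    mean_sq = lagged_sq.groupby(level="ticker", group_keys=False).transform(
        lambda s: s.rolling(window=lookback, min_periods=lookback).mean()
    )
    return (mean_sq + epsilon).pow(0.5)


_rv_index = pd.MultiIndex.from_product(
    [pd.date_range("2021-01-04", periods=5, freq="D"), ["AAA"]],
    names=["date", "ticker"],
)
_rv_r = pd.Series([0.01, -0.02, 0.03, 0.00, 0.01], index=_rv_index)
_rv_sigma = sketch_realized_vol(_rv_r, lookback=3, epsilon=1e-12)
# Day 4: sqrt((0.01^2 + 0.02^2 + 0.03^2) / 3) ~ 0.0216
# Day 5: sqrt((0.02^2 + 0.03^2 + 0.00^2) / 3) ~ 0.0208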


# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 3: Validate volatility series properties.
# -------------------------------------------------------------------------------------------------------------------------------
def validate_volatility(sigma_series: pd.Series) -> bool:
    """
    Validates the computed volatility proxy.

    Checks:
    1. Non-negativity.
    2. No infinite values.
    3. Reasonable magnitude (warn if mean volatility is extremely high/low).

    Parameters
    ----------
    sigma_series : pd.Series
        The volatility series.

    Returns
    -------
    bool
        True if valid.
    """
    # 1. Check Non-negativity
    if (sigma_series < 0).any():
        # Should be impossible due to square root
        raise ValueError("Found negative volatility values. Implementation error.")

    # 2. Check Inf
    if np.isinf(sigma_series).any():
        n_inf = np.isinf(sigma_series).sum()
        logger.error(f"Found {n_inf} infinite volatility values.")
        sigma_series.replace([np.inf, -np.inf], np.nan, inplace=True)

    # 3. Check Magnitude
    mean_vol = sigma_series.mean()
    # Daily vol of 1% (0.01) to 5% (0.05) is typical.
    # 0.01 daily ~ 16% annualized. 0.05 daily ~ 80% annualized.
    logger.info(f"Volatility Computed: Mean={mean_vol:.5f}, Std={sigma_series.std():.5f}")

    if mean_vol > 0.10:
        logger.warning(f"Mean daily volatility is very high ({mean_vol:.2%}). Check return scaling.")

    return True
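

# Illustrative sketch (not part of the pipeline): the annualization arithmetic
# behind the magnitude comments above. Assumption: 252 trading days per year, a
# common equity-market convention; the study configuration does not specify an
# annualization factor. Square-root-of-time scaling assumes serially uncorrelated
# returns with constant variance.
import math

TRADING_DAYS_PER_YEAR = 252  # assumed convention


def annualize_daily_vol(daily_vol: float, periods_per_year: int = TRADING_DAYS_PER_YEAR) -> float:
    """Scale a daily volatility to an annual one by sqrt(periods_per_year)."""
    return daily_vol * math.sqrt(periods_per_year)


# annualize_daily_vol(0.01) ~ 0.159 (about 16% annualized)
# annualize_daily_vol(0.05) ~ 0.794 (about 80% annualized)
# annualize_daily_vol(0.10) ~ 1.587 (about 159% annualized)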


# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def compute_volatility(
    r_series: pd.Series,
    config: Dict[str, Any]
) -> pd.Series:
    """
    Orchestrates the computation of realized volatility.

    1. Loads parameters (L, epsilon).
    2. Computes rolling RMS of lagged returns (Eq. 3).
    3. Validates the result.

    Parameters
    ----------
    r_series : pd.Series
        Log returns.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.Series
        Volatility proxy \(\sigma_{i,t}\).
    """
    logger.info("Starting Task 9: Volatility Computation")

    # Load parameters
    vol_config = config["feature_engineering"]["volatility"]
    L = vol_config["lookback_L"]
    epsilon = vol_config["epsilon"]

    # Compute
    sigma_series = calculate_realized_volatility(r_series, L, epsilon)

    # Validate
    validate_volatility(sigma_series)

    logger.info("Task 9 Completed: Volatility computed.")
    return sigma_series


# ==============================================================================
# Task 10: Compute rolling baselines and z-scores (Eq. 4, 5, 6)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 1 & 2 & 3: Generic Z-Score Computation Logic.
# -------------------------------------------------------------------------------------------------------------------------------
def calculate_rolling_zscore(
    series: pd.Series,
    window: int,
    min_periods: int,
    epsilon: float
) -> pd.Series:
    r"""
    Computes the rolling z-score for a generic time series channel.

    Equations:
    \[
    \begin{aligned}
    \mu_{t} &= \text{Mean}(x_{t-1}, \dots, x_{t-B}) \\
    \hat{\sigma}_{t} &= \sqrt{\text{Var}(x_{t-1}, \dots, x_{t-B}) + \epsilon} \\
    z_{t} &= \frac{x_{t} - \mu_{t}}{\hat{\sigma}_{t} + \epsilon}
    \end{aligned}
    \]

    Parameters
    ----------
    series : pd.Series
        Input series (r, sigma, or A).
    window : int
        Baseline window B.
    min_periods : int
        Minimum observations.
    epsilon : float
        Stability constant.

    Returns
    -------
    pd.Series
        Z-score series.
    """
    # Group by ticker so that the lag and the rolling window never cross assets
    grouped = series.groupby(level="ticker", group_keys=False)

    # Shift by 1 within each ticker to enforce strict lag (t-1 ... t-B)
    lagged = grouped.shift(1)
    lagged_grouped = lagged.groupby(level="ticker", group_keys=False)

    # Compute Rolling Stats per ticker
    # We compute var() to add epsilon inside the sqrt as per Eq. 5
    mu = lagged_grouped.transform(
        lambda s: s.rolling(window=window, min_periods=min_periods).mean()
    )
    variance = lagged_grouped.transform(
        lambda s: s.rolling(window=window, min_periods=min_periods).var()
    )

    # Eq. 5: Sigma with epsilon inside sqrt
    sigma_hat = (variance + epsilon).pow(0.5)

    # Eq. 6: Z-score with epsilon in denominator
    # Note: x_t is the current value (unshifted)
    z_score = (series - mu) / (sigma_hat + epsilon)

    return z_score
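

# Illustrative sketch (hypothetical toy data; not part of the pipeline): why Eq. 5
# puts epsilon inside the square root and Eq. 6 adds it again in the denominator.
# On a perfectly flat baseline the rolling variance is 0, so without epsilon the
# z-score would divide by zero; with epsilon = 1e-6 the denominator is
# sqrt(1e-6) + 1e-6 = 0.001001 and the z-score stays large but finite.
import numpy as np
import pandas as pd


def sketch_eq5_eq6_zscore(x: pd.Series, window: int, min_periods: int, epsilon: float) -> pd.Series:
    """Per-ticker lagged z-score with sigma_hat = sqrt(var + eps) and (sigma_hat + eps) below."""
    lagged = x.groupby(level="ticker").shift(1)
    lagged_grouped = lagged.groupby(level="ticker", group_keys=False)
    mu = lagged_grouped.transform(lambda s: s.rolling(window, min_periods=min_periods).mean())
    var = lagged_grouped.transform(lambda s: s.rolling(window, min_periods=min_periods).var())
    sigma_hat = (var + epsilon).pow(0.5)
    return (x - mu) / (sigma_hat + epsilon)


_eps_index = pd.MultiIndex.from_product(
    [pd.date_range("2021-01-04", periods=5, freq="D"), ["AAA"]],
    names=["date", "ticker"],
)
_eps_x = pd.Series([5.0, 5.0, 5.0, 5.0, 6.0], index=_eps_index)
_eps_z = sketch_eq5_eq6_zscore(_eps_x, window=4, min_periods=4, epsilon=1e-6)
# Day 5: baseline {5, 5, 5, 5}, so z = (6 - 5) / 0.001001 ~ 999.0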


# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def compute_baselines_and_zscores(
    r_series: pd.Series,
    sigma_series: pd.Series,
    A_series: pd.Series,
    config: Dict[str, Any]
) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    Orchestrates the computation of z-scores for all three channels.

    1. Loads baseline parameters.
    2. Computes z-scores for Returns, Volatility, and Attention.
    3. Validates outputs.

    Parameters
    ----------
    r_series : pd.Series
        Log returns.
    sigma_series : pd.Series
        Volatility proxy.
    A_series : pd.Series
        Fused attention.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Tuple[pd.Series, pd.Series, pd.Series]
        (z_r, z_sigma, z_A)
    """
    logger.info("Starting Task 10: Baseline & Z-Score Computation")

    # Load parameters
    dev_config = config["deviation_detection"]
    B = dev_config["baseline_window_B"]
    min_periods = dev_config["min_periods"]
    epsilon = dev_config["epsilon"]

    # Compute Z-Scores
    logger.debug(f"Computing Z-scores with B={B}...")

    z_r = calculate_rolling_zscore(r_series, B, min_periods, epsilon)
    z_sigma = calculate_rolling_zscore(sigma_series, B, min_periods, epsilon)
    z_A = calculate_rolling_zscore(A_series, B, min_periods, epsilon)

    # Basic Validation
    for name, z in [("Returns", z_r), ("Volatility", z_sigma), ("Attention", z_A)]:
        n_valid = z.count()
        logger.info(f"{name} Z-Score: {n_valid} valid observations.")

        if np.isinf(z).any():
            logger.warning(f"{name} Z-Score contains infinite values. Replacing with NaN.")
            z.replace([np.inf, -np.inf], np.nan, inplace=True)

    logger.info("Task 10 Completed: Z-scores computed.")
    return z_r, z_sigma, z_A


# ==============================================================================
# Task 11: Compute composite strength score \(s_{i,t}\) (Eq. 7)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 1 & 2: Compute composite strength score.
# -------------------------------------------------------------------------------------------------------------------------------
def calculate_composite_score(
    z_r: pd.Series,
    z_sigma: pd.Series,
    z_A: pd.Series,
    config: Dict[str, Any]
) -> pd.Series:
    r"""
    Computes the composite anomaly strength score \(s_{i,t}\).

    Equation:
    \[
    s_{i,t} = \alpha_r |z_{i,t}^{(r)}| + \alpha_\sigma z_{i,t}^{(\sigma)} + \alpha_A z_{i,t}^{(A)}
    \]

    Parameters
    ----------
    z_r : pd.Series
        Return z-scores.
    z_sigma : pd.Series
        Volatility z-scores.
    z_A : pd.Series
        Attention z-scores.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.Series
        Composite strength score \(s_{i,t}\).
    """
    # Load weights
    weights = config["composite_strength_score"]["weights_alpha"]
    alpha_r = weights["alpha_r"]
    alpha_sigma = weights["alpha_sigma"]
    alpha_A = weights["alpha_A"]

    two_sided_r = config["composite_strength_score"]["return_channel_is_two_sided"]

    # Compute components
    # Returns: Absolute value if two-sided (standard for anomaly detection)
    term_r = z_r.abs() if two_sided_r else z_r
    term_r = term_r * alpha_r

    # Volatility: Direct (high vol is anomalous)
    term_sigma = z_sigma * alpha_sigma

    # Attention: Direct (high attention is anomalous)
    term_A = z_A * alpha_A

    # Sum
    s_series = term_r + term_sigma + term_A

    return s_series
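

# Illustrative sketch (hypothetical alpha weights and z-scores; not part of the
# pipeline): a worked instance of Eq. 7. With alpha_r = 1.0, alpha_sigma = 0.5,
# alpha_A = 1.5 and (z_r, z_sigma, z_A) = (-3, 2, 4), the two-sided score is
# 1.0*|-3| + 0.5*2 + 1.5*4 = 10, while the one-sided variant gives -3 + 1 + 6 = 4:
# a sharp drop only raises the score when the return channel is two-sided.
# A NaN in any channel makes s_{i,t} NaN.
import numpy as np
import pandas as pd


def sketch_composite(z_r, z_sigma, z_A, alpha_r, alpha_sigma, alpha_A, two_sided_r=True):
    """alpha_r*|z_r| (or alpha_r*z_r) + alpha_sigma*z_sigma + alpha_A*z_A, elementwise."""
    term_r = z_r.abs() if two_sided_r else z_r
    return alpha_r * term_r + alpha_sigma * z_sigma + alpha_A * z_A


_cs_z_r = pd.Series([-3.0, 1.0])
_cs_z_sigma = pd.Series([2.0, 0.5])
_cs_z_A = pd.Series([4.0, np.nan])

_cs_two_sided = sketch_composite(_cs_z_r, _cs_z_sigma, _cs_z_A, 1.0, 0.5, 1.5)
_cs_one_sided = sketch_composite(_cs_z_r, _cs_z_sigma, _cs_z_A, 1.0, 0.5, 1.5, two_sided_r=False)
# _cs_two_sided -> [10.0, NaN]; _cs_one_sided -> [4.0, NaN]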


# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 3: Validate composite score series.
# -------------------------------------------------------------------------------------------------------------------------------
def validate_composite_score(s_series: pd.Series) -> bool:
    """
    Validates the composite strength score.

    Parameters
    ----------
    s_series : pd.Series
        The composite score.

    Returns
    -------
    bool
        True if valid.
    """
    n_valid = s_series.count()

    if n_valid == 0:
        logger.warning("Composite score is empty (all NaN). Check input z-scores.")
        return False

    if np.isinf(s_series).any():
        n_inf = np.isinf(s_series).sum()
        logger.error(f"Composite score contains {n_inf} infinite values.")
        s_series.replace([np.inf, -np.inf], np.nan, inplace=True)

    # Log distribution stats
    logger.info(
        f"Composite Score Stats: Mean={s_series.mean():.4f}, "
        f"Max={s_series.max():.4f}, Min={s_series.min():.4f}"
    )

    return True


# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def compute_composite_strength(
    z_r: pd.Series,
    z_sigma: pd.Series,
    z_A: pd.Series,
    config: Dict[str, Any]
) -> pd.Series:
    r"""
    Orchestrates the computation of the composite strength score.

    1. Loads weights.
    2. Computes weighted sum of z-scores (Eq. 7).
    3. Validates result.

    Parameters
    ----------
    z_r : pd.Series
        Return z-scores.
    z_sigma : pd.Series
        Volatility z-scores.
    z_A : pd.Series
        Attention z-scores.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.Series
        Composite strength score \(s_{i,t}\).
    """
    logger.info("Starting Task 11: Composite Score Computation")

    # Compute
    s_series = calculate_composite_score(z_r, z_sigma, z_A, config)

    # Validate
    validate_composite_score(s_series)

    logger.info("Task 11 Completed: Composite score computed.")
    return s_series



# ==============================================================================
# Task 12: Segment suspicious windows using hysteresis (Algorithm 1)
# ==============================================================================

class SuspiciousWindow(NamedTuple):
    """
    Represents a contiguous time interval identified as suspicious by the
    hysteresis segmentation algorithm.

    This structure serves as the fundamental unit of detection in the AIMM-X
    pipeline. It captures the temporal extent of an anomaly for a specific asset,
    which is subsequently scored and ranked.

    Parameters
    ----------
    ticker : str
        The unique identifier of the asset (e.g., "GME").
    t_start : pd.Timestamp
        The timestamp marking the beginning of the suspicious window (inclusive).
        Corresponds to the first bar where the composite score exceeded theta_high.
    t_end : pd.Timestamp
        The timestamp marking the end of the suspicious window (inclusive).
        Corresponds to the last bar where the composite score exceeded theta_low
        before the exit condition was met.
    length_bars : int
        The duration of the window in bars (trading sessions), computed as the
        number of positions from t_start to t_end inclusive in the ticker's score
        series. Tolerated gap bars inside the window are counted.
    """
    ticker: str
    t_start: pd.Timestamp
    t_end: pd.Timestamp
    length_bars: int

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 1 & 2: Implement the hysteresis state machine.
# -------------------------------------------------------------------------------------------------------------------------------
def apply_hysteresis_logic(
    dates: np.ndarray,
    scores: np.ndarray,
    ticker: str,
    theta_high: float,
    theta_low: float,
    gap_tolerance: int,
    min_len: int
) -> List[SuspiciousWindow]:
    """
    Applies the Schmitt Trigger (Hysteresis) logic to segment windows for a single ticker.

    Algorithm:
    1. Scan scores in chronological order (the caller must sort by date).
    2. Enter the 'WINDOW' state when score > theta_high.
    3. Stay in the 'WINDOW' state while score > theta_low.
    4. Tolerate up to 'gap_tolerance' consecutive bars with score <= theta_low
       (NaN scores count as such dips).
    5. Close the window once the gap limit is exceeded or the series ends. The
       window ends at the last bar above theta_low, not at the bar that triggered
       the exit.
    6. Discard windows shorter than 'min_len' bars.

    Parameters
    ----------
    dates : np.ndarray
        Dates aligned with scores (pd.Timestamp or numpy datetime64 values).
    scores : np.ndarray
        Array of float scores corresponding to dates.
    ticker : str
        Ticker symbol.
    theta_high : float
        Trigger threshold.
    theta_low : float
        Sustain threshold.
    gap_tolerance : int
        Max consecutive bars at or below theta_low allowed within a window.
    min_len : int
        Minimum window length in bars, counting both endpoints.

    Returns
    -------
    List[SuspiciousWindow]
        Detected windows.
    """
    windows = []

    # State variables
    in_window = False
    window_start_idx = -1
    last_valid_idx = -1  # Last index where score > theta_low
    gap_counter = 0

    n = len(scores)

    for i in range(n):
        s = scores[i]

        # NaN scores count as below both thresholds: they cannot trigger or sustain a window
        if np.isnan(s):
            is_high = False
            is_low = False
        else:
            is_high = s > theta_high
            is_low = s > theta_low

        if not in_window:
            # Attempt to start window
            if is_high:
                in_window = True
                window_start_idx = i
                last_valid_idx = i
                gap_counter = 0
        else:
            # Already in window
            if is_low:
                # Signal is strong enough to sustain
                last_valid_idx = i
                gap_counter = 0  # Reset gap
            else:
                # Signal dipped
                gap_counter += 1

                # Exit condition: the dip has lasted longer than the gap tolerance
                if gap_counter > gap_tolerance:
                    # The window ended at last_valid_idx (the last bar above theta_low);
                    # the trailing gap bars are excluded. If the signal never recovered
                    # after the trigger, last_valid_idx equals window_start_idx.
                    # Length is inclusive: end - start + 1.
                    win_len = last_valid_idx - window_start_idx + 1

                    if win_len >= min_len:
                        windows.append(SuspiciousWindow(
                            ticker=ticker,
                            # pd.Timestamp() normalizes numpy datetime64 values
                            t_start=pd.Timestamp(dates[window_start_idx]),
                            t_end=pd.Timestamp(dates[last_valid_idx]),
                            length_bars=win_len
                        ))

                    # Reset state
                    in_window = False
                    gap_counter = 0
                    window_start_idx = -1
                    last_valid_idx = -1

    # Flush a window that is still open when the series ends
    if in_window:
        win_len = last_valid_idx - window_start_idx + 1
        if win_len >= min_len:
            windows.append(SuspiciousWindow(
                ticker=ticker,
                t_start=pd.Timestamp(dates[window_start_idx]),
                t_end=pd.Timestamp(dates[last_valid_idx]),
                length_bars=win_len
            ))

    return windows

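# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): hysteresis segmentation on a plain list.
# -------------------------------------------------------------------------------------------------------------------------------
# A stripped-down version of the state machine in apply_hysteresis_logic. It returns
# inclusive (start, end) index pairs instead of SuspiciousWindow records, so the gap and
# minimum-length rules are easy to trace by hand. It assumes the same rules as above:
# enter when s > theta_high, sustain while s > theta_low, tolerate up to gap_tolerance
# consecutive dips (NaN counts as a dip), and drop windows shorter than min_len.
# `sketch_hysteresis_segments` is a hypothetical helper name.
import math
from typing import List, Sequence, Tuple


def sketch_hysteresis_segments(
    scores: Sequence[float],
    theta_high: float,
    theta_low: float,
    gap_tolerance: int,
    min_len: int,
) -> List[Tuple[int, int]]:
    """Return inclusive (start, end) index pairs of hysteresis windows."""
    segments: List[Tuple[int, int]] = []
    in_window = False
    start = last_valid = -1
    gap = 0
    for i, s in enumerate(scores):
        valid = not math.isnan(s)  # NaN can neither trigger nor sustain
        if not in_window:
            if valid and s > theta_high:
                in_window, start, last_valid, gap = True, i, i, 0
        elif valid and s > theta_low:
            last_valid, gap = i, 0  # sustained: reset the gap counter
        else:
            gap += 1
            if gap > gap_tolerance:
                # The window ends at the last sustained bar, not at the current bar
                if last_valid - start + 1 >= min_len:
                    segments.append((start, last_valid))
                in_window = False
    # Flush a window still open at the end of the series
    if in_window and last_valid - start + 1 >= min_len:
        segments.append((start, last_valid))
    return segments


# Usage (toy scores, hypothetical thresholds):
#   scores = [0.0, 3.0, 1.5, 0.2, 1.2, 0.0, 0.0, 0.0, 2.5, 0.0]
#   sketch_hysteresis_segments(scores, theta_high=2.0, theta_low=1.0, gap_tolerance=1, min_len=2)
#   -> [(1, 4)]   # the one-bar dip at index 3 is tolerated; the burst at index 8 is too short
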

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 3: Aggregate windows across all tickers.
# -------------------------------------------------------------------------------------------------------------------------------
def aggregate_windows(
    s_series: pd.Series,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Applies segmentation to all tickers and aggregates results.

    Parameters
    ----------
    s_series : pd.Series
        Composite strength scores.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        DataFrame of detected windows.
    """
    # Load params
    params = config["segmentation_algorithm"]["parameters"]
    theta_high = params["theta_high"]
    theta_low = params["theta_low"]
    gap = params["gap_tolerance_g"]
    min_len = params["min_window_len_Lmin"]

    all_windows = []

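    # Iterate by ticker; the index is assumed to be a (date, ticker) MultiIndex
    for ticker, sub_series in s_series.groupby(level="ticker"):
        # Hysteresis requires chronological order within each ticker
        sub_series = sub_series.sort_index(level="date")
        # Keep only the date level; scores are aligned positionally with dates
        dates = sub_series.index.get_level_values("date").values
        scores = sub_series.values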

        ticker_windows = apply_hysteresis_logic(
            dates, scores, str(ticker),
            theta_high, theta_low, gap, min_len
        )
        all_windows.extend(ticker_windows)

    # Convert to DataFrame
    if not all_windows:
        logger.warning("No suspicious windows detected.")
        return pd.DataFrame(columns=["ticker", "t_start", "t_end", "length_bars"])

    df_windows = pd.DataFrame(all_windows)
    return df_windows

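# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): the (date, ticker) panel layout.
# -------------------------------------------------------------------------------------------------------------------------------
# aggregate_windows assumes s_series is indexed by a MultiIndex whose levels are named
# "date" and "ticker". The sketch builds a toy panel of that shape and splits it per
# ticker in the same way. Keeping the dates as a DatetimeIndex (rather than .values)
# means positional lookups return pd.Timestamp. Both function names and the toy values
# are hypothetical.
from typing import Dict, Tuple

import numpy as np
import pandas as pd


def sketch_score_panel() -> pd.Series:
    """Toy composite scores for two tickers over three business days."""
    dates = pd.date_range("2021-01-04", periods=3, freq="B")
    index = pd.MultiIndex.from_product([dates, ["AAA", "BBB"]], names=["date", "ticker"])
    # Values are listed date-major: (d0, AAA), (d0, BBB), (d1, AAA), ...
    return pd.Series([0.1, 2.5, 3.0, 1.2, 0.4, 0.0], index=index, name="s")


def sketch_split_by_ticker(s_series: pd.Series) -> Dict[str, Tuple[pd.DatetimeIndex, np.ndarray]]:
    """Map each ticker to its (dates, scores) in chronological order."""
    out = {}
    for ticker, sub in s_series.groupby(level="ticker"):
        sub = sub.sort_index(level="date")
        out[str(ticker)] = (sub.index.get_level_values("date"), sub.to_numpy(dtype=float))
    return out


# Usage:
#   parts = sketch_split_by_ticker(sketch_score_panel())
#   parts["AAA"][1]  -> array([0.1, 3. , 0.4])
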

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def segment_windows_hysteresis(
    s_series: pd.Series,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the segmentation of suspicious windows.

    1. Loads segmentation parameters.
    2. Applies hysteresis logic per ticker.
    3. Aggregates and returns the window list.

    Parameters
    ----------
    s_series : pd.Series
        Composite strength score.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        Detected windows with columns [ticker, t_start, t_end, length_bars].
    """
    logger.info("Starting Task 12: Window Segmentation")

    # Aggregate windows
    df_windows = aggregate_windows(s_series, config)

    logger.info(f"Task 12 Completed: Detected {len(df_windows)} windows.")
    return df_windows



# ==============================================================================
# Task 13: Compute \(\phi\)-signals for each window (Eq. 8–13)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 1 & 2: Compute enabled factors (phi_1, phi_2, phi_3, phi_4).
# -------------------------------------------------------------------------------------------------------------------------------
def calculate_window_factors(
    ticker: str,
    t_start: pd.Timestamp,
    t_end: pd.Timestamp,
    z_r_series: pd.Series,
    z_sigma_series: pd.Series,
    z_A_series: pd.Series
) -> Dict[str, float]:
    """
    Computes the interpretable evidence factors (phi) for a single window.

    Equations:
    - phi_1: Sum of squared return z-scores (Shock Intensity).
    - phi_2: Sum of positive volatility z-scores (Volatility Anomaly).
    - phi_3: Sum of positive attention z-scores (Attention Spike).
    - phi_4: Average correlation of Attention with Returns and Volatility (Alignment).

    Parameters
    ----------
    ticker : str
        Ticker symbol.
    t_start : pd.Timestamp
        Window start date.
    t_end : pd.Timestamp
        Window end date.
    z_r_series : pd.Series
        Global return z-scores.
    z_sigma_series : pd.Series
        Global volatility z-scores.
    z_A_series : pd.Series
        Global attention z-scores.

    Returns
    -------
    Dict[str, float]
        Dictionary containing phi_1, phi_2, phi_3, phi_4.
    """

    # Construct slice lookup
    try:
        # Extract window data
        # Note: xs creates a copy, which is fine for small windows
        w_z_r = z_r_series.xs(ticker, level="ticker").loc[t_start:t_end]
        w_z_sigma = z_sigma_series.xs(ticker, level="ticker").loc[t_start:t_end]
        w_z_A = z_A_series.xs(ticker, level="ticker").loc[t_start:t_end]
    except KeyError:
        # Should not happen if window detection logic is correct
        logger.error(f"Data missing for window {ticker} {t_start}-{t_end}")
        return {"phi_1": 0.0, "phi_2": 0.0, "phi_3": 0.0, "phi_4": 0.0}

    # Phi 1: Return Shock Intensity (Sum of Squares)
    phi_1 = (w_z_r ** 2).sum()

    # Phi 2: Volatility Anomaly (Sum of Positive Deviations)
    phi_2 = w_z_sigma.clip(lower=0).sum()

    # Phi 3: Attention Spike Magnitude (Sum of Positive Deviations)
    phi_3 = w_z_A.clip(lower=0).sum()

    # Phi 4: Co-movement Alignment (Correlation)
    # Handle degenerate cases (len < 2 or constant variance)
    if len(w_z_r) < 2:
        phi_4 = 0.0
    else:
        # Pearson correlations. Series.corr returns NaN when either input is
        # constant (zero variance) or too few overlapping non-NaN pairs exist.
        corr_r_A = w_z_r.corr(w_z_A)
        corr_sigma_A = w_z_sigma.corr(w_z_A)

        # Treat NaN correlation as 0 (no alignment)
        if np.isnan(corr_r_A):
            corr_r_A = 0.0
        if np.isnan(corr_sigma_A):
            corr_sigma_A = 0.0

        phi_4 = 0.5 * (corr_r_A + corr_sigma_A)

    return {
        "phi_1": phi_1,
        "phi_2": phi_2,
        "phi_3": phi_3,
        "phi_4": phi_4
    }

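# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): phi_1 to phi_4 on plain arrays.
# -------------------------------------------------------------------------------------------------------------------------------
# A worked version of the four enabled factors for a single window, assuming the window
# slices contain no NaN values. The pandas code above skips NaN in its sums and drops
# NaN pairs in Series.corr. An undefined correlation (fewer than two points or a constant
# input) is treated as 0, as above. `sketch_phi_factors` and `_sketch_safe_corr` are
# hypothetical helpers.
from typing import Dict

import numpy as np


def _sketch_safe_corr(a: np.ndarray, b: np.ndarray) -> float:
    """Pearson correlation, or 0.0 when undefined (fewer than 2 points or a constant input)."""
    if a.size < 2 or a.max() == a.min() or b.max() == b.min():
        return 0.0
    return float(np.corrcoef(a, b)[0, 1])


def sketch_phi_factors(z_r, z_sigma, z_A) -> Dict[str, float]:
    """Compute phi_1..phi_4 for one window from aligned z-score arrays."""
    z_r = np.asarray(z_r, dtype=float)
    z_sigma = np.asarray(z_sigma, dtype=float)
    z_A = np.asarray(z_A, dtype=float)
    return {
        "phi_1": float(np.sum(z_r ** 2)),  # return shock intensity
        "phi_2": float(np.sum(np.clip(z_sigma, 0.0, None))),  # positive volatility deviations
        "phi_3": float(np.sum(np.clip(z_A, 0.0, None))),  # positive attention deviations
        "phi_4": 0.5 * (_sketch_safe_corr(z_r, z_A) + _sketch_safe_corr(z_sigma, z_A)),
    }


# Usage (toy z-scores):
#   sketch_phi_factors([1, -2, 3], [0.5, -1, 2], [1, 2, 3])
#   -> phi_1 = 14.0, phi_2 = 2.5, phi_3 = 6.0, phi_4 ≈ 0.449
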

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 3: Set \(\phi_5\) and \(\phi_6\) to zero.
# -------------------------------------------------------------------------------------------------------------------------------
def add_disabled_factors(phi_dict: Dict[str, float]) -> Dict[str, float]:
    """
    Adds placeholder values for disabled factors phi_5 and phi_6.

    Parameters
    ----------
    phi_dict : Dict[str, float]
        Dictionary of computed factors.

    Returns
    -------
    Dict[str, float]
        Updated dictionary.
    """
    phi_dict["phi_5"] = 0.0
    phi_dict["phi_6"] = 0.0
    return phi_dict


# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def compute_phi_factors(
    df_windows: pd.DataFrame,
    z_r_series: pd.Series,
    z_sigma_series: pd.Series,
    z_A_series: pd.Series,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the computation of phi-signals for all detected windows.

    1. Iterates through each window in the DataFrame.
    2. Slices the underlying z-score series.
    3. Computes phi_1 through phi_6.
    4. Appends results to the DataFrame.

    Parameters
    ----------
    df_windows : pd.DataFrame
        DataFrame of detected windows (from Task 12).
    z_r_series : pd.Series
        Return z-scores.
    z_sigma_series : pd.Series
        Volatility z-scores.
    z_A_series : pd.Series
        Attention z-scores.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        The input DataFrame enriched with columns 'phi_1' ... 'phi_6'.
    """
    logger.info("Starting Task 13: Phi Factor Computation")

    if df_windows.empty:
        logger.warning("No windows to score.")
        # Return empty DF with correct columns
        cols = df_windows.columns.tolist() + [f"phi_{i}" for i in range(1, 7)]
        return pd.DataFrame(columns=cols)

    # No re-sorting here: Task 2 validated that the z-score index is sorted, which the
    # date slicing (.loc after xs()) in calculate_window_factors relies on.
    phi_results = []

    for _, row in df_windows.iterrows():
        ticker = row["ticker"]
        t_start = row["t_start"]
        t_end = row["t_end"]

        # Compute enabled factors
        factors = calculate_window_factors(
            ticker, t_start, t_end,
            z_r_series, z_sigma_series, z_A_series
        )

        # Add disabled factors
        factors = add_disabled_factors(factors)

        phi_results.append(factors)

    # Create DataFrame from results
    df_phi = pd.DataFrame(phi_results, index=df_windows.index)

    # Concatenate with original window info
    df_enriched = pd.concat([df_windows, df_phi], axis=1)

    logger.info("Task 13 Completed: Phi factors computed.")
    return df_enriched



# ==============================================================================
# Task 14: Compute Integrity Score \(M(w)\) and factor decomposition (Eq. 14)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 1 & 2 & 3: Compute scores and contributions.
# -------------------------------------------------------------------------------------------------------------------------------
def calculate_scores_and_contributions(
    df_windows: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    r"""
    Computes the composite Integrity Score M(w) and individual factor contributions.

    Equation:
    \[
    M(w) = \sum_{k=1}^{6} \omega_k \cdot \phi_k(w)
    \]

    Parameters
    ----------
    df_windows : pd.DataFrame
        DataFrame containing windows and phi factors.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        DataFrame enriched with 'M' and 'contrib_k' columns.
    """
    if df_windows.empty:
        # Return empty with correct schema
        cols = df_windows.columns.tolist() + ["M"] + [f"contrib_{i}" for i in range(1, 7)]
        return pd.DataFrame(columns=cols)

    df_scored = df_windows.copy()

    # Load weights
    weights = config["scoring_model"]["integrity_score_M"]["phi_weights_omega"]

    # Initialize M
    M_series = pd.Series(0.0, index=df_scored.index)

    # Compute contributions and sum
    for k in range(1, 7):
        phi_col = f"phi_{k}"
        weight_key = f"omega_{k}"
        contrib_col = f"contrib_{k}"

        weight = weights.get(weight_key, 0.0)

        if phi_col in df_scored.columns:
            # Compute contribution: omega_k * phi_k
            contribution = df_scored[phi_col] * weight
            df_scored[contrib_col] = contribution

            # Add to total score
            M_series += contribution
        else:
            logger.warning(f"Phi column '{phi_col}' missing. Treating as 0.")
            df_scored[contrib_col] = 0.0

    df_scored["M"] = M_series

    return df_scored

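# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): Eq. (14) and its additive decomposition.
# -------------------------------------------------------------------------------------------------------------------------------
# M is linear in the phi factors, so the per-factor contributions omega_k * phi_k add up
# exactly to M. That additivity is what makes the score explainable. The sketch mirrors
# the loop above on a toy row. `sketch_integrity_score` is hypothetical, and the omega
# values in the usage line are illustrative, not the study's configured weights.
from typing import Dict

import pandas as pd


def sketch_integrity_score(df_phi: pd.DataFrame, omega: Dict[str, float]) -> pd.DataFrame:
    """Add contrib_k = omega_k * phi_k for k = 1..6 and M = sum of the contributions."""
    out = df_phi.copy()
    contrib_cols = []
    for k in range(1, 7):
        # A missing phi column or weight contributes 0, as in the pipeline
        phi_k = out[f"phi_{k}"] if f"phi_{k}" in out.columns else 0.0
        out[f"contrib_{k}"] = phi_k * omega.get(f"omega_{k}", 0.0)
        contrib_cols.append(f"contrib_{k}")
    # skipna=False mirrors the running sum above, where a NaN contribution makes M NaN
    out["M"] = out[contrib_cols].sum(axis=1, skipna=False)
    return out


# Usage (illustrative weights):
#   row = pd.DataFrame({"phi_1": [14.0], "phi_2": [2.5], "phi_3": [6.0], "phi_4": [0.45]})
#   sketch_integrity_score(row, {"omega_1": 0.5, "omega_2": 1.0, "omega_3": 1.0, "omega_4": 2.0})
#   -> contributions 7.0 + 2.5 + 6.0 + 0.9 + 0 + 0, so M ≈ 16.4
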

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def compute_integrity_scores(
    df_windows: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Orchestrates the scoring of suspicious windows.

    1. Loads weights.
    2. Calculates per-factor contributions.
    3. Aggregates into total Integrity Score M.

    Parameters
    ----------
    df_windows : pd.DataFrame
        DataFrame with phi factors.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        Scored windows.
    """
    logger.info("Starting Task 14: Integrity Scoring")

    # Compute scores and contributions
    df_scored = calculate_scores_and_contributions(df_windows, config)

    # Log stats
    if not df_scored.empty:
        logger.info(
            f"Scoring Complete. Max M={df_scored['M'].max():.2f}, "
            f"Mean M={df_scored['M'].mean():.2f}"
        )

    logger.info("Task 14 Completed: Windows scored.")
    return df_scored



# ==============================================================================
# Task 15: Compute rank percentile and apply reporting filters (Eq. 15)
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 1: Compute rank percentile using Eq. (15).
# -------------------------------------------------------------------------------------------------------------------------------
def compute_rank_percentiles(df: pd.DataFrame) -> pd.DataFrame:
    r"""
    Computes the rank percentile for each window based on its Integrity Score M.

    Equation:
    \[
    \text{rank\_pct}(w) = \frac{\#\{w' : M(w') < M(w)\}}{N_{\text{windows}}}
    \]

    Parameters
    ----------
    df : pd.DataFrame
        Scored windows DataFrame.

    Returns
    -------
    pd.DataFrame
        DataFrame with 'rank_pct' column added.
    """
    if df.empty:
        df_out = df.copy()
        df_out["rank_pct"] = []
        return df_out

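    # Number of windows in the ranking universe
    df_out = df.copy()
    N = len(df_out)

    # rank(method="min") - 1 counts the windows with strictly smaller M, matching
    # Eq. (15); tied windows therefore share the same percentile
    ranks = df_out["M"].rank(method="min")
    df_out["rank_pct"] = (ranks - 1) / N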

    return df_out

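# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): Eq. (15) with ties.
# -------------------------------------------------------------------------------------------------------------------------------
# The rank-based formula above and a direct count of strictly smaller scores should
# agree. Ties share a percentile, and the top window scores (N - 1) / N rather than 1.
# Both helper names are hypothetical. The brute-force version is O(N^2) and is meant
# only for cross-checking on small inputs.
import numpy as np
import pandas as pd


def sketch_rank_pct(m: pd.Series) -> pd.Series:
    """rank_pct(w) = #{w' : M(w') < M(w)} / N via rank(method='min')."""
    return (m.rank(method="min") - 1) / len(m)


def sketch_rank_pct_bruteforce(m: pd.Series) -> pd.Series:
    """Count, for each window, how many windows have a strictly smaller M."""
    values = m.to_numpy(dtype=float)
    # Element [i, j] is True when values[j] < values[i]
    counts = (values[None, :] < values[:, None]).sum(axis=1)
    return pd.Series(counts / len(values), index=m.index)


# Usage:
#   sketch_rank_pct(pd.Series([3.0, 1.0, 3.0, 2.0])).tolist()  -> [0.5, 0.0, 0.5, 0.25]
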

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 2: Apply warmup exclusion filter.
# -------------------------------------------------------------------------------------------------------------------------------
def get_warmup_mask(
    df_windows: pd.DataFrame,
    z_series_list: List[pd.Series]
) -> pd.Series:
    """
    Identifies windows that overlap with the warmup period (where baselines are incomplete).

    Logic:
    For each ticker, determine the first valid timestamp across all z-score channels.
    Any window starting before this timestamp is flagged for exclusion.

    Parameters
    ----------
    df_windows : pd.DataFrame
        The windows DataFrame.
    z_series_list : List[pd.Series]
        List of z-score series (r, sigma, A).

    Returns
    -------
    pd.Series
        Boolean mask (True = Keep, False = Exclude).
    """
    if df_windows.empty:
        return pd.Series(dtype=bool)

    # Determine valid start date per ticker
    # We need the latest 'first valid index' across all channels
    # (i.e., we need ALL channels to be valid)
    ticker_start_dates = {}

    # Get unique tickers from windows to optimize
    tickers = df_windows["ticker"].unique()

    for ticker in tickers:
        starts = []
        for z_series in z_series_list:
            try:
                # Slice ticker
                ts = z_series.xs(ticker, level="ticker")
                first_valid = ts.first_valid_index()
                if first_valid is not None:
                    starts.append(first_valid)
            except KeyError:
                pass  # Ticker absent from this channel; rely on the others

        if starts:
            # We need the latest of the start dates to ensure ALL are valid
            ticker_start_dates[ticker] = max(starts)
        else:
            # No valid z-scores in any channel: exclude all windows for this ticker
            ticker_start_dates[ticker] = pd.Timestamp.max

    # Apply mask
    # Keep if t_start >= valid_start
    keep_mask = df_windows.apply(
        lambda row: row["t_start"] >= ticker_start_dates.get(row["ticker"], pd.Timestamp.max),
        axis=1
    )

    return keep_mask

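# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): a vectorized warmup mask.
# -------------------------------------------------------------------------------------------------------------------------------
# The same rule as get_warmup_mask, expressed with groupby instead of per-ticker xs()
# calls. It assumes a (date, ticker) MultiIndex sorted by date, so the earliest non-NaN
# date equals first_valid_index(). A window is kept only if it starts on or after the
# latest first-valid date across channels. Tickers with no valid data map to NaT, and
# comparisons with NaT are False, so those windows are excluded as in the loop version.
# `sketch_warmup_mask` is a hypothetical helper.
from typing import List

import pandas as pd


def sketch_warmup_mask(df_windows: pd.DataFrame, z_series_list: List[pd.Series]) -> pd.Series:
    """True where a window starts on/after the date all channels first have valid z-scores."""
    firsts = []
    for z in z_series_list:
        valid = z.dropna()
        # First valid date per ticker in this channel
        dates = pd.Series(
            valid.index.get_level_values("date").to_numpy(),
            index=valid.index.get_level_values("ticker"),
        )
        firsts.append(dates.groupby(level=0).min())
    # Latest first-valid date across channels; a channel lacking a ticker is ignored
    latest_start = pd.concat(firsts).groupby(level=0).max()
    starts = df_windows["ticker"].map(latest_start)
    return df_windows["t_start"] >= starts
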

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 3: Apply z-score artifact exclusion filter.
# -------------------------------------------------------------------------------------------------------------------------------
def get_artifact_mask(
    df_windows: pd.DataFrame,
    z_series_list: List[pd.Series],
    cutoff: float
) -> pd.Series:
    """
    Identifies windows containing extreme z-score artifacts (e.g., > 20.0).

    Parameters
    ----------
    df_windows : pd.DataFrame
        The windows DataFrame.
    z_series_list : List[pd.Series]
        List of z-score series.
    cutoff : float
        The z-score magnitude threshold.

    Returns
    -------
    pd.Series
        Boolean mask (True = Keep, False = Exclude).
    """
    if df_windows.empty:
        return pd.Series(dtype=bool)

    keep_mask = []

    # Iterate through each window
    for idx, row in df_windows.iterrows():
        ticker = row["ticker"]
        t_start = row["t_start"]
        t_end = row["t_end"]

        is_artifact = False

        for z_series in z_series_list:
            try:
                # Slice window
                # Use abs() for two-sided check
                window_data = z_series.xs(ticker, level="ticker").loc[t_start:t_end]
                if (window_data.abs() > cutoff).any():
                    is_artifact = True
                    break
            except KeyError:
                pass

        keep_mask.append(not is_artifact)

    return pd.Series(keep_mask, index=df_windows.index)

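# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): per-window peak |z| for auditing the artifact filter.
# -------------------------------------------------------------------------------------------------------------------------------
# A window is flagged by get_artifact_mask exactly when some |z| inside it exceeds the
# cutoff, which is the same as its peak |z| exceeding the cutoff (NaN never exceeds it).
# Reporting the peak shows how close a kept window came to the cutoff. The keep mask is
# then simply peak <= cutoff. `sketch_window_peak_abs_z` is a hypothetical helper, and
# it assumes a date-sorted (date, ticker) MultiIndex like the code above.
from typing import List

import pandas as pd


def sketch_window_peak_abs_z(df_windows: pd.DataFrame, z_series_list: List[pd.Series]) -> pd.Series:
    """Largest |z| seen in any channel inside each window (0.0 if no valid data)."""
    peaks = []
    for row in df_windows.itertuples(index=False):
        peak = 0.0
        for z in z_series_list:
            try:
                window = z.xs(row.ticker, level="ticker").loc[row.t_start:row.t_end]
            except KeyError:
                continue  # ticker absent from this channel
            if window.notna().any():
                peak = max(peak, float(window.abs().max()))
        peaks.append(peak)
    return pd.Series(peaks, index=df_windows.index, name="peak_abs_z")
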

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def rank_and_filter_windows(
    df_scored: pd.DataFrame,
    z_r: pd.Series,
    z_sigma: pd.Series,
    z_A: pd.Series,
    config: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Orchestrates ranking and filtering.

    1. Computes rank percentiles on the full set.
    2. Applies warmup exclusion.
    3. Applies artifact exclusion.
    4. Returns both raw (ranked) and filtered datasets.

    Parameters
    ----------
    df_scored : pd.DataFrame
        Scored windows.
    z_r, z_sigma, z_A : pd.Series
        Z-score series for filter checks.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame]
        (df_raw_ranked, df_filtered)
    """
    logger.info("Starting Task 15: Ranking and Filtering")

    # 1. Rank
    df_raw_ranked = compute_rank_percentiles(df_scored)

    # 2. Warmup Filter
    filter_config = config["reporting_and_ranking"]["table_and_artifact_filters"]

    if filter_config["exclude_warmup"]:
        mask_warmup = get_warmup_mask(df_raw_ranked, [z_r, z_sigma, z_A])
        n_warmup = (~mask_warmup).sum()
        logger.info(f"Warmup Filter: Excluded {n_warmup} windows.")
    else:
        mask_warmup = pd.Series(True, index=df_raw_ranked.index)

    # 3. Artifact Filter
    cutoff = filter_config["max_z_score_cutoff"]
    mask_artifact = get_artifact_mask(df_raw_ranked, [z_r, z_sigma, z_A], cutoff)
    n_artifact = (~mask_artifact).sum()
    logger.info(f"Artifact Filter (> {cutoff} sigma): Excluded {n_artifact} windows.")

    # 4. Apply Combined Filter
    final_mask = mask_warmup & mask_artifact
    df_filtered = df_raw_ranked[final_mask].copy()

    logger.info(f"Task 15 Completed: {len(df_filtered)} windows remain after filtering.")
    return df_raw_ranked, df_filtered



# ==============================================================================
# Task 16: Produce study output artifacts
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 1: Produce the complete window list (raw, unfiltered).
# -------------------------------------------------------------------------------------------------------------------------------
def prepare_raw_window_list(df_raw_ranked: pd.DataFrame) -> pd.DataFrame:
    """
    Prepares the complete list of detected windows with all scores and factors.

    Parameters
    ----------
    df_raw_ranked : pd.DataFrame
        The ranked windows DataFrame.

    Returns
    -------
    pd.DataFrame
        The complete artifact.
    """
    # Ensure column order for readability
    base_cols = ["ticker", "t_start", "t_end", "length_bars", "M", "rank_pct"]
    phi_cols = [f"phi_{i}" for i in range(1, 7)]
    contrib_cols = [f"contrib_{i}" for i in range(1, 7)]

    # Select existing columns (handle case where some might be missing if upstream failed)
    cols = [c for c in base_cols + phi_cols + contrib_cols if c in df_raw_ranked.columns]

    return df_raw_ranked[cols].copy()


# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 2: Produce the Top-N ranked windows table (filtered).
# -------------------------------------------------------------------------------------------------------------------------------
def prepare_top_n_table(
    df_filtered: pd.DataFrame,
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Generates the Top-N suspicious windows table for reporting.

    Parameters
    ----------
    df_filtered : pd.DataFrame
        The filtered windows DataFrame.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    pd.DataFrame
        Top-N windows sorted by Integrity Score M.
    """
    top_n = config["reporting_and_ranking"]["output_tables"]["top_n_windows"]

    # Reporting columns
    cols = ["ticker", "t_start", "t_end", "M", "rank_pct"]

    if df_filtered.empty:
        # Keep the reporting schema even when nothing survives the filters
        return pd.DataFrame(columns=cols)

    # Sort by M descending and take the top N
    df_top = df_filtered.sort_values("M", ascending=False).head(top_n)

    return df_top[cols].copy()

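# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): Top-N selection with DataFrame.nlargest.
# -------------------------------------------------------------------------------------------------------------------------------
# pandas documents DataFrame.nlargest(n, columns) as equivalent to
# sort_values(columns, ascending=False).head(n), but more performant. With keep="first"
# (the default), tied windows are taken in their original order, whereas the default
# quicksort in sort_values is not stable. `sketch_top_n` is a hypothetical helper.
from typing import Sequence

import pandas as pd


def sketch_top_n(
    df_filtered: pd.DataFrame,
    top_n: int,
    cols: Sequence[str] = ("ticker", "t_start", "t_end", "M", "rank_pct"),
) -> pd.DataFrame:
    """Return the top_n rows by M (descending), restricted to the reporting columns."""
    if df_filtered.empty:
        return pd.DataFrame(columns=list(cols))
    return df_filtered.nlargest(top_n, "M")[list(cols)].copy()
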

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 3: Produce factor contribution summary statistics.
# -------------------------------------------------------------------------------------------------------------------------------
def compute_phi_statistics(df_raw: pd.DataFrame) -> pd.DataFrame:
    """
    Computes summary statistics for each phi factor across all detected windows.

    Metrics: mean, median, max, std, and nonzero_pct (the share of non-missing
    values that are nonzero, expressed as a fraction between 0 and 1).

    Parameters
    ----------
    df_raw : pd.DataFrame
        The raw windows DataFrame.

    Returns
    -------
    pd.DataFrame
        Summary statistics table.
    """
    if df_raw.empty:
        return pd.DataFrame()

    stats_list = []

    # Summarize each of the six phi factors present in the input
    for i in range(1, 7):
        col = f"phi_{i}"
        if col in df_raw.columns:
            series = df_raw[col]
            stats = {
                "factor": col,
                "mean": series.mean(),
                "median": series.median(),
                "max": series.max(),
                "std": series.std(),
                # phi_4 (a correlation) can be negative, so test != 0 rather than > 0
                "nonzero_pct": (series.dropna() != 0).mean()
            }
            stats_list.append(stats)

    return pd.DataFrame(stats_list).set_index("factor")

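# -------------------------------------------------------------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): factor summary via DataFrame.agg.
# -------------------------------------------------------------------------------------------------------------------------------
# A compact pandas variant of the per-factor loop that reports mean, median, max, std,
# and the share of non-missing values that are nonzero. It tests != 0 so that negative
# phi_4 values count as nonzero. `sketch_phi_summary` is a hypothetical helper.
import pandas as pd


def sketch_phi_summary(df_raw: pd.DataFrame) -> pd.DataFrame:
    """One row per phi factor present in df_raw, one column per statistic."""
    phi_cols = [f"phi_{i}" for i in range(1, 7) if f"phi_{i}" in df_raw.columns]
    phi = df_raw[phi_cols]
    # agg returns statistics as rows; transpose so each factor is a row
    summary = phi.agg(["mean", "median", "max", "std"]).T
    summary["nonzero_pct"] = phi.apply(lambda s: (s.dropna() != 0).mean())
    summary.index.name = "factor"
    return summary
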

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------
def export_artifacts(
    df_raw_ranked: pd.DataFrame,
    df_filtered: pd.DataFrame,
    config: Dict[str, Any]
) -> Dict[str, pd.DataFrame]:
    """
    Orchestrates the generation of final study artifacts.

    1. Prepares the full raw window list.
    2. Generates the Top-N filtered table.
    3. Computes factor summary statistics.

    Parameters
    ----------
    df_raw_ranked : pd.DataFrame
        All detected windows, ranked.
    df_filtered : pd.DataFrame
        Windows passing quality filters.
    config : Dict[str, Any]
        Study configuration.

    Returns
    -------
    Dict[str, pd.DataFrame]
        Dictionary containing 'raw_windows', 'top_n', and 'phi_stats'.
    """
    logger.info("Starting Task 16: Artifact Export")

    artifacts = {}

    # 1. Raw List
    artifacts["raw_windows"] = prepare_raw_window_list(df_raw_ranked)

    # 2. Top N
    artifacts["top_n"] = prepare_top_n_table(df_filtered, config)

    # 3. Stats
    artifacts["phi_stats"] = compute_phi_statistics(df_raw_ranked)

    logger.info("Task 16 Completed: Artifacts generated.")
    return artifacts



# ==============================================================================
# Task 17: Build the end-to-end orchestrator callable
# ==============================================================================

class PipelineResult(NamedTuple):
    """
    A comprehensive container for all artifacts, data structures, and audit logs
    produced by the execution of the AIMM-X Market Integrity Monitoring pipeline.

    This object encapsulates the complete state of a study run, ensuring
    reproducibility and facilitating downstream analysis or reporting.

    Attributes
    ----------
    config_snapshot : Dict[str, Any]
        The fully validated configuration dictionary used for this run.
        Includes resolved parameters (e.g., explicit session lists) to ensure
        deterministic reproduction.

    audit_log : Dict[str, Any]
        A structured log containing execution metadata, including:
        - Python and library versions (pandas, numpy).
        - Step-by-step completion status.
        - Data quality metrics (quarantined rows, missing sessions).
        - Counts of detected and filtered windows.

    df_windows_raw : pd.DataFrame
        The complete set of detected windows prior to any filtering.
        Contains all scoring factors (phi_1...phi_6), contributions, and
        integrity scores (M). Useful for analyzing the distribution of anomalies.

    df_windows_filtered : pd.DataFrame
        The subset of windows that passed all quality filters (e.g., warmup
        exclusion, artifact removal). This dataset represents the valid
        candidates for analyst review.

    df_top_n : pd.DataFrame
        A curated table of the highest-ranking windows (by Integrity Score M),
        formatted for reporting (e.g., the "Top 15" table in the paper).

    df_phi_summary : pd.DataFrame
        Summary statistics (mean, median, max, std, nonzero percentage) for
        each phi-factor, computed across all detected (unfiltered) windows.
        Used to diagnose factor dominance or scaling issues.

    intermediate_series : Dict[str, pd.Series]
        A dictionary containing the intermediate time-series computed during
        the pipeline, including:
        - 'r_series': Log returns.
        - 'sigma_series': Volatility proxy.
        - 'A_series': Fused attention signal.
        - 's_series': Composite strength score.
        - 'z_r', 'z_sigma', 'z_A': Individual channel z-scores.
        Retained for debugging, visualization, and deep-dive analysis.
    """
    config_snapshot: Dict[str, Any]
    audit_log: Dict[str, Any]
    df_windows_raw: pd.DataFrame
    df_windows_filtered: pd.DataFrame
    df_top_n: pd.DataFrame
    df_phi_summary: pd.DataFrame
    intermediate_series: Dict[str, pd.Series]
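

# --- Illustrative sketch (hypothetical stand-in, not part of the original pipeline) ---
# Because PipelineResult is a typing.NamedTuple, its fields are read-only:
# assigning to result.audit_log raises AttributeError. Derived copies are made
# with _replace(), and _fields / _asdict() support generic inspection. The
# two-field MiniResult class below is a hypothetical miniature used only to
# show this behavior. Immutability is shallow: the dicts and DataFrames held in
# the fields can still be mutated in place, so copy them before editing.
from typing import Any, Dict, NamedTuple


class MiniResult(NamedTuple):
    config_snapshot: Dict[str, Any]
    audit_log: Dict[str, Any]


def with_extra_audit_entry(result: MiniResult, key: str, value: Any) -> MiniResult:
    """Return a copy of `result` whose audit_log has one extra entry.

    The original result and its audit_log dict are left unchanged.
    """
    # Build a new dict rather than mutating the shared one.
    new_log = {**result.audit_log, key: value}
    return result._replace(audit_log=new_log)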

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Steps 1-3: Implement the orchestrator with audit logging.
# -------------------------------------------------------------------------------------------------------------------------------
def run_aimm_x_pipeline(
    df_raw_panel: pd.DataFrame,
    config: Dict[str, Any]
) -> PipelineResult:
    """
    Executes the complete AIMM-X Market Integrity Monitoring pipeline.

    Sequence:
    1.  Validate Config (Task 1)
    2.  Validate Panel Schema (Task 2)
    3.  Cleanse Panel (Task 3)
    4.  Enforce Calendar (Task 4)
    5.  Align Attention (Task 5)
    6.  Normalize Attention (Task 6)
    7.  Fuse Attention (Task 7)
    8.  Compute Returns (Task 8)
    9.  Compute Volatility (Task 9)
    10. Compute Z-Scores (Task 10)
    11. Compute Composite Score (Task 11)
    12. Segment Windows (Task 12)
    13. Compute Phi Factors (Task 13)
    14. Score Windows (Task 14)
    15. Rank & Filter (Task 15)
    16. Export Artifacts (Task 16)

    Parameters
    ----------
    df_raw_panel : pd.DataFrame
        The raw input panel data.
    config : Dict[str, Any]
        The study configuration.

    Returns
    -------
    PipelineResult
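        A named tuple containing all study artifacts and the audit log.

    Raises
    ------
    Exception
        Any exception raised by a pipeline step is logged and re-raised unchanged.
    """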
    audit_log = {
        "python_version": sys.version,
        "pandas_version": pd.__version__,
        "numpy_version": np.__version__,
        "steps_completed": []
    }

    try:
        # Task 1: Validate Config
        logger.info("=== Pipeline Step 1: Config Validation ===")
        validated_config = validate_study_config(config)
        audit_log["steps_completed"].append("Task 1")

        # Task 2: Validate Schema
        logger.info("=== Pipeline Step 2: Schema Validation ===")
        validate_panel_schema(df_raw_panel, validated_config)
        audit_log["steps_completed"].append("Task 2")

        # Task 3: Cleanse Panel
        logger.info("=== Pipeline Step 3: Panel Cleansing ===")
        df_clean, df_quarantine = cleanse_panel(df_raw_panel, validated_config)
        audit_log["quarantined_rows"] = len(df_quarantine)
        audit_log["clean_rows"] = len(df_clean)
        audit_log["steps_completed"].append("Task 3")

        # Task 4: Enforce Calendar
        logger.info("=== Pipeline Step 4: Calendar Enforcement ===")
        calendar_report = enforce_trading_calendar(df_clean, validated_config)
        audit_log["calendar_report"] = calendar_report
        audit_log["steps_completed"].append("Task 4")

        # Task 5: Align Attention
        logger.info("=== Pipeline Step 5: Attention Alignment ===")
        aligned_attention = align_attention_sources(df_clean, validated_config)
        audit_log["steps_completed"].append("Task 5")

        # Task 6: Normalize Attention
        logger.info("=== Pipeline Step 6: Attention Normalization ===")
        tilde_a = normalize_attention_sources(aligned_attention, validated_config)
        audit_log["steps_completed"].append("Task 6")

        # Task 7: Fuse Attention
        logger.info("=== Pipeline Step 7: Attention Fusion ===")
        A_series = fuse_attention(tilde_a, validated_config)
        audit_log["steps_completed"].append("Task 7")

        # Task 8: Compute Returns
        logger.info("=== Pipeline Step 8: Return Computation ===")
        r_series = compute_log_returns(df_clean, validated_config)
        audit_log["steps_completed"].append("Task 8")

        # Task 9: Compute Volatility
        logger.info("=== Pipeline Step 9: Volatility Computation ===")
        sigma_series = compute_volatility(r_series, validated_config)
        audit_log["steps_completed"].append("Task 9")

        # Task 10: Compute Z-Scores
        logger.info("=== Pipeline Step 10: Z-Score Computation ===")
        z_r, z_sigma, z_A = compute_baselines_and_zscores(
            r_series, sigma_series, A_series, validated_config
        )
        audit_log["steps_completed"].append("Task 10")

        # Task 11: Composite Score
        logger.info("=== Pipeline Step 11: Composite Score ===")
        s_series = compute_composite_strength(z_r, z_sigma, z_A, validated_config)
        audit_log["steps_completed"].append("Task 11")

        # Task 12: Segment Windows
        logger.info("=== Pipeline Step 12: Window Segmentation ===")
        df_windows_raw = segment_windows_hysteresis(s_series, validated_config)
        audit_log["raw_windows_count"] = len(df_windows_raw)
        audit_log["steps_completed"].append("Task 12")

        # Task 13: Compute Phi Factors
        logger.info("=== Pipeline Step 13: Phi Factor Computation ===")
        df_phi = compute_phi_factors(
            df_windows_raw, z_r, z_sigma, z_A, validated_config
        )
        audit_log["steps_completed"].append("Task 13")

        # Task 14: Score Windows
        logger.info("=== Pipeline Step 14: Scoring ===")
        df_scored = compute_integrity_scores(df_phi, validated_config)
        audit_log["steps_completed"].append("Task 14")

        # Task 15: Rank & Filter
        logger.info("=== Pipeline Step 15: Ranking & Filtering ===")
        df_ranked, df_filtered = rank_and_filter_windows(
            df_scored, z_r, z_sigma, z_A, validated_config
        )
        audit_log["filtered_windows_count"] = len(df_filtered)
        audit_log["steps_completed"].append("Task 15")

        # Task 16: Export Artifacts
        logger.info("=== Pipeline Step 16: Artifact Export ===")
        artifacts = export_artifacts(df_ranked, df_filtered, validated_config)
        audit_log["steps_completed"].append("Task 16")

        # Package Results
        intermediate_series = {
            "r_series": r_series,
            "sigma_series": sigma_series,
            "A_series": A_series,
            "s_series": s_series,
            "z_r": z_r,
            "z_sigma": z_sigma,
            "z_A": z_A
        }

        return PipelineResult(
            config_snapshot=validated_config,
            audit_log=audit_log,
            df_windows_raw=artifacts["raw_windows"],
            df_windows_filtered=df_filtered,
            df_top_n=artifacts["top_n"],
            df_phi_summary=artifacts["phi_stats"],
            intermediate_series=intermediate_series
        )

    except Exception as e:
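        # steps_completed records only steps that finished successfully, so the
        # failure occurred in the step immediately after the last entry.
        completed = audit_log["steps_completed"]
        last_completed = completed[-1] if completed else "Start"
        logger.critical(f"Pipeline failed after step: {last_completed}")
        logger.critical(f"Error: {e}", exc_info=True)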
        raise
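

# --- Illustrative sketch (hypothetical refactor, not the original implementation) ---
# run_aimm_x_pipeline() repeats one pattern for every task: log, call, then
# append to audit_log["steps_completed"]. A table-driven runner is one way to
# express that pattern once, and it can name the step that actually raised
# rather than only the last step that succeeded. The step names and the shared
# `state` dict used to pass inputs and outputs between steps are illustrative
# assumptions, not part of the study's design.
import logging
from typing import Any, Callable, Dict, List, Tuple

demo_logger = logging.getLogger("aimm_x.demo")

# (step name, callable that reads from and writes to the shared state dict)
StepSpec = Tuple[str, Callable[[Dict[str, Any]], Any]]


def run_steps(steps: List[StepSpec], state: Dict[str, Any]) -> Dict[str, Any]:
    """Run each step in order against `state` and return an audit log."""
    audit_log: Dict[str, Any] = {"steps_completed": [], "failed_step": None}
    # Stored in `state` so callers can inspect partial progress after a failure.
    state["audit_log"] = audit_log
    for name, func in steps:
        demo_logger.info("=== Pipeline step: %s ===", name)
        try:
            func(state)
        except Exception:
            # The failing step is the current one, not steps_completed[-1].
            audit_log["failed_step"] = name
            demo_logger.critical("Pipeline failed at step: %s", name, exc_info=True)
            raise
        audit_log["steps_completed"].append(name)
    return audit_log

# Usage (hypothetical step wrappers):
#     run_steps([("Task 8", lambda s: s.update(r=compute_log_returns(s["df"], s["cfg"])))], state)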
