# `README.md`

# On-Chain Behavioral Pre-Emption System (OBPS): Early-Warning Signals of Political Risk in Stablecoin Markets

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2512.00893-b31b1b.svg)](https://arxiv.org/abs/2512.00893)
[![Journal](https://img.shields.io/badge/Journal-Quantitative%20Finance%20%28q--fin.ST%29-003366)](https://arxiv.org/abs/2512.00893)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/early_warning_signals_political_risk_stablecoin_markets)
[![Discipline](https://img.shields.io/badge/Discipline-Econophysics%20%7C%20Blockchain%20Analytics-00529B)](https://github.com/chirindaopensource/early_warning_signals_political_risk_stablecoin_markets)
[![Data Sources](https://img.shields.io/badge/Data-Ethereum%20Blockchain%20%28ERC--20%29-lightgrey)](https://etherscan.io/)
[![Data Sources](https://img.shields.io/badge/Data-Google%20BigQuery%20Crypto-lightgrey)](https://cloud.google.com/blog/products/data-analytics/introducing-six-new-cryptocurrencies-in-bigquery-public-datasets-and-how-to-analyze-them)
[![Data Sources](https://img.shields.io/badge/Data-CoinGecko%20%2F%20CEX%20Data-lightgrey)](https://www.coingecko.com/)
[![Core Method](https://img.shields.io/badge/Method-Bai--Perron%20Structural%20Breaks%20%7C%20Hilbert--Huang%20Transform-orange)](https://github.com/chirindaopensource/early_warning_signals_political_risk_stablecoin_markets)
[![Analysis](https://img.shields.io/badge/Analysis-Structural%20VAR%20%7C%20AAFT%20Surrogates-red)](https://github.com/chirindaopensource/early_warning_signals_political_risk_stablecoin_markets)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%230C55A5.svg?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![Statsmodels](https://img.shields.io/badge/statsmodels-blue.svg)](https://www.statsmodels.org/)
[![PyYAML](https://img.shields.io/badge/PyYAML-gray?logo=yaml&logoColor=white)](https://pyyaml.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)

**Repository:** `https://github.com/chirindaopensource/early_warning_signals_political_risk_stablecoin_markets`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"Early-Warning Signals of Political Risk in Stablecoin Markets: Human and Algorithmic Behavior Around the 2024 U.S. Election"** by:

*   Kundan Mukhia
*   Buddha Nath Sharma
*   Salam Rabindrajit Luwang
*   Md. Nurujjaman
*   Chittaranjan Hens
*   Suman Saha
*   Tanujit Chakraborty

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from rigorous on-chain data validation and behavioral topology classification to advanced structural break detection, non-linear signal processing via Hilbert-Huang Transform, and regime-dependent Structural Vector Autoregression (SVAR) analysis.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `run_obps_pipeline`](#key-callable-run_obps_pipeline)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the analytical framework presented in Mukhia et al. (2025). The core of this repository is the Jupyter notebook `early_warning_signals_political_risk_stablecoin_markets_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings. The pipeline is designed to be a generalizable toolkit for detecting and quantifying the transmission of political uncertainty into cryptocurrency markets.

The paper addresses the challenge of identifying early-warning signals of market stress by distinguishing between human-driven peer-to-peer stablecoin transactions and automated algorithmic activity. This codebase operationalizes the paper's framework, allowing users to:
-   Rigorously validate and manage the entire experimental configuration via a single `config.yaml` file.
-   Process raw ERC-20 transaction logs to isolate **Human (EOA-EOA)** and **Algorithmic (SC-SC)** flows.
-   Implement **Bai-Perron Structural Break Detection** using dynamic programming to identify endogenous regime shifts in transaction volumes.
-   Apply **Empirical Mode Decomposition (EMD)** and **Hilbert Spectral Analysis** to detect non-linear extreme volatility events in BTC and ETH prices.
-   Execute **Structural Vector Autoregression (SVAR)** with Cholesky identification to quantify volatility spillovers between stablecoins (USDT/USDC).
-   Validate findings using **Amplitude-Adjusted Fourier Transform (AAFT)** surrogates and **Wald statistics** for regime comparison.
-   Automatically generate a synthesis report that checks whether the observed results reproduce the "early warning" property of human on-chain flows reported in the paper.

## Theoretical Background

The implemented methods are grounded in econophysics, time-series econometrics, and blockchain analytics.

**1. Behavioral Topology Classification:**
The core differentiation strategy relies on the Ethereum Account Model.
-   **Human Signal (EOA-EOA):** Transactions where both sender and receiver are Externally Owned Accounts (controlled by private keys). This captures direct human sentiment and precautionary hedging.
-   **Algorithmic Signal (SC-SC):** Transactions where both sender and receiver are Smart Contracts, reflecting automated arbitrage, DeFi protocols, and bot activity.

**2. Structural Break Detection:**
The pipeline uses the Bai-Perron methodology to estimate unknown break dates $T_1, \dots, T_m$ in a mean-shift model $y_t = \mu_j + u_t$ for $T_{j-1} < t \le T_j$, $j = 1, \dots, m+1$ (with $T_0 = 0$ and $T_{m+1} = T$). The optimal partition is found by minimizing the global Sum of Squared Residuals (SSR) via dynamic programming:
$$ \{\hat{T}_1, \dots, \hat{T}_m\} = \arg \min_{(T_1, \dots, T_m)} SSR(T_1, \dots, T_m) $$
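
The dynamic-programming search above can be sketched in a few lines of NumPy. This is a minimal illustration, not the notebook's implementation: it assumes a fixed number of breaks `m` and a minimum segment length $h = \lfloor \epsilon T \rfloor$ derived from the trimming parameter, and it omits the SupF test and the selection of `m`. The function names are hypothetical.

```python
import numpy as np


def segment_ssr_matrix(y: np.ndarray, h: int) -> np.ndarray:
    """SSR of a constant-mean fit for every segment y[i:j] of length >= h.

    Entry [i, j] holds the SSR of y[i:j] (j exclusive); invalid segments are inf.
    """
    n = len(y)
    csum = np.concatenate([[0.0], np.cumsum(y)])
    csum2 = np.concatenate([[0.0], np.cumsum(y ** 2)])
    ssr = np.full((n + 1, n + 1), np.inf)
    for i in range(n):
        for j in range(i + h, n + 1):
            s = csum[j] - csum[i]
            s2 = csum2[j] - csum2[i]
            # SSR around the segment mean: sum(y^2) - (sum(y))^2 / length
            ssr[i, j] = s2 - s * s / (j - i)
    return ssr


def bai_perron_dp(y, m: int, epsilon: float = 0.15):
    """Return (break_indices, total_ssr) minimizing the global SSR with m breaks."""
    y = np.asarray(y, dtype=float)
    n = len(y)
    h = max(int(np.floor(epsilon * n)), 1)  # minimum segment length
    if (m + 1) * h > n:
        raise ValueError("Too many breaks for this trimming parameter.")
    ssr = segment_ssr_matrix(y, h)

    # cost[k, j]: best SSR for y[0:j] split into k + 1 segments
    cost = np.full((m + 1, n + 1), np.inf)
    back = np.zeros((m + 1, n + 1), dtype=int)
    cost[0, :] = ssr[0, :]
    for k in range(1, m + 1):
        for j in range((k + 1) * h, n + 1):
            # last break b leaves >= k*h observations before it and >= h after
            cands = np.arange(k * h, j - h + 1)
            totals = cost[k - 1, cands] + ssr[cands, j]
            best = int(np.argmin(totals))
            cost[k, j] = totals[best]
            back[k, j] = cands[best]

    # Backtrack from the full sample to recover the break positions
    breaks, j = [], n
    for k in range(m, 0, -1):
        j = int(back[k, j])
        breaks.append(j)
    return sorted(breaks), float(cost[m, n])
```

Each returned index `b` is the 0-based position of the first observation in a new regime, so it corresponds to $T_j = b$ in the 1-based notation of the equation. For a daily series of a few hundred observations, the $O(T^2)$ segment table is small enough to precompute.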

**3. Non-Linear Signal Processing:**
To analyze non-stationary price dynamics, the Hilbert-Huang Transform is used:
-   **EMD:** Decomposes the signal into Intrinsic Mode Functions (IMFs).
-   **Hilbert Spectrum:** Computes instantaneous energy $IE(t) = \int H^2(t, \omega) d\omega$.
-   **Extreme Events:** Identified when $IE(t) > E_\mu + 4\sigma$, where $E_\mu$ and $\sigma$ denote the mean and standard deviation of $IE(t)$.
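
A sketch of the instantaneous-energy and threshold steps, assuming the IMFs have already been extracted by EMD and are stacked as rows of an array. As a simplifying assumption, the frequency integral of $H^2(t, \omega)$ is approximated by summing each IMF's squared Hilbert amplitude. `B` plays the role of the threshold multiplier (4 in the rule above), and the function names are hypothetical.

```python
import numpy as np
from scipy.signal import hilbert


def instantaneous_energy(imfs: np.ndarray) -> np.ndarray:
    """Discrete IE(t): sum over IMFs of squared instantaneous amplitudes.

    `imfs` has shape (n_imfs, T); each row is one IMF.
    """
    analytic = hilbert(imfs, axis=-1)  # analytic signal of each IMF
    amplitude = np.abs(analytic)       # instantaneous amplitude a_j(t)
    return np.sum(amplitude ** 2, axis=0)


def extreme_event_mask(ie: np.ndarray, B: float = 4.0) -> np.ndarray:
    """Flag time points where IE(t) exceeds mean + B * standard deviation."""
    threshold = ie.mean() + B * ie.std()
    return ie > threshold
```

Dates of extreme events would then be read off as the index positions where the mask is `True`.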

**4. Structural VAR:**
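Volatility spillovers are modeled using a regime-dependent SVAR in which the reduced-form residuals $u_t$ are mapped to mutually uncorrelated, unit-variance structural shocks $\varepsilon_t$ ($\Sigma_\varepsilon = I$):
$$ A_0 u_t = \varepsilon_t \implies \Sigma_u = A_0^{-1} (A_0^{-1})' $$
Identification is achieved via Cholesky decomposition, which takes $A_0^{-1}$ to be the lower-triangular Cholesky factor of $\Sigma_u$, and regime shifts are tested using the Wald statistic.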
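
A minimal NumPy/SciPy sketch of the Cholesky identification and a regime Wald test. The Wald statistic shown is one common Chow-type form for two independently estimated regimes, $W = d'(V_{pre} + V_{post})^{-1} d$ with $d = \hat\beta_{pre} - \hat\beta_{post}$; whether the paper uses exactly this form is an assumption, and the function names are hypothetical. In practice the residuals, coefficient vectors, and covariance matrices would come from the fitted regime VARs.

```python
import numpy as np
from scipy.stats import chi2


def cholesky_impact_matrix(residuals: np.ndarray) -> np.ndarray:
    """Lower-triangular P with Sigma_u = P P', i.e. A_0^{-1} under Cholesky.

    `residuals` has shape (T, K); the column order fixes the recursive ordering.
    """
    sigma_u = np.cov(residuals, rowvar=False)
    return np.linalg.cholesky(sigma_u)


def structural_shocks(residuals: np.ndarray, impact: np.ndarray) -> np.ndarray:
    """Recover epsilon_t = A_0 u_t = P^{-1} u_t for every row of `residuals`."""
    return np.linalg.solve(impact, residuals.T).T


def wald_regime_test(beta_pre, cov_pre, beta_post, cov_post):
    """Chow-type Wald test of H0: beta_pre == beta_post (independent regimes)."""
    d = np.asarray(beta_pre, dtype=float) - np.asarray(beta_post, dtype=float)
    v = np.asarray(cov_pre, dtype=float) + np.asarray(cov_post, dtype=float)
    w = float(d @ np.linalg.solve(v, d))
    # Under H0, W is asymptotically chi-squared with len(d) degrees of freedom
    return w, float(chi2.sf(w, df=d.size))
```

Because the Cholesky factor depends on the column order, placing USDT before USDC (or the reverse) encodes an assumption about which stablecoin's shocks can move the other contemporaneously.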

## Features

The provided Jupyter notebook (`early_warning_signals_political_risk_stablecoin_markets_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Task Architecture:** The entire pipeline is broken down into 32 distinct, modular tasks, each with its own orchestrator function.
-   **Configuration-Driven Design:** All study parameters are managed in an external `config.yaml` file.
-   **Rigorous Data Validation:** A multi-stage validation process checks the schema, content integrity, and temporal consistency of blockchain and market data.
-   **Advanced Econometrics:** Integrates `statsmodels` for ADF tests and VAR estimation, and custom `numpy`/`scipy` implementations for Bai-Perron DP and EMD.
-   **Robustness Verification:** Includes automated AAFT surrogate generation (1000 iterations) to validate the statistical significance of all detected breaks and events.
-   **Reproducible Artifacts:** Generates structured dataclasses for every intermediate result, ensuring full auditability.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Validation & Preprocessing (Tasks 1-9):** Ingests raw chain logs, validates schemas, filters by time/status, normalizes values to USD, deduplicates, and classifies topology.
2.  **Market Data Processing (Tasks 10-11):** Cleanses exchange data and extracts aligned price/volume series.
3.  **Panel Construction (Tasks 12-15):** Merges datasets, applies log-transformations, tests stationarity (ADF), and differences I(1) series.
4.  **Structural Break Analysis (Tasks 16-21):** Executes Bai-Perron detection on EOA, SC, and Exchange series, validated by AAFT surrogates.
5.  **HHT Analysis (Tasks 22-26):** Performs EMD and Hilbert Spectral Analysis on BTC/ETH prices to detect extreme volatility events.
6.  **SVAR Analysis (Tasks 27-31):** Estimates regime-dependent VAR models, identifies structural shocks via Cholesky, and performs Wald tests for parameter stability.
7.  **Synthesis (Task 32):** Aggregates all findings and validates them against the expected empirical results (e.g., Nov 3 human signal vs. Nov 5 election).

## Core Components (Notebook Structure)

The `early_warning_signals_political_risk_stablecoin_markets_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the 32 major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `run_obps_pipeline`

The project is designed around a single, top-level user-facing interface function:

-   **`run_obps_pipeline`:** This master orchestrator function, located in the final section of the notebook, runs the entire automated research pipeline end to end. A single call to this function reproduces the entire computational portion of the project, managing data flow between all 32 sub-tasks.

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `pyyaml`, `scipy`, `statsmodels`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/early_warning_signals_political_risk_stablecoin_markets.git
    cd early_warning_signals_political_risk_stablecoin_markets
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy pyyaml scipy statsmodels
    ```

## Input Data Structure

The pipeline requires two primary DataFrames:
1.  **`df_chain_raw`**: ERC-20 transfer logs with columns: `timeStamp`, `tokenAddress`, `from`, `to`, `value`, `fromIsContract`, `toIsContract`, `transactionHash`, `blockNumber`, `logIndex`, `txStatus`.
2.  **`df_market_raw`**: Exchange OHLCV data with columns: `Date`, `Symbol`, `Close`, `Volume`.
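
A toy construction of the two input frames, useful for checking the schema before running the pipeline. All addresses, hashes, and numbers are dummy placeholders, not real data. Storing `value` as a string to avoid `int64` overflow for `uint256` amounts is an assumption, and the `txStatus` encoding must match whatever the notebook's schema config expects.

```python
import pandas as pd

# Dummy placeholder addresses (not real accounts or token contracts)
TOKEN = "0x" + "a" * 40
ALICE = "0x" + "1" * 40
BOB = "0x" + "2" * 40

df_chain_raw = pd.DataFrame(
    {
        "timeStamp": [1730592000, 1730678400],  # Unix epoch seconds (UTC)
        "tokenAddress": [TOKEN, TOKEN],
        "from": [ALICE, BOB],
        "to": [BOB, ALICE],
        "value": ["2500000000", "1000000"],  # raw integer units, kept as strings
        "fromIsContract": [0, 0],
        "toIsContract": [0, 0],
        "transactionHash": ["0x" + "f" * 64, "0x" + "e" * 64],
        "blockNumber": [1000, 1007],
        "logIndex": [0, 3],
        "txStatus": [0, 0],  # placeholder; encoding must match the schema config
    }
)

df_market_raw = pd.DataFrame(
    {
        "Date": pd.to_datetime(["2024-11-03", "2024-11-03"]),
        "Symbol": ["BTC", "USDT"],
        "Close": [100.0, 1.0],  # placeholder prices
        "Volume": [10.0, 20.0],  # placeholder volumes
    }
)
```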

## Usage

The `early_warning_signals_political_risk_stablecoin_markets_draft.ipynb` notebook provides a complete, step-by-step guide. The primary workflow is to execute the final cell of the notebook, which demonstrates how to use the top-level `run_obps_pipeline` orchestrator:

```python
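# Final cell of the notebook
import pandas as pd  # for loading raw datasets, e.g. pd.read_csv(...)
import yaml

# run_obps_pipeline is defined in the preceding sections of the notebook.
# This block serves as the main entry point for the entire project.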
if __name__ == '__main__':
    # 1. Load the master configuration from the YAML file.
    with open('config.yaml', 'r') as f:
        study_config = yaml.safe_load(f)
    
    # 2. Load raw datasets (Example using synthetic generator provided in the notebook)
    # In production, load from CSV/Parquet: pd.read_csv(...)
    df_chain_raw = ...
    df_market_raw = ...
    
    # 3. Execute the entire replication study.
    results = run_obps_pipeline(
        df_chain_raw=df_chain_raw,
        df_market_raw=df_market_raw,
        study_config=study_config
    )
    
    # 4. Access results
    print(f"Conclusion: {results.final_report.overall_conclusion}")
```

## Output Structure

The pipeline returns an `OBPSPipelineResult` object containing all analytical artifacts:
-   **`config`**: Validated configuration object.
-   **`final_panel`**: The processed econometric panel DataFrame.
-   **`structural_breaks`**: Dictionary of detected break dates and SupF statistics.
-   **`break_robustness`**: AAFT p-values for structural breaks.
-   **`hht_analysis`**: Hilbert Spectra and detected extreme event dates.
-   **`svar_model`**: Full-sample VAR estimation results.
-   **`svar_regimes`**: Pre- and post-election VAR models.
-   **`svar_identification`**: Structural impact matrices (Cholesky factors).
-   **`wald_test`**: Wald statistic for regime shift significance.
-   **`final_report`**: A synthesis object comparing observed results to expected findings.

## Project Structure

```
early_warning_signals_political_risk_stablecoin_markets/
│
├── early_warning_signals_political_risk_stablecoin_markets_draft.ipynb  # Main implementation notebook
├── config.yaml                                                          # Master configuration file
├── requirements.txt                                                     # Python package dependencies
│
├── LICENSE                                                              # MIT Project License File
└── README.md                                                            # This file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can modify study parameters such as:
-   **Observation Window:** `start_date`, `end_date`.
-   **Bai-Perron Settings:** `max_breaks`, `trimming_epsilon`.
-   **HHT Settings:** `max_imfs`, `B` (threshold multiplier).
-   **SVAR Settings:** `max_lags`, `break_date`.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Real-Time Monitoring:** Adapting the pipeline for streaming blockchain data.
-   **Multi-Chain Support:** Extending the analysis to other EVM-compatible chains (Polygon, Arbitrum).
-   **Machine Learning Integration:** Incorporating LSTM or Transformer models for predictive signaling.

## License

This project is licensed under the MIT License. See the `LICENSE` file for details.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@article{mukhia2025earlywarning,
  title={Early-Warning Signals of Political Risk in Stablecoin Markets: Human and Algorithmic Behavior Around the 2024 U.S. Election},
  author={Mukhia, Kundan and Sharma, Buddha Nath and Luwang, Salam Rabindrajit and Nurujjaman, Md. and Hens, Chittaranjan and Saha, Suman and Chakraborty, Tanujit},
  journal={arXiv preprint arXiv:2512.00893},
  year={2025}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). On-Chain Behavioral Pre-Emption System (OBPS): An Open Source Implementation.
GitHub repository: https://github.com/chirindaopensource/early_warning_signals_political_risk_stablecoin_markets
```

## Acknowledgments

-   Credit to **Kundan Mukhia et al.** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, SciPy, and Statsmodels**.

---

*This README was generated based on the structure and content of the `early_warning_signals_political_risk_stablecoin_markets_draft.ipynb` notebook and follows best practices for research software documentation.*


# Paper

Title: "*Early-Warning Signals of Political Risk in Stablecoin Markets: Human and Algorithmic Behavior Around the 2024 U.S. Election*"

Authors: Kundan Mukhia, Buddha Nath Sharma, Salam Rabindrajit Luwang, Md. Nurujjaman, Chittaranjan Hens, Suman Saha, Tanujit Chakraborty

E-Journal Submission Date: 30 November 2025

Link: https://arxiv.org/abs/2512.00893

Abstract:

We study how the 2024 U.S. presidential election, viewed as a major political risk event, affected cryptocurrency markets by distinguishing human-driven peer-to-peer stablecoin transactions from automated algorithmic activity. Using structural break analysis, we find that human-driven Ethereum Request for Comment 20 (ERC-20) transactions shifted on November 3, two days before the election, while exchange trading volumes reacted only on Election Day. Automated smart-contract activity adjusted much later, with structural breaks appearing in January 2025. We validate these shifts using surrogate-based robustness tests. Complementary energy-spectrum analysis of Bitcoin and Ethereum identifies pronounced post-election turbulence, and a structural vector autoregression confirms a regime shift in stablecoin dynamics. Overall, human-driven stablecoin flows act as early-warning indicators of political stress, preceding both exchange behavior and algorithmic responses.

# Summary


### **Executive Abstract**
This paper presents a novel econometric analysis of the 2024 U.S. Presidential Election's impact on cryptocurrency markets. By decomposing Ethereum (ERC-20) blockchain data, the authors isolate **human-driven** (Externally Owned Accounts or EOA) transactions from **algorithmic** (Smart Contract or SC) activity. The central finding is a distinct temporal divergence: human actors exhibit anticipatory hedging behavior *prior* to the election, serving as a leading indicator, while algorithmic agents demonstrate a significant lag, recalibrating only after market volatility stabilizes.

---

### **Problem Definition and Data Architecture**
The authors address a limitation in the standard financial literature: the reliance on aggregated exchange-based price and volume data, which obscures the heterogeneity of market participants.

*   **The Event:** The 2024 U.S. Presidential Election is treated as a major exogenous political shock.
*   **The Assets:** The study focuses on the two dominant stablecoins, **Tether (USDT)** and **USD Coin (USDC)**, alongside Bitcoin (BTC) and Ethereum (ETH) as market proxies.
*   **Data Granularity:** The dataset covers March 2024 to February 2025. The authors utilize raw ERC-20 token transfer data to classify transactions into two distinct vectors:
    1.  **EOA-to-EOA:** Peer-to-peer transfers initiated by private keys (Human-driven).
    2.  **SC-to-SC:** Interactions between automated code blocks (Algorithmic/Bot-driven).

### **Methodological Framework**
The paper employs a sophisticated suite of statistical and signal processing techniques to detect structural breaks and non-linear dynamics.

1.  **Stationarity Testing:** The Augmented Dickey-Fuller (ADF) test is applied. Log-transformed series were found to be non-stationary ($I(1)$), necessitating first-differencing for the Vector Autoregression models.
2.  **Structural Break Detection:** The **Bai-Perron (BP)** test is utilized to endogenously identify breakpoints in the time series without prior specification. The **SupF** statistic validates the significance of these regime shifts.
3.  **Robustness via Surrogate Data:** To ensure breaks were not artifacts of noise, the authors employed **Amplitude-Adjusted Fourier Transform (AAFT)** surrogates. This generates synthetic data preserving the original linear autocorrelation and amplitude distribution but randomizing the phase, providing a rigorous null hypothesis.
4.  **Signal Processing (HHT):** The **Hilbert-Huang Transform**, utilizing Empirical Mode Decomposition (EMD), was used on non-stationary BTC/ETH price data to derive instantaneous energy spectra and detect "Extreme Events" (defined as energy exceeding mean + 4 standard deviations).
5.  **Structural Vector Autoregression (SVAR):** An SVAR model with Cholesky decomposition was estimated to quantify volatility spillovers and causal flows between stablecoins across pre- and post-election regimes.
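
The AAFT procedure in step 3 can be sketched as follows: Gaussianize the series by rank, randomize its Fourier phases, then map the original values back onto the rank order of the randomized series. This is a minimal illustration with hypothetical function names; the one-sided rank p-value and the choice of test statistic are assumptions, not necessarily the paper's exact test.

```python
import numpy as np


def aaft_surrogate(x, rng=None) -> np.ndarray:
    """One AAFT surrogate: the same values as x, approximately the same spectrum."""
    rng = np.random.default_rng(rng)
    x = np.asarray(x, dtype=float)
    n = x.size

    # 1. Gaussianize: sorted Gaussian draws arranged in x's rank order
    ranks = np.argsort(np.argsort(x))
    gauss = np.sort(rng.standard_normal(n))[ranks]

    # 2. Randomize Fourier phases (DC and, for even n, Nyquist stay real)
    spectrum = np.fft.rfft(gauss)
    phases = rng.uniform(0.0, 2.0 * np.pi, spectrum.size)
    phases[0] = 0.0
    if n % 2 == 0:
        phases[-1] = 0.0
    shuffled = np.fft.irfft(spectrum * np.exp(1j * phases), n=n)

    # 3. Rescale: original values placed in the rank order of the shuffled series
    return np.sort(x)[np.argsort(np.argsort(shuffled))]


def surrogate_p_value(stat_fn, x, n_surrogates: int = 1000, seed: int = 0) -> float:
    """One-sided rank p-value: share of surrogates with statistic >= observed."""
    rng = np.random.default_rng(seed)
    observed = stat_fn(np.asarray(x, dtype=float))
    null = np.array([stat_fn(aaft_surrogate(x, rng)) for _ in range(n_surrogates)])
    return float((1 + np.sum(null >= observed)) / (n_surrogates + 1))
```

With `stat_fn` set to a break statistic (for example a SupF value computed on each series), a small p-value indicates that the observed break is unlikely under the linear-Gaussian null preserved by the surrogates.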

### **Empirical Evidence – The Temporal Divergence**
The core contribution of the paper is the identification of a lead-lag relationship between human and machine actors.

*   **The Human Signal (Leading Indicator):**
    *   The Bai-Perron test detected a statistically significant structural break in **EOA-to-EOA** transactions on **November 3, 2024**—two days *before* the election.
    *   **Interpretation:** This represents precautionary capital reallocation and hedging by human agents anticipating political uncertainty. It serves as an **Early Warning System (EWS)**.

*   **The Market Reaction (Coincident Indicator):**
    *   Centralized Exchange (CEX) trading volumes for USDT and USDC exhibited structural breaks on **November 5, 2024** (Election Day).
    *   **Interpretation:** The broader market reacted synchronously with the event, lagging the on-chain human signal.

*   **The Algorithmic Lag (Lagging Indicator):**
    *   Automated **SC-to-SC** activity did not break until **January 2025** (Jan 2 for USDC, Jan 16 for USDT).
    *   **Interpretation:** Algorithmic trading systems and DeFi protocols are reactive. They require a period of observation to recalibrate parameters to the new post-election equilibrium before altering their execution logic.

### **Volatility Dynamics and Regime Shifts**
The paper quantifies the magnitude of the market stress using HHT and SVAR analysis.

*   **Extreme Energy Events:** The Hilbert Spectrum analysis revealed concentrated regions of high instantaneous energy in BTC and ETH prices immediately following the election (Nov 6–10), confirming the transmission of stress from stablecoin flows to volatile assets.
*   **SVAR Results:**
    *   **Wald Tests:** Confirmed a massive structural shift in the dynamic relationship between USDT and USDC ($p < 0.0001$).
    *   **Impulse Response & Spillovers:** Post-election, volatility spillovers increased by approximately **2848%**.
    *   **Transmission Channel:** USDT was identified as the dominant channel for shock transmission, with its own-shock effects and cross-market spillovers rising significantly more than USDC's.

### **Conclusion and Implications**
The study establishes that on-chain data possesses predictive power superior to traditional exchange metrics during political risk events.

1.  **Alpha Generation/Risk Management:** Investors can utilize EOA-specific stablecoin velocity as a leading indicator for volatility, providing a roughly 48-hour advantage over exchange-based signals.
2.  **Algorithmic Rigidity:** The significant delay in smart contract adjustments suggests that current algorithmic trading strategies lack the semantic understanding to process exogenous political shocks in real-time, creating a window of inefficiency.
3.  **Systemic Stability:** The massive increase in volatility spillovers post-election highlights the fragility of the crypto-ecosystem to political narratives, with stablecoins acting as the primary conduit for this systemic stress.

# Import Essential Modules


```python
#!/usr/bin/env python3
# ==============================================================================#
#
#  On-Chain Behavioral Pre-Emption System (OBPS) for Political Risk Analysis
#
#  This module provides a complete, production-grade implementation of the
#  analytical framework presented in "Early-Warning Signals of Political Risk in
#  Stablecoin Markets: Human and Algorithmic Behavior Around the 2024 U.S. Election"
#  by Mukhia et al. (2025). It delivers a rigorous system for detecting and
#  quantifying the transmission of political uncertainty into cryptocurrency markets
#  by isolating human-driven on-chain signals from algorithmic activity.
#
#  Core Methodological Components:
#  • Behavioral Topology Classification: Segregation of EOA-EOA (Human) vs. SC-SC (Algo) flows
#  • Endogenous Structural Break Detection: Bai-Perron mean-shift models with dynamic programming
#  • Non-Linear Signal Processing: Hilbert-Huang Transform (HHT) for instantaneous energy analysis
#  • Robustness Verification: Amplitude-Adjusted Fourier Transform (AAFT) surrogate testing
#  • Structural Vector Autoregression (SVAR): Regime-dependent volatility spillover analysis
#  • Hypothesis Testing: Wald statistics for structural parameter stability across regimes
#
#  Technical Implementation Features:
#  • Vectorized processing of high-frequency blockchain transaction logs
#  • Rigorous time-series stationarity enforcement via ADF testing
#  • Efficient dynamic programming algorithms for global SSR minimization
#  • FFT-based phase randomization for non-linear surrogate generation
#  • Cholesky identification for structural shock transmission
#  • Comprehensive validation framework for replication fidelity
#
#  Paper Reference:
#  Mukhia, K., Sharma, B. N., Luwang, S. R., Nurujjaman, M., Hens, C., Saha, S., & Chakraborty, T. (2025).
#  Early-Warning Signals of Political Risk in Stablecoin Markets: Human and Algorithmic Behavior
#  Around the 2024 U.S. Election. arXiv preprint arXiv:2512.00893.
#  https://arxiv.org/abs/2512.00893
#
#  License: MIT
#  Author:  CS Chirinda
#  Version: 1.0.0
#
# ==============================================================================#

import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Dict, Any, Set, Optional, Tuple

import numpy as np
import pandas as pd
from scipy.interpolate import CubicSpline
from scipy.signal import argrelextrema, hilbert
from scipy.stats import chi2
from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.vector_ar.var_model import VARResultsWrapper
```


# Implementation

## Draft 1

### **Discussion of the Inputs, Processes and Outputs of Key Callables**

### 1. `validate_study_config` (Task 1 Orchestrator)

*   **Inputs:** A raw Python dictionary (`config`) containing metadata, schema definitions, and analysis parameters.
*   **Processes:** This callable orchestrates a tripartite validation sequence. First, it invokes `validate_meta_config` to enforce temporal logic (e.g., $T_{start} < T_{end}$) and UTC standardization. Second, it calls `validate_schemas` to verify the presence of required columns (e.g., `timeStamp`, `value`) and asset definitions (USDT/USDC addresses). Third, it executes `validate_analysis_config` to enforce mathematical constraints on model parameters (e.g., Bai-Perron trimming parameter $\epsilon \in (0, 0.5)$).
*   **Outputs:** A `ValidatedStudyConfig` immutable dataclass containing typed and verified configuration objects.
*   **Research Context:** This implements the **Data Governance** layer of the pipeline. It ensures that all subsequent econometric models (Bai-Perron, HHT, SVAR) receive parameters that are mathematically valid and consistent with the study's observation window (March 2024 – February 2025).
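
A reduced sketch of the temporal and trimming checks described above. `ValidatedWindow` and `validate_window` are hypothetical stand-ins (the real `ValidatedStudyConfig` holds far more), and treating naive ISO dates as UTC is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class ValidatedWindow:
    """Hypothetical, reduced stand-in for part of ValidatedStudyConfig."""
    start: datetime
    end: datetime
    trimming_epsilon: float


def _to_utc(value: str) -> datetime:
    """Parse an ISO-8601 date/time; naive values are interpreted as UTC."""
    dt = datetime.fromisoformat(value)
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def validate_window(start_date: str, end_date: str, trimming_epsilon: float) -> ValidatedWindow:
    start, end = _to_utc(start_date), _to_utc(end_date)
    if not start < end:
        raise ValueError(f"start_date {start} must precede end_date {end}")
    if not 0.0 < trimming_epsilon < 0.5:
        raise ValueError("trimming_epsilon must lie in the open interval (0, 0.5)")
    return ValidatedWindow(start, end, float(trimming_epsilon))
```

The frozen dataclass mirrors the immutability described above: once validated, the window cannot be mutated by downstream tasks.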

### 2. `validate_df_chain_raw` (Task 2 Orchestrator)

*   **Inputs:** The raw on-chain DataFrame (`df_raw`) and the validated schema configuration (`schemas`).
*   **Processes:** It executes a sequence of integrity checks: `validate_chain_columns` enforces data types (casting `int64`, `int8`), `validate_chain_domains` applies regex filters (e.g., `^0x[0-9a-fA-F]{40}$` for addresses) and value range checks (e.g., timestamps $t > 0$), and `validate_chain_uniqueness` computes the cardinality of the composite primary key $(TxHash, Block, LogIndex)$.
*   **Outputs:** A `ChainValidationResult` dataclass containing the validated DataFrame and summary statistics (row counts, duplicates).
*   **Research Context:** This implements the **Data Integrity Verification** step. It ensures the raw ledger data conforms to the ERC-20 standard interface requirements described in Section 2, preventing garbage-in-garbage-out errors in the behavioral topology classification.
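
The domain and uniqueness checks can be sketched with vectorized pandas string matching. `check_chain_frame` is a hypothetical helper that only counts violations; the notebook's validators may raise or log instead.

```python
import pandas as pd

ADDRESS_PATTERN = r"0x[0-9a-fA-F]{40}"  # used with fullmatch, so anchors are implicit
PRIMARY_KEY = ["transactionHash", "blockNumber", "logIndex"]


def check_chain_frame(df: pd.DataFrame) -> dict:
    """Count malformed addresses, non-positive timestamps, and duplicate keys."""
    bad_address = pd.Series(False, index=df.index)
    for col in ("tokenAddress", "from", "to"):
        bad_address |= ~df[col].astype(str).str.fullmatch(ADDRESS_PATTERN)
    # Non-numeric timestamps become NaN, and NaN > 0 is False, so they are flagged
    bad_timestamp = ~(pd.to_numeric(df["timeStamp"], errors="coerce") > 0)
    n_unique_keys = len(df[PRIMARY_KEY].drop_duplicates())
    return {
        "rows": len(df),
        "bad_address_rows": int(bad_address.sum()),
        "bad_timestamp_rows": int(bad_timestamp.sum()),
        "duplicate_key_rows": len(df) - n_unique_keys,
    }
```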

### 3. `validate_df_market_raw` (Task 3 Orchestrator)

*   **Inputs:** The raw market DataFrame (`df_raw`), schema config (`schemas`), and metadata (`meta_config`).
*   **Processes:** It validates the market microstructure log by calling `validate_market_columns` (type enforcement), `validate_symbol_coverage` (verifying presence of BTC, ETH, USDT, USDC), and `validate_temporal_coverage` (identifying missing dates via set difference against the expected daily range).
*   **Outputs:** A `MarketValidationResult` dataclass containing the validated DataFrame and coverage metrics.
*   **Research Context:** This implements the **Market Data Quality Control** step. It ensures that the control variables (BTC/ETH prices) and exchange volume series cover the critical election period, a prerequisite for the comparative analysis in Section 4.3.
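
The set-difference approach of `validate_temporal_coverage` can be sketched as below, assuming `Date` holds tz-naive daily dates; `missing_dates` is a hypothetical helper.

```python
import pandas as pd


def missing_dates(df: pd.DataFrame, symbol: str, start: str, end: str) -> list:
    """Calendar days in [start, end] that have no row for `symbol`."""
    expected = set(pd.date_range(start, end, freq="D"))
    dates = pd.to_datetime(df["Date"]).dt.normalize()
    observed = set(dates[df["Symbol"] == symbol])
    return sorted(expected - observed)
```

Coverage for all required symbols could then be summarized as `{s: missing_dates(df, s, start, end) for s in ("BTC", "ETH", "USDT", "USDC")}`.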

### 4. `filter_chain_data` (Task 4 Orchestrator)

*   **Inputs:** The validated on-chain DataFrame (`df_validated`), metadata (`meta_config`), and preprocessing config.
*   **Processes:** It transforms the data by converting Unix epoch timestamps to UTC dates via `convert_timestamp_to_utc_date`. It then applies `apply_temporal_filter` to restrict records to the interval $[T_{start}, T_{end}]$ and `filter_transaction_status` to remove failed transactions (records with `txStatus != 0`).
*   **Outputs:** A `FilteredChainData` dataclass containing the temporally and status-filtered DataFrame.
*   **Research Context:** This implements the **Temporal and Status Filtering** described in Section 2. It isolates the relevant "successful" economic activity within the study window, ensuring that failed transactions do not distort the volume aggregation.
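
A sketch of the conversion and filtering steps. `filter_chain` is hypothetical. Keeping rows with `txStatus == 0` mirrors the rule stated above, but the status encoding should be confirmed against the schema config, since Ethereum transaction receipts conventionally use status 1 for success and 0 for failure.

```python
import pandas as pd


def filter_chain(df: pd.DataFrame, start: str, end: str, ok_status: int = 0) -> pd.DataFrame:
    """Keep rows inside [start, end] (UTC calendar days) with txStatus == ok_status."""
    # Epoch seconds -> UTC timestamps -> tz-naive UTC calendar dates
    dates = pd.to_datetime(df["timeStamp"], unit="s", utc=True).dt.tz_convert(None).dt.normalize()
    in_window = (dates >= pd.Timestamp(start)) & (dates <= pd.Timestamp(end))
    keep = in_window & (df["txStatus"] == ok_status)
    out = df.loc[keep].copy()
    out["date"] = dates[keep]
    return out
```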

### 5. `normalize_chain_data` (Task 5 Orchestrator)

*   **Inputs:** The filtered chain data (`filtered_data`), schemas, and preprocessing config.
*   **Processes:** It filters the dataset to retain only USDT and USDC transfers via `filter_by_token_address`. Crucially, it applies `normalize_token_values` to transform raw `uint256` values into USD equivalents using Equation (1) from the paper: $\text{USD Value} = \frac{\texttt{value}}{10^6}$ (USDT and USDC both use 6 decimal places on Ethereum). Finally, it removes economically insignificant zero-value transfers via `remove_zero_value_transfers`.
*   **Outputs:** A `NormalizedChainData` dataclass with the normalized `usd_value` column.
*   **Research Context:** This implements the **Unit Normalization** step (Equation 1). It transforms raw blockchain integers into economically meaningful USD figures, enabling the aggregation of volume series for the structural break analysis.
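
Equation (1) can be applied without losing precision by converting raw `uint256` amounts with Python integers and `Decimal` before casting to float, since such amounts can exceed the `int64` range. `to_usd` and `normalize` are hypothetical helpers, and the case-insensitive address match is an assumption.

```python
from decimal import Decimal

import pandas as pd

TOKEN_DECIMALS = 6  # USDT and USDC both use 6 decimals on Ethereum


def to_usd(raw_value) -> float:
    """Equation (1): value / 10^6, computed exactly before converting to float."""
    return float(Decimal(int(raw_value)) / Decimal(10 ** TOKEN_DECIMALS))


def normalize(df: pd.DataFrame, token_addresses: set) -> pd.DataFrame:
    """Keep the target tokens, add `usd_value`, and drop zero-value transfers."""
    wanted = {a.lower() for a in token_addresses}
    out = df[df["tokenAddress"].str.lower().isin(wanted)].copy()
    out["usd_value"] = out["value"].map(to_usd)
    return out[out["usd_value"] > 0]
```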

### 6. `deduplicate_chain_data` (Task 6 Orchestrator)

*   **Inputs:** The normalized chain data (`normalized_data`).
*   **Processes:** It identifies duplicate log entries based on the tuple $(TxHash, BlockNumber, LogIndex)$ via `check_uniqueness_stats`. It then executes `remove_duplicates`, sorting by block height to deterministically retain the canonical record, and analyzes internal transaction flags via `analyze_internal_transactions`.
*   **Outputs:** A `DeduplicatedChainData` dataclass containing the unique set of transfer logs.
*   **Research Context:** This implements the **Data Cleansing** requirement mentioned in Section 2 ("Duplicate... transactions were removed to reduce noise"). It ensures that the volume series reflects unique economic transfers, preventing double-counting.
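The following is a minimal sketch of key-based deduplication. `drop_duplicate_logs` is a hypothetical helper, and its key uses the column names from the example `chain_raw` schema. The sketch sorts by the key before keeping the first row, so the row it retains does not depend on the input order.

```python
import pandas as pd

# Assumed dedup key, named after the example `chain_raw` schema columns
KEY = ["transactionHash", "blockNumber", "logIndex"]

def drop_duplicate_logs(df: pd.DataFrame) -> tuple[pd.DataFrame, int]:
    """Hypothetical sketch: count and remove duplicate transfer logs."""
    # Rows that repeat a key already seen earlier in the frame
    n_dupes = int(df.duplicated(subset=KEY, keep="first").sum())
    # Order by (hash, block height, log index), then keep the first occurrence
    deduped = (
        df.sort_values(KEY)
          .drop_duplicates(subset=KEY, keep="first")
          .reset_index(drop=True)
    )
    return deduped, n_dupes
```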

### 7. `classify_chain_topology` (Task 7 Orchestrator)

*   **Inputs:** The deduplicated chain data (`deduplicated_data`).
*   **Processes:** It applies `apply_topology_classification` to categorize transactions based on the `fromIsContract` and `toIsContract` flags. It implements the logic:
    *   **Human (EOA-EOA):** `fromIsContract = 0` AND `toIsContract = 0`
    *   **Algo (SC-SC):** `fromIsContract = 1` AND `toIsContract = 1`

    It then validates the classification integrity via `validate_topology_counts`.
*   **Outputs:** A `ClassifiedChainData` dataclass with a categorical `topology` column.
*   **Research Context:** This implements the **Behavioral Topology Classification** described in Section 2 and Table 2. This is the core differentiation strategy of the paper, isolating human sentiment (EOA) from algorithmic response (SC).
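The two topology rules can be expressed with `np.select`. `classify_topology` is a hypothetical helper. The text defines only the EOA-EOA and SC-SC classes, so the `"MIXED"` label for the remaining flag combinations is an assumption made for illustration.

```python
import numpy as np
import pandas as pd

def classify_topology(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical sketch: map contract flags to a categorical topology label."""
    f, t = df["fromIsContract"], df["toIsContract"]
    labels = np.select(
        [(f == 0) & (t == 0), (f == 1) & (t == 1)],  # human, algorithmic
        ["EOA_EOA", "SC_SC"],
        default="MIXED",  # assumed label for EOA->SC and SC->EOA transfers
    )
    out = df.copy()
    out["topology"] = pd.Categorical(labels, categories=["EOA_EOA", "SC_SC", "MIXED"])
    return out
```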

### 8. `aggregate_chain_volumes` (Task 8 Orchestrator)

*   **Inputs:** The classified chain data (`classified_data`) and metadata (`meta_config`).
*   **Processes:** It executes `group_and_sum_volumes` to compute daily sums: $V_{k,t,d} = \sum_{i \in \text{group}} \text{usd\_value}_i$. It then reshapes the data via `pivot_to_wide_format` to create distinct columns for each topology-token pair (e.g., `V_EOA_EOA_USDT`) and ensures a complete time series via `reindex_to_full_window`, filling missing days with zero.
*   **Outputs:** An `AggregatedChainData` dataclass containing the daily volume time series.
*   **Research Context:** This implements the **Time Series Aggregation** step. It transforms discrete transaction logs into continuous daily volume signals, which serve as the primary input for the Bai-Perron and SVAR models.
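The sum, pivot and reindex sequence can be sketched in pandas. `daily_volume_panel` is hypothetical. It assumes that the input already has a UTC-day `date` column, a `topology` label, a `token` symbol column, and the normalized `usd_value`.

```python
import pandas as pd

def daily_volume_panel(df: pd.DataFrame, start: str, end: str) -> pd.DataFrame:
    """Hypothetical sketch: daily sums per (topology, token), wide format, zero-filled."""
    sums = df.groupby(["date", "topology", "token"])["usd_value"].sum()
    wide = sums.unstack(["topology", "token"])
    # Column names such as V_EOA_EOA_USDT
    wide.columns = [f"V_{topo}_{tok}" for topo, tok in wide.columns]
    full_index = pd.date_range(start, end, freq="D", tz="UTC", name="date")
    # Missing days and missing (topology, token) pairs both become zero volume
    return wide.reindex(full_index).fillna(0.0)
```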

### 9. `validate_daily_chain_series` (Task 9 Orchestrator)

*   **Inputs:** The aggregated chain data (`aggregated_data`) and metadata.
*   **Processes:** It performs statistical validation via `check_series_integrity` (non-negativity), computes descriptive statistics (moments) via `compute_series_statistics`, and identifies outliers using the $4\sigma$ rule via `detect_outliers`. It also generates visualization specifications.
*   **Outputs:** A `ChainSeriesValidationResult` dataclass containing statistical summaries and validation status.
*   **Research Context:** This implements **Descriptive Statistical Analysis**. It verifies the distributional properties of the constructed time series before they are subjected to rigorous econometric testing.
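The $4\sigma$ rule can be sketched in a few lines. `flag_outliers` is a hypothetical helper. The text does not specify which standard deviation estimator `detect_outliers` uses, so the sample estimator (`ddof=1`) is an assumption here.

```python
import numpy as np

def flag_outliers(x, k: float = 4.0) -> np.ndarray:
    """Hypothetical sketch: indices where |x - mean| exceeds k standard deviations."""
    x = np.asarray(x, dtype=float)
    mu, sigma = x.mean(), x.std(ddof=1)  # sample std is an assumption
    return np.flatnonzero(np.abs(x - mu) > k * sigma)
```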

### 10. `cleanse_market_data` (Task 10 Orchestrator)

*   **Inputs:** The validated market results (`validation_result`) and metadata.
*   **Processes:** It normalizes market dates via `normalize_market_dates`, filters to the study window via `filter_market_window`, and removes invalid numeric entries (e.g., negative prices) via `clean_numeric_columns`.
*   **Outputs:** A `CleanedMarketData` dataclass containing the sanitized market DataFrame.
*   **Research Context:** This implements **Market Data Preprocessing**. It ensures that the exchange-based control variables are temporally aligned and numerically valid for the subsequent correlation and causality analysis.

### 11. `extract_market_series` (Task 11 Orchestrator)

*   **Inputs:** The cleaned market data (`cleaned_data`).
*   **Processes:** It separates the dataset into individual time series for BTC price, ETH price, USDT volume, and USDC volume using `extract_price_series` and `extract_volume_series`. It then combines them into a single aligned DataFrame via `combine_market_series` using an inner join on the date index.
*   **Outputs:** A `MarketSeriesData` dataclass containing the aligned market time series.
*   **Research Context:** This implements the **Variable Selection** step. It isolates the specific financial variables (BTC/ETH Close, Stablecoin Volumes) required for the Hilbert-Huang Transform and Structural Break tests.

### 12. `merge_data_sources` (Task 12 Orchestrator)

*   **Inputs:** The aggregated chain data (`chain_data`) and market series (`market_data`).
*   **Processes:** It executes `join_datasets` to perform an inner join on the `Date` column, ensuring temporal alignment between on-chain and off-chain data. It verifies completeness via `verify_panel_completeness` and finalizes the structure via `finalize_panel`.
*   **Outputs:** A `MergedPanelData` dataclass containing the unified econometric panel.
*   **Research Context:** This implements the **Dataset Integration** step. It creates the master dataset $S_{merged} = S_{chain} \cap S_{market}$ required for the multivariate SVAR analysis in Section 4.5.

### 13. `construct_log_series` (Task 13 Orchestrator)

*   **Inputs:** The merged panel data (`merged_data`).
*   **Processes:** It identifies target series via `identify_target_series` and applies logarithmic transformations via `apply_log_transformations`. It uses $y_t = \ln(x_t)$ for prices and $y_t = \ln(x_t + 1)$ for volumes to handle potential zeros.
*   **Outputs:** A `LogTransformedData` dataclass containing the log-level series.
*   **Research Context:** This implements the **Logarithmic Transformation** described in Section 3.1.2 (Equation 9). The transformation stabilizes variance and allows small changes to be read as approximate percentage shifts, a standard step in financial time series analysis.
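The two transforms can be sketched as shown below. `log_transform_panel` is a hypothetical helper. It uses `np.log1p` because $\ln(x + 1)$ keeps zero-volume days finite. It also rejects non-positive prices, because $\ln(x)$ is undefined for them.

```python
import numpy as np
import pandas as pd

def log_transform_panel(panel: pd.DataFrame, price_cols, volume_cols) -> pd.DataFrame:
    """Hypothetical sketch: ln(x) for prices, ln(x + 1) for volumes."""
    out = pd.DataFrame(index=panel.index)
    for c in price_cols:
        if (panel[c] <= 0).any():
            raise ValueError(f"{c} contains non-positive prices; ln(x) is undefined")
        out[f"log_{c}"] = np.log(panel[c])
    for c in volume_cols:
        out[f"log_{c}"] = np.log1p(panel[c])  # ln(x + 1)
    return out
```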

### 14. `perform_stationarity_tests` (Task 14 Orchestrator)

*   **Inputs:** The log-transformed data (`log_data`) and preprocessing config.
*   **Processes:** It iterates through all series, configuring the Augmented Dickey-Fuller test via `get_adf_config` (Regression='ct', Autolag='AIC'). It executes `run_adf_test` to test the null hypothesis of a unit root ($H_0: \alpha=1$) and summarizes the results via `summarize_stationarity`.
*   **Outputs:** A `StationarityTestResults` dataclass identifying I(1) and I(0) series.
*   **Research Context:** This implements the **Augmented Dickey-Fuller (ADF) Test** described in Section 3.1.1 (Equations 2-5). It determines the integration order of the variables, a critical prerequisite for selecting the appropriate form (levels vs. differences) for the SVAR model.

### 15. `finalize_integration_order` (Task 15 Orchestrator)

*   **Inputs:** Log data (`log_data`), stationarity results (`stationarity_results`), and config.
*   **Processes:** It computes the first difference $\Delta y_t = y_t - y_{t-1}$ for all I(1) series via `compute_differences`. It verifies the stationarity of these differences via `verify_diff_stationarity` and maps each series to its appropriate analysis method (Bai-Perron uses levels, SVAR uses differences) via `map_series_to_methods`.
*   **Outputs:** A `FinalizedSeriesData` dataclass containing the fully prepared dataset and method mappings.
*   **Research Context:** This implements the **Data Transformation for Stationarity** step. It ensures that the input vectors for the SVAR model satisfy the stationarity condition required for stable estimation, while preserving level data for structural break detection.

### 16. `run_bai_perron_eoa` (Task 16 Orchestrator)

*   **Inputs:** The finalized data (`finalized_data`) and analysis config.
*   **Processes:** It extracts the EOA-EOA log-volume series and executes the **Bai-Perron Dynamic Programming Algorithm** via `analyze_series_breaks_robust`. This involves computing the global SSR matrix, finding optimal partitions for $k$ breaks, selecting $k$ via BIC, and computing the SupF statistic.
*   **Outputs:** A `BaiPerronResults` dataclass containing break dates and statistics for USDT and USDC human flows.
*   **Research Context:** This implements the **Structural Break Analysis (SBA)** for human behavior described in Section 4.2. It solves Equation (8), $\{\hat{T}_1, \dots, \hat{T}_m\} = \arg\min_{T_1, \dots, T_m} SSR(T_1, \dots, T_m)$, to detect the anticipatory shift in human sentiment on Nov 3, 2024.
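The minimization in Equation (8) can be sketched with dynamic programming for the pure mean-shift model (`model_type: "mean_shift"`). The sketch assumes a minimum segment length $h$, for example $h = \lfloor \varepsilon T \rfloor$ with the configured trimming $\varepsilon = 0.15$. `bai_perron_mean_shift` is hypothetical. It finds only the SSR-minimizing partition for a fixed number of breaks $m$; BIC selection of $m$ and the SupF test are not shown.

```python
import numpy as np

def bai_perron_mean_shift(y, m: int, h: int):
    """Hypothetical sketch: optimal m breaks for a mean-shift model.

    Returns (breaks, ssr); each break index is the first observation of a new regime.
    """
    y = np.asarray(y, dtype=float)
    T = len(y)
    if (m + 1) * h > T:
        raise ValueError("too many breaks for this minimum segment length")
    cs = np.concatenate(([0.0], np.cumsum(y)))
    cs2 = np.concatenate(([0.0], np.cumsum(y * y)))

    def ssr(i, j):
        # SSR of y[i:j] around its own mean: sum(y^2) - (sum y)^2 / n
        s = cs[j] - cs[i]
        return (cs2[j] - cs2[i]) - s * s / (j - i)

    # cost[k, j]: minimal SSR of y[0:j] with k breaks; arg[k, j]: start of last segment
    cost = np.full((m + 1, T + 1), np.inf)
    arg = np.zeros((m + 1, T + 1), dtype=int)
    for j in range(h, T + 1):
        cost[0, j] = ssr(0, j)
    for k in range(1, m + 1):
        for j in range((k + 1) * h, T + 1):
            best, best_b = np.inf, -1
            # Last segment is y[b:j]; the first k segments cover y[0:b]
            for b in range(k * h, j - h + 1):
                c = cost[k - 1, b] + ssr(b, j)
                if c < best:
                    best, best_b = c, b
            cost[k, j], arg[k, j] = best, best_b
    # Backtrack from the full sample to recover the break locations
    breaks, j = [], T
    for k in range(m, 0, -1):
        j = int(arg[k, j])
        breaks.append(j)
    return sorted(breaks), float(cost[m, T])
```

The triple loop costs $O(mT^2)$ operations, which is manageable for a one-year daily sample.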

### 17. `run_bai_perron_sc` (Task 17 Orchestrator)

*   **Inputs:** The finalized data (`finalized_data`) and analysis config.
*   **Processes:** It applies the same robust Bai-Perron analysis (`analyze_series_breaks_robust`) to the SC-SC (algorithmic) log-volume series.
*   **Outputs:** An `SCBaiPerronResults` dataclass containing break dates for algorithmic flows.
*   **Research Context:** This implements the **SBA for Algorithmic Behavior** (Section 4.4). It detects the delayed structural breaks in automated activity (Jan 2025), contrasting them with the early human response.

### 18. `run_bai_perron_exchange` (Task 18 Orchestrator)

*   **Inputs:** The finalized data (`finalized_data`) and analysis config.
*   **Processes:** It applies the Bai-Perron analysis to the exchange volume series (`log_Volume_USDT_USD`, etc.).
*   **Outputs:** An `ExchangeBaiPerronResults` dataclass containing break dates for centralized exchange activity.
*   **Research Context:** This implements the **SBA for Market Microstructure** (Section 4.3.1). It identifies the coincident break on Election Day (Nov 5), serving as a benchmark for the "early warning" property of the on-chain signal.

### 19. `run_bai_perron_prices` (Task 19 Orchestrator)

*   **Inputs:** The finalized data (`finalized_data`) and analysis config.
*   **Processes:** It applies the Bai-Perron analysis to the BTC and ETH log-price series.
*   **Outputs:** A `PriceBaiPerronResults` dataclass containing break dates for asset prices.
*   **Research Context:** This implements the **SBA for Asset Prices**. It confirms that price adjustments (Nov 6-9) lagged behind the on-chain behavioral shifts, validating the predictive utility of the EOA signal.

### 20. `orchestrate_structural_breaks` (Task 20 Orchestrator)

*   **Inputs:** The finalized data (`finalized_data`) and analysis config.
*   **Processes:** This is a meta-orchestrator that iterates through all series flagged for structural break analysis. It calls the internal `callable_bai_perron` (wrapping the robust DP logic) for each series and aggregates the results.
*   **Outputs:** A `StructuralBreakOrchestratorResult` dataclass containing a dictionary of all break detection results.
*   **Research Context:** This unifies the **Structural Break Detection** phase, ensuring a consistent methodological application across all variable types (Human, Algo, Exchange, Price).

### 21. `execute_aaft_robustness` (Task 21 Orchestrator)

*   **Inputs:** Finalized data, break results, and analysis config.
*   **Processes:** It executes `compute_surrogate_stats` for each series with a detected break. This involves generating 1000 AAFT surrogates via `generate_aaft_surrogates` (Gaussianization $\to$ Phase Randomization $\to$ Remapping), running the Bai-Perron algorithm on each surrogate, and computing the empirical p-value of the observed SupF statistic.
*   **Outputs:** An `AAFTRobustnessResults` dataclass containing p-values and surrogate statistics.
*   **Research Context:** This implements the **Robustness Verification** described in Sections 3.2.2 and 4.2. It tests the null hypothesis that the observed breaks are artifacts of linear autocorrelation, which establishes the statistical significance of the findings.
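Generating one surrogate takes three steps: Gaussianization, phase randomization and remapping. A NumPy sketch of these steps follows. `aaft_surrogate` is hypothetical and is not the repository's `generate_aaft_surrogates`. The DC and Nyquist terms keep their original phase, so the inverse FFT stays real and the mean is preserved.

```python
import numpy as np

def aaft_surrogate(x, rng: np.random.Generator) -> np.ndarray:
    """Hypothetical sketch of one AAFT surrogate of a 1-D series."""
    x = np.asarray(x, dtype=float)
    n = len(x)
    ranks = np.argsort(np.argsort(x))
    # 1. Gaussianize: sorted Gaussian draws rearranged into the rank order of x
    g = np.sort(rng.standard_normal(n))[ranks]
    # 2. Phase-randomize g: Fourier amplitudes are kept, phases are shifted at random
    spec = np.fft.rfft(g)
    phases = rng.uniform(0.0, 2.0 * np.pi, len(spec))
    phases[0] = 0.0                   # leave the DC (mean) term unchanged
    if n % 2 == 0:
        phases[-1] = 0.0              # leave the real Nyquist term unchanged
    g_surr = np.fft.irfft(spec * np.exp(1j * phases), n=n)
    # 3. Remap: put the original values into the rank order of the surrogate
    return np.sort(x)[np.argsort(np.argsort(g_surr))]
```

The surrogate therefore contains exactly the original values, which is why the test below can compare sorted arrays.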

### 22. `decompose_prices_emd` (Task 22 Orchestrator)

*   **Inputs:** Finalized data and analysis config.
*   **Processes:** It executes `decompose_series` for BTC and ETH log-prices. This calls `empirical_mode_decomposition`, which iteratively sifts the signal using cubic spline envelopes (`get_envelopes`) until the residue is monotonic, extracting Intrinsic Mode Functions (IMFs).
*   **Outputs:** An `EMDDecompositionResults` dataclass containing the IMFs and residues.
*   **Research Context:** This implements the **Empirical Mode Decomposition (EMD)** described in Section 3.2.1 (Equations 14-17). It decomposes the non-stationary price series into intrinsic oscillatory modes, a prerequisite for the Hilbert Spectral Analysis.

### 23. `compute_hilbert_spectrum` (Task 23 Orchestrator)

*   **Inputs:** EMD results (`emd_results`).
*   **Processes:** It executes `compute_analytic_properties` for each IMF $x(t)$ to derive the analytic signal $z(t) = x(t) + i\,\mathcal{H}[x](t)$, where $\mathcal{H}[x]$ is the Hilbert transform of $x$ (Equation 18). From $z(t)$ it computes the instantaneous amplitude $A(t) = |z(t)|$, phase $\phi(t) = \arg z(t)$, and frequency $\omega(t) = d\phi/dt$. It then aggregates these into a time-frequency distribution via `construct_spectrum`.
*   **Outputs:** A `HilbertTransformResults` dataclass containing the Hilbert Spectra.
*   **Research Context:** This implements the **Hilbert Spectral Analysis** described in Section 3.2.1 (Equations 18-21). It provides the high-resolution time-frequency representation necessary to detect transient volatility events.
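The analytic signal can be computed with the FFT by keeping DC (and Nyquist), doubling positive frequencies and zeroing negative ones. This is the same construction `scipy.signal.hilbert` uses. `analytic_signal` and `instantaneous_attributes` are hypothetical helpers. The sketch reports $\omega(t)$ in radians per sample and uses a finite-difference derivative.

```python
import numpy as np

def analytic_signal(x) -> np.ndarray:
    """Hypothetical sketch: z = x + i*H[x] via the FFT."""
    x = np.asarray(x, dtype=float)
    n = len(x)
    weights = np.zeros(n)
    weights[0] = 1.0                     # DC term kept once
    if n % 2 == 0:
        weights[n // 2] = 1.0            # Nyquist term kept once
        weights[1:n // 2] = 2.0          # positive frequencies doubled
    else:
        weights[1:(n + 1) // 2] = 2.0
    return np.fft.ifft(np.fft.fft(x) * weights)

def instantaneous_attributes(imf, dt: float = 1.0):
    """Amplitude A(t) = |z|, unwrapped phase phi(t), frequency omega(t) = dphi/dt."""
    z = analytic_signal(imf)
    amplitude = np.abs(z)
    phase = np.unwrap(np.angle(z))
    omega = np.gradient(phase, dt)       # finite-difference derivative
    return amplitude, phase, omega
```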

### 24. `detect_extreme_events` (Task 24 Orchestrator)

*   **Inputs:** Hilbert results, analysis config, and date index.
*   **Processes:** It computes the instantaneous energy $IE(t) = \int H^2(t, \omega) d\omega$ via `compute_instantaneous_energy`. It calculates the adaptive threshold $E_{th} = E_\mu + 4\sigma$ via `compute_threshold` and identifies dates where $IE(t) > E_{th}$ via `find_extreme_events`.
*   **Outputs:** An `ExtremeEventResults` dataclass containing detected event dates.
*   **Research Context:** This implements the **Extreme Event Detection** described in Section 3.2.1 (Equations 22-24). It identifies the specific dates of market turbulence (Nov 7, Nov 10) associated with the election shock.
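The energy and threshold steps can be sketched on a discretized spectrum. `flag_extreme_events` is hypothetical. The sketch assumes that the Hilbert spectrum is stored as an `(n_freq, n_time)` array $H(t, \omega)$, and it approximates the integral as a sum over unit-width frequency bins. The population standard deviation (`ddof=0`) used for $\sigma$ is also an assumption.

```python
import numpy as np

def flag_extreme_events(spectrum, dates, B: float = 4.0):
    """Hypothetical sketch: IE(t) = sum over frequency bins of H^2, threshold mean + B*sigma."""
    ie = (np.asarray(spectrum, dtype=float) ** 2).sum(axis=0)  # one value per time step
    threshold = ie.mean() + B * ie.std()                        # ddof=0 is an assumption
    events = [d for d, e in zip(dates, ie) if e > threshold]
    return events, threshold
```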

### 25. `orchestrate_hht` (Task 25 Orchestrator)

*   **Inputs:** Finalized data and analysis config.
*   **Processes:** This is a meta-orchestrator for the HHT pipeline. It sequentially calls `callable_emd`, `callable_hilbert_spectrum`, and `callable_extreme_events` for the target price series.
*   **Outputs:** An `HHTOrchestratorResult` dataclass aggregating all HHT artifacts.
*   **Research Context:** This unifies the **Non-Linear Signal Processing** phase, providing a streamlined execution flow for detecting extreme market events.

### 26. `execute_hht_robustness` (Task 26 Orchestrator)

*   **Inputs:** Finalized data, HHT results, and analysis config.
*   **Processes:** It executes `compute_surrogate_maxima` to generate 1000 AAFT surrogates and compute their maximum instantaneous energy. It then calculates the empirical p-value of the observed maximum energy via `calculate_hht_p_value`.
*   **Outputs:** An `HHTRobustnessResults` dataclass containing significance tests for extreme events.
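*   **Research Context:** This implements the **Robustness Verification for HHT**. It checks whether the observed maximum instantaneous energy is unlikely under the AAFT null, a linear Gaussian process with the same autocorrelation and amplitude distribution as the data. Rejecting that null confirms that the detected extreme events are not artifacts of the series' linear structure.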
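One common way to compute an empirical one-sided p-value from surrogate statistics is shown below. `empirical_p_value` is hypothetical. The text does not say whether `calculate_hht_p_value` uses the plain proportion or this estimator, so the `+1` correction, which prevents a p-value of exactly zero, is an assumption.

```python
import numpy as np

def empirical_p_value(observed: float, surrogate_stats) -> float:
    """Hypothetical sketch: share of surrogates at least as extreme, with +1 correction."""
    s = np.asarray(surrogate_stats, dtype=float)
    return float((1 + np.sum(s >= observed)) / (len(s) + 1))
```

With 1000 surrogates, the smallest attainable p-value under this estimator is $1/1001$.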

### 27. `prepare_svar_inputs` (Task 27 Orchestrator)

*   **Inputs:** Finalized data and analysis config.
*   **Processes:** It selects the differenced EOA-EOA volume series via `select_svar_series`, cleans missing values, and splits the dataset into pre-election ($t < T_{break}$) and post-election ($t \ge T_{break}$) regimes via `split_regimes`.
*   **Outputs:** An `SVARInputData` dataclass containing the stationary input vectors for the VAR model.
*   **Research Context:** This implements the **Data Preparation for SVAR**. It constructs the vector $Y_t = [\Delta \log V_{USDC,t}, \Delta \log V_{USDT,t}]'$ and defines the structural break point (Nov 5) for the regime-dependent analysis.
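Differencing and the regime split can be sketched as follows. `difference_and_split` is hypothetical. It assumes a `DataFrame` of log-level series on a UTC daily index and uses the configured break date.

```python
import pandas as pd

def difference_and_split(log_levels: pd.DataFrame, break_date: str = "2024-11-05"):
    """Hypothetical sketch: first differences, then pre (t < T_break) / post (t >= T_break)."""
    dy = log_levels.diff().dropna()      # Delta log V_t; the first row is lost
    t_break = pd.Timestamp(break_date, tz="UTC")
    return dy.loc[dy.index < t_break], dy.loc[dy.index >= t_break]
```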

### 28. `estimate_reduced_var` (Task 28 Orchestrator)

*   **Inputs:** SVAR inputs and analysis config.
*   **Processes:** It initializes the VAR model via `initialize_var_model`, selects the optimal lag order $p^*$ using the Akaike Information Criterion (AIC) via `select_optimal_lag`, and estimates the reduced-form model on the full sample via `fit_var_model`.
*   **Outputs:** A `VARModelResults` dataclass containing the fitted model and optimal lag.
*   **Research Context:** This implements the **Reduced-Form VAR Estimation** (Equation 29). It determines the lag structure required to capture the dynamic interdependencies between stablecoin flows.

### 29. `estimate_regime_vars` (Task 29 Orchestrator)

*   **Inputs:** SVAR inputs and full model results.
*   **Processes:** It estimates separate VAR models for the pre-election and post-election periods using the optimal lag $p^*$ via `estimate_regime_var`. It extracts and vectorizes the coefficients ($\hat{\theta}_{pre}, \hat{\theta}_{post}$) and their covariance matrices.
*   **Outputs:** A `RegimeComparisonData` dataclass containing the regime-specific model estimates.
*   **Research Context:** This implements the **Regime-Dependent Estimation**. It provides the parameter estimates required to test for structural changes in the transmission mechanism of volatility.

### 30. `identify_structural_shocks` (Task 30 Orchestrator)

*   **Inputs:** Regime data, SVAR inputs, and analysis config.
*   **Processes:** It executes `compute_cholesky_impact` for each regime and ordering. This involves permuting the residual covariance matrix $\Sigma_u$ to match the identification ordering (e.g., USDC $\to$ USDT) and performing Cholesky decomposition $\Sigma_u = PP'$ to obtain the structural impact matrix $P$ (Equation 32).
*   **Outputs:** An `SVARIdentificationResults` dataclass containing the structural impact matrices.
*   **Research Context:** This implements the **Structural Identification** described in Section 3.3.1 (Equations 30-32). It recovers the orthogonal structural shocks $\varepsilon_t$ and quantifies the contemporaneous spillover effects between stablecoins.
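The permute-then-factor step can be sketched with NumPy. `cholesky_impact` is hypothetical. It reorders the rows and columns of $\Sigma_u$ to the chosen ordering and returns the lower-triangular $P$ with $\Sigma_u = PP'$. The entry `P[1, 0]` is the contemporaneous impact of the first-ordered shock on the second variable.

```python
import numpy as np

def cholesky_impact(sigma_u, names, ordering):
    """Hypothetical sketch: Cholesky impact matrix for a given variable ordering."""
    idx = [names.index(v) for v in ordering]
    sigma_perm = np.asarray(sigma_u, dtype=float)[np.ix_(idx, idx)]
    # Lower-triangular P with sigma_perm = P @ P.T
    return np.linalg.cholesky(sigma_perm)
```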

### 31. `perform_wald_test` (Task 31 Orchestrator)

*   **Inputs:** Regime comparison data.
*   **Processes:** It executes `calculate_wald_stat` to compute the Wald statistic $W = (\hat{\theta}_{post} - \hat{\theta}_{pre})' (\hat{\Sigma}_{pre} + \hat{\Sigma}_{post})^{-1} (\hat{\theta}_{post} - \hat{\theta}_{pre})$. It then computes the p-value from the $\chi^2$ distribution via `compute_chi2_p_value`.
*   **Outputs:** A `WaldTestResult` dataclass containing the test statistic and significance.
*   **Research Context:** This implements the **Wald Test for Structural Change** described in Section 3.3.2 (Equations 33-34). It formally tests whether the election induced a statistically significant shift in the dynamic relationships between stablecoins.
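The statistic $W$ can be computed without forming the matrix inverse explicitly, as in the sketch below. `wald_statistic` is hypothetical. Taking the degrees of freedom to be the number of compared coefficients is an assumption. A p-value could then be obtained with `scipy.stats.chi2.sf(W, df)`.

```python
import numpy as np

def wald_statistic(theta_pre, theta_post, cov_pre, cov_post):
    """Hypothetical sketch: W = d' (S_pre + S_post)^{-1} d with d = theta_post - theta_pre."""
    d = np.asarray(theta_post, dtype=float) - np.asarray(theta_pre, dtype=float)
    V = np.asarray(cov_pre, dtype=float) + np.asarray(cov_post, dtype=float)
    # Solve V x = d rather than inverting V
    W = float(d @ np.linalg.solve(V, d))
    return W, d.size  # df = number of compared coefficients (assumption)
```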

### 32. `synthesize_study_results` (Task 32 Orchestrator)

*   **Inputs:** Results from all analysis phases (Breaks, AAFT, HHT, SVAR).
*   **Processes:** It validates the findings against the paper's claims via `verify_breaks` (checking dates like Nov 3), `verify_hht` (checking dates like Nov 7/10), and `verify_svar` (checking Wald significance and spillover increase). It aggregates these checks into a final report.
*   **Outputs:** A `FinalStudyReport` dataclass summarizing the replication success.
*   **Research Context:** This implements the **Synthesis and Validation** phase. It confirms that the computational pipeline reproduces the empirical findings of the study: the early warning signal in human flows, the delayed algorithmic response, and the post-election regime shift in volatility.

### 33. `run_obps_pipeline` (Top-Level Orchestrator)

*   **Inputs:** Raw chain DataFrame, raw market DataFrame, and configuration dictionary.
*   **Processes:** This master function sequentially invokes every orchestrator from Task 1 to Task 32. It manages the data flow, passing validated and transformed data objects between stages (Validation $\to$ Processing $\to$ Panel Construction $\to$ Analysis $\to$ Synthesis).
*   **Outputs:** An `OBPSPipelineResult` dataclass containing every intermediate and final artifact produced by the system.
*   **Research Context:** This is the **End-to-End System Implementation**. It represents the complete, automated realization of the OBPS framework, capable of ingesting raw data and producing the full set of econometric insights and validation metrics required for political risk monitoring.

<br><br>
### **Usage Example**

The following code snippet uses synthetically generated data to illustrate, step by step, how to run the end-to-end research pipeline for "*Early-Warning Signals of Political Risk in Stablecoin Markets: Human and Algorithmic Behavior Around the 2024 U.S. Election*":

```python
import pandas as pd
import numpy as np
import yaml
from datetime import datetime, timedelta

# ==============================================================================
# USAGE EXAMPLE: On-Chain Behavioral Pre-Emption System (OBPS)
# ==============================================================================

# This script demonstrates the end-to-end execution of the OBPS pipeline using
# synthetically generated data that mirrors the structure and statistical
# properties of the 2024 U.S. Election study.

# ------------------------------------------------------------------------------
# Step 1: Synthetic Data Generation
# ------------------------------------------------------------------------------
# We generate two DataFrames: 'df_chain_raw' (blockchain logs) and 'df_market_raw'
# (exchange data). The data is seeded to contain the structural breaks and
# volatility events hypothesized in the study (e.g., Nov 3 human signal).

def generate_synthetic_data():
    """
    Generates implementation-grade synthetic datasets for the OBPS pipeline.
    """
    print("Generating synthetic research data...")
    np.random.seed(42)
    
    # --- 1. Define Scope ---
    start_date = pd.Timestamp("2024-03-01", tz="UTC")
    end_date = pd.Timestamp("2025-02-28", tz="UTC")
    days = pd.date_range(start_date, end_date, freq="D")
    n_days = len(days)
    
    # --- 2. Generate Market Data (df_market_raw) ---
    # We simulate BTC/ETH prices with a shock around Nov 5; stablecoins stay pegged
    market_rows = []
    symbols = ["BTC/USD", "ETH/USD", "USDT/USD", "USDC/USD"]
    election_idx = days.get_loc(pd.Timestamp("2024-11-05", tz="UTC"))
    
    for sym in symbols:
        is_stable = sym in ("USDT/USD", "USDC/USD")
        # Base price and daily volatility
        if "BTC" in sym: price, vol = 60000, 0.02
        elif "ETH" in sym: price, vol = 3000, 0.03
        else: price, vol = 1.0, 0.001 # Stablecoins
        
        # Random walk in simple returns
        returns = np.random.normal(0, vol, n_days)
        
        # Inject Election Shock for volatile assets only (Nov 5 - Nov 9, ~5% per day)
        if not is_stable:
            returns[election_idx:election_idx+5] += np.random.normal(0.05, 0.02, 5)
        
        price_series = price * np.cumprod(1 + returns)
        volume_series = np.random.lognormal(16, 1, n_days) # median ~$9M daily volume
        
        # Inject Volume Shock for Stablecoins on Election Day
        if is_stable:
            volume_series[election_idx] *= 5.0
            
        for i, d in enumerate(days):
            market_rows.append({
                "Date": d,
                "Symbol": sym,
                "Close": price_series[i],
                "Volume": volume_series[i]
            })
            
    df_market_raw = pd.DataFrame(market_rows)
    
    # --- 3. Generate On-Chain Data (df_chain_raw) ---
    # We simulate ~500 transactions per day for efficiency in this example
    # (Real data would be millions)
    chain_rows = []
    
    usdt_addr = "0xdac17f958d2ee523a2206206994597c13d831ec7"
    usdc_addr = "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
    
    for i, d in enumerate(days):
        # Base transaction count
        n_tx = 500
        
        # Inject Human Signal (EOA-EOA) Surge on Nov 3
        is_human_signal = (d.date() == pd.Timestamp("2024-11-03").date())
        if is_human_signal:
            n_tx = 2000 # Surge
            
        for _ in range(n_tx):
            # Randomize Token
            token = usdt_addr if np.random.random() > 0.5 else usdc_addr
            
            # Randomize Topology
            # Normal: 70% Human, 30% Algo
            # Signal Day: 95% Human
            if is_human_signal:
                is_human = np.random.random() < 0.95
            else:
                is_human = np.random.random() < 0.70
                
            if is_human:
                f_contract, t_contract = 0, 0
            else:
                f_contract, t_contract = 1, 1
                
            # Value (6 decimals)
            val_usd = np.random.exponential(1000) # Avg $1000
            val_int = int(val_usd * 1e6)
            
            # Timestamp (random time within the day)
            ts_offset = np.random.randint(0, 86400)
            ts = int(d.timestamp()) + ts_offset
            
            chain_rows.append({
                "timeStamp": ts,
                "tokenAddress": token,
                "from": f"0xsender{i}",
                "to": f"0xreceiver{i}",
                "value": str(val_int), # String as per schema
                "fromIsContract": f_contract,
                "toIsContract": t_contract,
                "transactionHash": f"0xhash{len(chain_rows)}",
                "blockNumber": 1000000 + i,
                "logIndex": len(chain_rows) % 100,
                "txStatus": 0, # Success
                "isInternal": 0
            })
            
    df_chain_raw = pd.DataFrame(chain_rows)
    
    print(f"Generated {len(df_market_raw)} market rows and {len(df_chain_raw)} chain rows.")
    return df_chain_raw, df_market_raw

# Generate the data
df_chain_raw, df_market_raw = generate_synthetic_data()

# ------------------------------------------------------------------------------
# Step 2: Load Configuration
# ------------------------------------------------------------------------------
# We define the YAML content directly here for the example, mimicking file read.
yaml_content = """
meta:
  study_title: "Early-Warning Signals of Political Risk in Stablecoin Markets"
  authors: ["Mukhia", "Sharma", "Luwang", "Nurujjaman", "Hens", "Saha", "Chakraborty"]
  observation_window:
    start_date: "2024-03-01"
    end_date: "2025-02-28"
    timezone: "UTC"
  critical_events:
    election_day: "2024-11-05"
    human_signal_date: "2024-11-03"
  frequency: "1D"

schemas:
  chain_raw:
    columns:
      timeStamp: "int64"
      tokenAddress: "string"
      from: "string"
      to: "string"
      value: "object"
      fromIsContract: "int8"
      toIsContract: "int8"
      transactionHash: "string"
      blockNumber: "int64"
      logIndex: "int64"
      txStatus: "int8"
      isInternal: "int8"
    assets:
      USDT: {address: "0xdac17f958d2ee523a2206206994597c13d831ec7", decimals: 6}
      USDC: {address: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", decimals: 6}
    topology_filters:
      human_signal: {fromIsContract: 0, toIsContract: 0}
      algo_signal: {fromIsContract: 1, toIsContract: 1}

  market_raw:
    columns:
      Date: "datetime64[ns]"
      Symbol: "string"
      Close: "float64"
      Volume: "float64"
    symbols: ["BTC/USD", "ETH/USD", "USDT/USD", "USDC/USD"]

preprocessing:
  normalization_equation: "value / (10 ** decimals)"
  handling:
    drop_zero_value: true
    drop_failed_tx: true
    tx_status_column: "txStatus"
    accepted_status: [0]
  transformations:
    logarithm: true
    differencing: "conditional"
  stationarity_test:
    method: "Augmented Dickey-Fuller"
    regression: "ct"
    autolag: "AIC"
    significance_level: 0.05
    critical_values_source: "MacKinnon (1996)"

bai_perron:
  model_type: "mean_shift"
  algorithm: "dynamic_programming"
  parameters:
    max_breaks: 5
    trimming_epsilon: 0.15
    significance_test: "SupF"
    alpha: 0.05
  robustness:
    method: "AAFT Surrogates"
    iterations: 100 # Reduced from 1000 for example speed
    null_hypothesis: "Linear Gaussian Process"

hht:
  target_variable: "Close"
  emd_parameters:
    spline_type: "cubic"
    sifting_stop_sd: 0.2
    max_imfs: 10
  spectrum_parameters:
    type: "Hilbert"
    energy_norm: "max_energy"
  extreme_event_detection:
    formula: "mean + B * sigma"
    B: 4

svar:
  target_variables: ["USDC_log_diff", "USDT_log_diff"]
  model_selection:
    criterion: "AIC"
    max_lags: 12
  identification:
    method: "Cholesky"
    orderings:
      - ["USDC", "USDT"]
      - ["USDT", "USDC"]
  regime_analysis:
    break_date: "2024-11-05"
    test: "Wald Statistic"
    hypothesis: "one_sided"

analysis_targets:
  structural_break_series:
    blockchain:
      - "log V_EOA_EOA_USDT"
      - "log V_EOA_EOA_USDC"
      - "log V_SC_SC_USDT"
      - "log V_SC_SC_USDC"
    exchange:
      - "log Volume_USDT_USD"
      - "log Volume_USDC_USD"
    prices:
      - "log Close_BTC_USD"
      - "log Close_ETH_USD"
  hht_series:
    - "log Close_BTC_USD"
    - "log Close_ETH_USD"
  svar_series:
    blockchain_EOA:
      - "Δ log V_EOA_EOA_USDC"
      - "Δ log V_EOA_EOA_USDT"
    exchange:
      - "Δ log Volume_USDC_USD"
      - "Δ log Volume_USDT_USD"
"""

# Parse the YAML string into a Python dictionary
study_config = yaml.safe_load(yaml_content)
print("Configuration loaded successfully.")

# ------------------------------------------------------------------------------
# Step 3: Execute Pipeline
# ------------------------------------------------------------------------------
# We invoke the top-level orchestrator with our prepared data and config.
# This will trigger the full sequence of 32 tasks.

try:
    pipeline_results = run_obps_pipeline(
        df_chain_raw=df_chain_raw,
        df_market_raw=df_market_raw,
        study_config=study_config
    )
    print("\nPipeline execution successful!")
    
except Exception as e:
    print(f"\nPipeline execution failed: {e}")
    # In a real notebook, we would raise e here to see the traceback
    # raise e

# ------------------------------------------------------------------------------
# Step 4: Inspect Results
# ------------------------------------------------------------------------------
# We can now access the rich structured data returned by the pipeline.

if 'pipeline_results' in locals():
    res = pipeline_results
    
    print("\n--- Final Report Summary ---")
    print(f"Conclusion: {res.final_report.overall_conclusion}")
    
    print("\n--- Structural Break Detection (Human Signal) ---")
    # Accessing the result for USDT EOA-EOA
    usdt_breaks = res.structural_breaks.results.get("log_V_EOA_EOA_USDT")
    if usdt_breaks:
        dates = [d.date() for d in usdt_breaks.break_dates]
        sup_f = usdt_breaks.sup_f_stat
        print(f"USDT EOA-EOA Breaks: {dates}")
        print(f"SupF Statistic: {sup_f:.2f}")
        
    print("\n--- SVAR Regime Shift Analysis ---")
    wald = res.wald_test
    print(f"Wald Statistic: {wald.wald_statistic:.2f}")
    print(f"p-value: {wald.p_value:.4e}")
    print(f"Significant Shift: {wald.is_significant}")
    
    print("\n--- HHT Extreme Events ---")
    btc_events = res.hht_analysis.results.get("log_Close_BTC_USD", {}).get("events")
    if btc_events:
        dates = [d.date() for d in btc_events.extreme_event_dates]
        print(f"BTC Extreme Events: {dates}")
```

Note: The Usage Example assumes that the user has already imported all relevant Python modules and pipeline callables (including `run_obps_pipeline`) into their Python environment.
<br>

# Task 1 — Validate `STUDY_CONFIG` Parameter Integrity

# ==============================================================================
# Task 1: Validate and parse the study configuration dictionary
# ==============================================================================

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 1:  Load the `study_parameters` dictionary and verify structural completeness.
# -------------------------------------------------------------------------------------------------------------------------------

@dataclass(frozen=True)
class ValidatedMetaConfig:
    """
    Immutable container for validated metadata and temporal parameters.

    Attributes:
        start_date (datetime): The start of the observation window (UTC).
        end_date (datetime): The end of the observation window (UTC).
        election_day (datetime): The date of the U.S. election (UTC).
        human_signal_date (datetime): The date of the structural break in human activity (UTC).
        timezone_str (str): The timezone string, strictly 'UTC'.
    """
    start_date: datetime
    end_date: datetime
    election_day: datetime
    human_signal_date: datetime
    timezone_str: str

def validate_meta_config(meta_config: Dict[str, Any]) -> ValidatedMetaConfig:
    """
    Validates the 'meta' section of the study configuration.

    This function parses date strings, enforces UTC timezone, and verifies
    temporal ordering constraints (start < end, events within window).

    Args:
        meta_config (Dict[str, Any]): The 'meta' dictionary from STUDY_CONFIG.

    Returns:
        ValidatedMetaConfig: A structured object containing parsed dates.

    Raises:
        ValueError: If dates are malformed, logically invalid, or timezone is not UTC.
        KeyError: If required keys are missing.
    """
    # The 'observation_window' section is mandatory
    if "observation_window" not in meta_config:
        raise KeyError("Missing 'observation_window' in meta config.")

    obs_window = meta_config["observation_window"]

    # Validate Timezone - Strict adherence to UTC is required for replication
    if obs_window.get("timezone") != "UTC":
        raise ValueError(f"Timezone must be strictly 'UTC', found: {obs_window.get('timezone')}")

    # Helper to parse dates strictly as YYYY-MM-DD
    def parse_date(date_str: str, field_name: str) -> datetime:
        try:
            # Parse string to datetime
            dt = datetime.strptime(date_str, "%Y-%m-%d")
            # Set timezone to UTC explicitly
            return dt.replace(tzinfo=timezone.utc)
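        except (TypeError, ValueError) as exc:
            # TypeError covers missing (None) values; ValueError covers malformed strings
            raise ValueError(f"Invalid date format for '{field_name}': {date_str}. Expected YYYY-MM-DD.") from exc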

    # Parse start and end dates
    start_date = parse_date(obs_window["start_date"], "start_date")
    end_date = parse_date(obs_window["end_date"], "end_date")

    # Enforce temporal ordering: start must be strictly before end
    if start_date >= end_date:
        raise ValueError(f"start_date ({start_date.date()}) must be strictly before end_date ({end_date.date()}).")

    # Validate Critical Events
    if "critical_events" not in meta_config:
        raise KeyError("Missing 'critical_events' in meta config.")

    events = meta_config["critical_events"]
    election_day = parse_date(events["election_day"], "election_day")
    human_signal_date = parse_date(events["human_signal_date"], "human_signal_date")

    # Enforce inclusion: events must be within [start_date, end_date]
    # Equation: start_date <= event_date <= end_date
    if not (start_date <= election_day <= end_date):
        raise ValueError(f"election_day ({election_day.date()}) is outside the observation window.")

    if not (start_date <= human_signal_date <= end_date):
        raise ValueError(f"human_signal_date ({human_signal_date.date()}) is outside the observation window.")

    # Return validated dataclass
    return ValidatedMetaConfig(
        start_date=start_date,
        end_date=end_date,
        election_day=election_day,
        human_signal_date=human_signal_date,
        timezone_str="UTC"
    )
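The window checks above reduce to parsing each `YYYY-MM-DD` string as a UTC-aware datetime and testing inclusive bounds. The sketch below isolates that logic in a hypothetical helper, `events_outside_window`, which reports every out-of-window event instead of raising on the first one. The dates are illustrative placeholders, not the study's configuration.

```python
from datetime import datetime, timezone
from typing import Dict


def parse_utc_date(date_str: str) -> datetime:
    """Parse a strict YYYY-MM-DD string into a timezone-aware UTC datetime."""
    return datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)


def events_outside_window(start: str, end: str, events: Dict[str, str]) -> Dict[str, str]:
    """Return the events whose dates fall outside the inclusive window [start, end]."""
    start_dt, end_dt = parse_utc_date(start), parse_utc_date(end)
    # Same ordering rule as validate_meta_config: start must be strictly before end
    if start_dt >= end_dt:
        raise ValueError(f"start ({start}) must be strictly before end ({end})")
    return {
        name: date_str
        for name, date_str in events.items()
        if not (start_dt <= parse_utc_date(date_str) <= end_dt)
    }


# Illustrative placeholder dates, not the study's configuration
print(events_outside_window("2024-01-01", "2024-12-31", {"event_a": "2024-06-15", "event_b": "2025-01-15"}))
# {'event_b': '2025-01-15'}
```

Collecting all violations in one pass is a design choice for diagnostics; the validator itself fails fast, which is simpler when the configuration is expected to be correct.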

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 2: Validate schema definitions (required columns, asset contracts, market symbols).
# -------------------------------------------------------------------------------------------------------------------------------

@dataclass(frozen=True)
class ValidatedSchemas:
    """
    Immutable container for validated schema definitions.

    Attributes:
        chain_columns (Set[str]): Set of required columns for on-chain data.
        market_columns (Set[str]): Set of required columns for market data.
        market_symbols (List[str]): List of required market symbols.
        assets (Dict[str, Any]): Validated asset definitions (USDT/USDC).
    """
    chain_columns: Set[str]
    market_columns: Set[str]
    market_symbols: List[str]
    assets: Dict[str, Any]

def validate_schemas(schemas_config: Dict[str, Any]) -> ValidatedSchemas:
    """
    Validates the 'schemas' section of the study configuration.

    Verifies that all required columns, assets, and symbols are present and
    conform to the specified formats (e.g., Ethereum address regex).

    Args:
        schemas_config (Dict[str, Any]): The 'schemas' dictionary from STUDY_CONFIG.

    Returns:
        ValidatedSchemas: A structured object containing validated schema sets.

    Raises:
        ValueError: If required columns/assets are missing or malformed.
        KeyError: If major schema sections are missing.
    """
    # 1. Validate Chain Raw Schema
    if "chain_raw" not in schemas_config:
        raise KeyError("Missing 'chain_raw' in schemas config.")

    chain_config = schemas_config["chain_raw"]

    # Required columns for on-chain ledger
    required_chain_cols = {
        "timeStamp", "tokenAddress", "from", "to", "value",
        "fromIsContract", "toIsContract", "transactionHash",
        "blockNumber", "logIndex", "txStatus"
    }

    # Check if configured columns are a superset of required columns
    configured_chain_cols = set(chain_config.get("columns", {}).keys())
    missing_chain_cols = required_chain_cols - configured_chain_cols
    if missing_chain_cols:
        raise ValueError(f"Missing required chain columns: {missing_chain_cols}")

    # Validate Assets (USDT, USDC)
    assets = chain_config.get("assets", {})
    required_assets = {"USDT", "USDC"}
    if not required_assets.issubset(assets.keys()):
        raise ValueError(f"Missing required assets. Found: {list(assets.keys())}, Required: {required_assets}")

    # Regex for Ethereum Address: 0x followed by 40 hex chars
    eth_address_pattern = re.compile(r"^0x[0-9a-fA-F]{40}$")

    for asset_name in required_assets:
        asset_def = assets[asset_name]
        # Check address format
        addr = asset_def.get("address", "")
        if not eth_address_pattern.match(addr):
            raise ValueError(f"Invalid Ethereum address for {asset_name}: {addr}")

        # Check decimals: USDT and USDC both use 6 decimals on Ethereum mainnet
        # (the ERC-20 standard itself leaves `decimals` to each token contract)
        decimals = asset_def.get("decimals")
        if decimals != 6:
            raise ValueError(f"Invalid decimals for {asset_name}: {decimals}. Expected 6.")

    # 2. Validate Market Raw Schema
    if "market_raw" not in schemas_config:
        raise KeyError("Missing 'market_raw' in schemas config.")

    market_config = schemas_config["market_raw"]

    # Required columns for market data
    required_market_cols = {"Date", "Symbol", "Close", "Volume"}
    configured_market_cols = set(market_config.get("columns", {}).keys())
    missing_market_cols = required_market_cols - configured_market_cols
    if missing_market_cols:
        raise ValueError(f"Missing required market columns: {missing_market_cols}")

    # Required symbols
    required_symbols = {"BTC/USD", "ETH/USD", "USDT/USD", "USDC/USD"}
    configured_symbols = set(market_config.get("symbols", []))
    missing_symbols = required_symbols - configured_symbols
    if missing_symbols:
        raise ValueError(f"Missing required market symbols: {missing_symbols}")

    return ValidatedSchemas(
        chain_columns=configured_chain_cols,
        market_columns=configured_market_cols,
        market_symbols=market_config["symbols"],
        assets=assets
    )
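Requiring `decimals == 6` matters because ERC-20 `value` fields are raw integers: an on-chain amount of `1500000` corresponds to 1.5 tokens for a 6-decimal stablecoin. A minimal conversion sketch, assuming the raw amount arrives as a decimal string (as the object-typed `value` column suggests); `to_token_units` is a hypothetical helper, not part of the pipeline above.

```python
from decimal import Decimal


def to_token_units(raw_value: str, decimals: int = 6) -> Decimal:
    """Convert a raw ERC-20 integer amount (given as a string) into token units.

    The Decimal is built from a string in scientific notation, which is exact:
    no float64 rounding and no truncation to the decimal context precision.
    """
    raw_int = int(raw_value)  # Python ints have arbitrary precision
    if raw_int < 0:
        raise ValueError(f"ERC-20 amounts are unsigned, got {raw_value}")
    return Decimal(f"{raw_int}E-{decimals}")


print(to_token_units("1500000"))  # 1.500000
```

Dividing with `Decimal(raw_int) / 10**decimals` would also work for typical amounts, but it rounds to the context precision (28 digits by default), which uint256-sized values can exceed.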

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Step 3: Validate model identifiers and numerical analysis parameters (Bai-Perron, HHT, SVAR).
# -------------------------------------------------------------------------------------------------------------------------------

@dataclass(frozen=True)
class ValidatedAnalysisConfig:
    """
    Immutable container for validated analysis parameters (Bai-Perron, HHT, SVAR).

    Attributes:
        bp_max_breaks (int): Maximum number of structural breaks.
        bp_trimming (float): Trimming parameter epsilon.
        hht_threshold_b (float): Extreme event threshold multiplier.
        hht_max_imfs (int): Maximum number of IMFs.
        svar_max_lags (int): Maximum lag order for VAR.
        svar_break_date (datetime): Date of regime shift for SVAR.
    """
    bp_max_breaks: int
    bp_trimming: float
    hht_threshold_b: float
    hht_max_imfs: int
    svar_max_lags: int
    svar_break_date: datetime

def validate_analysis_config(config: Dict[str, Any], obs_window: ValidatedMetaConfig) -> ValidatedAnalysisConfig:
    """
    Validates the analysis-specific configurations (Bai-Perron, HHT, SVAR).

    Enforces mathematical constraints on model parameters (e.g., trimming epsilon
    must be < 0.5) and verifies that analysis dates align with the observation window.

    Args:
        config (Dict[str, Any]): The full STUDY_CONFIG dictionary.
        obs_window (ValidatedMetaConfig): The validated metadata (for date checking).

    Returns:
        ValidatedAnalysisConfig: A structured object with validated parameters.

    Raises:
        ValueError: If parameters violate mathematical or logical constraints.
    """
    # 1. Validate Bai-Perron
    bp_config = config.get("bai_perron", {})

    # Model type check
    if bp_config.get("model_type") != "mean_shift":
        raise ValueError(f"Bai-Perron model_type must be 'mean_shift', found: {bp_config.get('model_type')}")

    bp_params = bp_config.get("parameters", {})
    max_breaks = bp_params.get("max_breaks")
    trimming = bp_params.get("trimming_epsilon")

    # Constraint: max_breaks >= 1
    if not isinstance(max_breaks, int) or max_breaks < 1:
        raise ValueError(f"Bai-Perron max_breaks must be a positive integer. Found: {max_breaks}")

    # Constraint: 0 < trimming < 0.5
    if not isinstance(trimming, float) or not (0 < trimming < 0.5):
        raise ValueError(f"Bai-Perron trimming_epsilon must be in (0, 0.5). Found: {trimming}")

    # 2. Validate HHT
    hht_config = config.get("hht", {})
    ee_config = hht_config.get("extreme_event_detection", {})
    emd_config = hht_config.get("emd_parameters", {})

    # Constraint: B = 4 (per study)
    b_val = ee_config.get("B")
    if b_val != 4:
        raise ValueError(f"HHT extreme event threshold B must be 4. Found: {b_val}")

    # Constraint: max_imfs > 0
    max_imfs = emd_config.get("max_imfs")
    if not isinstance(max_imfs, int) or max_imfs < 1:
        raise ValueError(f"HHT max_imfs must be a positive integer. Found: {max_imfs}")

    # 3. Validate SVAR
    svar_config = config.get("svar", {})
    model_sel = svar_config.get("model_selection", {})
    regime_an = svar_config.get("regime_analysis", {})

    # Constraint: max_lags > 0
    max_lags = model_sel.get("max_lags")
    if not isinstance(max_lags, int) or max_lags < 1:
        raise ValueError(f"SVAR max_lags must be a positive integer. Found: {max_lags}")

    # Validate SVAR break date
    break_date_str = regime_an.get("break_date")
    try:
        svar_break_date = datetime.strptime(break_date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
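    except (TypeError, ValueError) as exc:
        # TypeError is raised when 'break_date' is absent (None)
        raise ValueError(f"Invalid or missing SVAR break_date: {break_date_str}. Expected YYYY-MM-DD.") from exc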

    # Constraint: SVAR break date must be within observation window
    if not (obs_window.start_date <= svar_break_date <= obs_window.end_date):
        raise ValueError(f"SVAR break_date ({svar_break_date.date()}) is outside observation window.")

    return ValidatedAnalysisConfig(
        bp_max_breaks=max_breaks,
        bp_trimming=trimming,
        hht_threshold_b=float(b_val),
        hht_max_imfs=max_imfs,
        svar_max_lags=max_lags,
        svar_break_date=svar_break_date
    )
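The trimming parameter epsilon sets the minimum regime length in the Bai-Perron procedure, conventionally h = floor(epsilon * T) for a sample of T observations, and that in turn caps how many breaks can fit. One could add a feasibility check along these lines. The helper names and the 365-observation example are assumptions for illustration only.

```python
import math


def min_segment_length(n_obs: int, trimming_epsilon: float) -> int:
    """Minimum observations per regime implied by the trimming parameter: h = floor(eps * T)."""
    if not 0 < trimming_epsilon < 0.5:
        raise ValueError("trimming_epsilon must lie in (0, 0.5)")
    return math.floor(trimming_epsilon * n_obs)


def max_feasible_breaks(n_obs: int, trimming_epsilon: float) -> int:
    """Largest m such that m + 1 regimes of at least h observations fit into n_obs points."""
    h = min_segment_length(n_obs, trimming_epsilon)
    if h < 1:
        raise ValueError("sample too short for this trimming value")
    return n_obs // h - 1


# Hypothetical one-year daily sample
print(min_segment_length(365, 0.15), max_feasible_breaks(365, 0.15))  # 54 5
```

With T = 365 and epsilon = 0.15, each regime needs at least 54 observations, so at most 5 breaks are feasible; a configured `max_breaks` above that bound could never be attained.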

# -------------------------------------------------------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

@dataclass(frozen=True)
class ValidatedStudyConfig:
    """
    Master container for the fully validated study configuration.

    Attributes:
        meta (ValidatedMetaConfig): Metadata and temporal scope.
        schemas (ValidatedSchemas): Data structure definitions.
        analysis (ValidatedAnalysisConfig): Model parameters.
    """
    meta: ValidatedMetaConfig
    schemas: ValidatedSchemas
    analysis: ValidatedAnalysisConfig

def validate_study_config(config: Dict[str, Any]) -> ValidatedStudyConfig:
    """
    Orchestrates the validation of the entire study configuration dictionary.

    This function invokes specific validators for metadata, schemas, and analysis
    parameters, aggregating the results into a single immutable configuration object.
    It ensures that all parameters required for the OBPS pipeline are present,
    correctly typed, and mathematically consistent.

    Args:
        config (Dict[str, Any]): The raw STUDY_CONFIG dictionary.

    Returns:
        ValidatedStudyConfig: The fully validated configuration object.

    Raises:
        ValueError: If any parameter violates constraints.
        KeyError: If required sections are missing.
    """
    # Step 1: Validate Metadata (Dates, Timezone)
    # This establishes the temporal ground truth for the study.
    validated_meta = validate_meta_config(config.get("meta", {}))

    # Step 2: Validate Schemas (Columns, Assets)
    # This ensures input dataframes will match expected structure.
    validated_schemas = validate_schemas(config.get("schemas", {}))

    # Step 3: Validate Analysis Parameters (Models)
    # This ensures mathematical models are configured correctly and consistent with metadata.
    validated_analysis = validate_analysis_config(config, validated_meta)

    return ValidatedStudyConfig(
        meta=validated_meta,
        schemas=validated_schemas,
        analysis=validated_analysis
    )
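Because every container is declared with `@dataclass(frozen=True)`, a validated configuration cannot be altered after the checks have run; any change has to produce a fresh object, which should then go through validation again. A toy illustration using a hypothetical stand-in class:

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class WindowSnapshot:
    """Toy stand-in for a frozen container such as ValidatedMetaConfig (hypothetical)."""
    start_date: str
    end_date: str


snapshot = WindowSnapshot(start_date="2024-01-01", end_date="2024-12-31")

try:
    snapshot.end_date = "2025-06-30"  # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False

# A changed configuration has to be a new object (and should be re-validated).
extended = replace(snapshot, end_date="2025-06-30")
print(mutated, snapshot.end_date, extended.end_date)  # False 2024-12-31 2025-06-30
```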


# Task 2 — Validate df_chain_raw Structure and Integrity

# ==============================================================================
# Task 2: Validate df_chain_raw Structure and Integrity
# ==============================================================================

@dataclass
class ChainValidationResult:
    """
    Container for the results of the on-chain data validation process.

    Attributes:
        validated_df (pd.DataFrame): The DataFrame after type coercion and validation.
        row_count (int): Total number of rows.
        unique_count (int): Number of unique records based on the composite key.
        duplicate_count (int): Number of duplicate records found.
        distinct_tokens (int): Number of unique token addresses found.
        date_range_start (pd.Timestamp): Earliest timestamp in the data.
        date_range_end (pd.Timestamp): Latest timestamp in the data.
    """
    validated_df: pd.DataFrame
    row_count: int
    unique_count: int
    duplicate_count: int
    distinct_tokens: int
    date_range_start: pd.Timestamp
    date_range_end: pd.Timestamp

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 1: Validate Column Presence and Data Types
# -------------------------------------------------------------------------------------------------------------------------------

def validate_chain_columns(df: pd.DataFrame, required_columns: Set[str]) -> pd.DataFrame:
    """
    Validates that the DataFrame contains all required columns and enforces data types.

    This function checks for the existence of columns defined in the schema.
    It attempts to cast columns to their expected types (e.g., int64, int8, string).
    It raises an error if columns are missing or if type coercion fails for critical fields.

    Args:
        df (pd.DataFrame): The raw on-chain DataFrame.
        required_columns (Set[str]): A set of column names required by the schema.

    Returns:
        pd.DataFrame: The DataFrame with validated and coerced data types.

    Raises:
        ValueError: If required columns are missing or type coercion fails.
    """
    # 1. Check for missing columns
    # Set difference: required - existing
    missing_cols = required_columns - set(df.columns)
    if missing_cols:
        raise ValueError(f"df_chain_raw is missing required columns: {missing_cols}")

    # 2. Define expected types mapping
    # We map schema types to pandas/numpy dtypes for strict enforcement.
    # Note: 'value' is kept as object (string) to preserve uint256 precision.
    dtype_map = {
        "timeStamp": "int64",
        "blockNumber": "int64",
        "logIndex": "int64",
        "fromIsContract": "int8",
        "toIsContract": "int8",
        "txStatus": "int8",
        # String/Object columns
        "tokenAddress": "string",
        "from": "string",
        "to": "string",
        "transactionHash": "string",
        "value": "object"
    }

    # 3. Enforce Data Types
    # We iterate through the map. If the column exists (it should), we cast it.
    # We work on a copy to avoid SettingWithCopy warnings on the original input if it's a slice.
    df_out = df.copy()

    for col, dtype in dtype_map.items():
        if col in df_out.columns:
            try:
                # Special handling for boolean-like integers or actual booleans
                if dtype == "int8":
                    # If it's boolean, convert to int first (True->1, False->0)
                    if pd.api.types.is_bool_dtype(df_out[col]):
                        df_out[col] = df_out[col].astype(int)

                    # Check for nulls before casting to integer, as int8 doesn't support NaN
                    if df_out[col].isnull().any():
                        # If nulls exist in flags, we cannot proceed safely for this strict schema
                        raise ValueError(f"Column '{col}' contains null values, which are not allowed for int8 flags.")

                    df_out[col] = df_out[col].astype("int8")

                elif dtype == "int64":
                    if df_out[col].isnull().any():
                        raise ValueError(f"Column '{col}' contains null values, which are not allowed for int64 fields.")
                    df_out[col] = df_out[col].astype("int64")

                elif dtype == "string":
                    df_out[col] = df_out[col].astype("string")

                # 'value' is left as object/string, but we verify it's not all null
                elif col == "value":
                    if df_out[col].isnull().all():
                        raise ValueError("Column 'value' is entirely null.")

            except Exception as e:
                raise ValueError(f"Failed to cast column '{col}' to {dtype}: {str(e)}")

    # 4. Check for entirely null columns (redundant check but good for safety)
    for col in required_columns:
        if df_out[col].isnull().all():
            raise ValueError(f"Column '{col}' is entirely null. Data quality insufficient.")

    return df_out
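Keeping `value` as `object` avoids silent overflow: ERC-20 amounts are `uint256`, whose ceiling (2**256 - 1) is far above the `int64` maximum of 2**63 - 1, and routing such numbers through `float64` would round them. A sketch of exact parsing with Python's arbitrary-precision `int`; `parse_uint256` is a hypothetical helper.

```python
import numpy as np

UINT256_MAX = 2**256 - 1
INT64_MAX = int(np.iinfo(np.int64).max)  # 2**63 - 1, roughly 9.2e18


def parse_uint256(raw: str) -> int:
    """Parse a raw on-chain amount string into an exact Python int within the uint256 range."""
    value = int(raw)  # arbitrary precision: no overflow, no float rounding
    if not 0 <= value <= UINT256_MAX:
        raise ValueError(f"value outside uint256 range: {raw}")
    return value


big = parse_uint256("10000000000000000000")  # 10**19 raw units
print(big > INT64_MAX)  # True: would overflow an int64 column
print(float(2**63 + 1) == float(2**63))  # True: float64 cannot tell these apart
```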

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 2: Validate Value Domains
# -------------------------------------------------------------------------------------------------------------------------------

def validate_chain_domains(df: pd.DataFrame) -> None:
    """
    Validates the value domains of specific columns in the on-chain DataFrame.

    Checks:
    - timeStamp: Must be positive integers.
    - tokenAddress: Must match Ethereum address regex (0x + 40 hex chars).
    - fromIsContract/toIsContract: Must be {0, 1}.
    - txStatus: Must be {0, 1}.

    Args:
        df (pd.DataFrame): The DataFrame validated in Step 1.

    Raises:
        ValueError: If any value domain constraint is violated.
    """
    # 1. Validate timeStamp
    # Must be strictly positive
    if (df["timeStamp"] <= 0).any():
        invalid_count = (df["timeStamp"] <= 0).sum()
        raise ValueError(f"Found {invalid_count} rows with timeStamp <= 0.")

    # 2. Validate tokenAddress format
    # Regex: Starts with 0x, followed by exactly 40 hex characters (case-insensitive)
    eth_address_pattern = r"^0x[0-9a-fA-F]{40}$"

    # Vectorized string matching. 'na=False' treats missing addresses as non-matches; Step 1 only
    # rejects columns that are entirely null, so individual nulls are caught here.
    valid_addresses = df["tokenAddress"].str.match(eth_address_pattern, na=False)
    if not valid_addresses.all():
        invalid_count = (~valid_addresses).sum()
        # Show a few examples
        examples = df.loc[~valid_addresses, "tokenAddress"].head(3).tolist()
        raise ValueError(f"Found {invalid_count} invalid tokenAddress values. Examples: {examples}")

    # 3. Validate Binary Flags (fromIsContract, toIsContract)
    # Allowed values: {0, 1}
    for col in ["fromIsContract", "toIsContract"]:
        # We use isin. Since we cast to int8, we check against integers.
        if not df[col].isin([0, 1]).all():
            unique_vals = df[col].unique()
            raise ValueError(f"Column '{col}' contains invalid values: {unique_vals}. Expected {{0, 1}}.")

    # 4. Validate txStatus
    # Allowed values: {0, 1}
    if not df["txStatus"].isin([0, 1]).all():
        unique_vals = df["txStatus"].unique()
        raise ValueError(f"Column 'txStatus' contains invalid values: {unique_vals}. Expected {{0, 1}}.")
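`validate_chain_domains` stops at the first rule that fails. When debugging a large extract it can be more useful to flag every offending row at once; the following sketch combines the same rules into one boolean mask. `invalid_row_mask` is hypothetical and assumes the column names of the schema above.

```python
import pandas as pd

ETH_ADDRESS_PATTERN = r"^0x[0-9a-fA-F]{40}$"


def invalid_row_mask(df: pd.DataFrame) -> pd.Series:
    """True for rows that break any domain rule enforced by validate_chain_domains."""
    bad_timestamp = df["timeStamp"] <= 0
    # na=False: a missing address counts as invalid
    bad_address = ~df["tokenAddress"].str.match(ETH_ADDRESS_PATTERN, na=False)
    flags_ok = (
        df["fromIsContract"].isin([0, 1])
        & df["toIsContract"].isin([0, 1])
        & df["txStatus"].isin([0, 1])
    )
    return bad_timestamp | bad_address | ~flags_ok


sample = pd.DataFrame({
    "timeStamp": [1700000000, 0, 1700000001],
    "tokenAddress": ["0x" + "a" * 40, "0x" + "b" * 40, "0xZZ"],
    "fromIsContract": [0, 1, 0],
    "toIsContract": [1, 0, 2],
    "txStatus": [1, 1, 1],
})
print(sample.loc[invalid_row_mask(sample)])  # rows 1 (timeStamp 0) and 2 (bad address, flag 2)
```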

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Step 3: Validate Uniqueness and Record Count
# -------------------------------------------------------------------------------------------------------------------------------

def validate_chain_uniqueness(df: pd.DataFrame) -> Tuple[int, int, int]:
    """
    Checks the uniqueness of the composite key (transactionHash, blockNumber, logIndex).

    Calculates total rows, unique keys, and duplicate counts.
    Prints a warning if the row count is suspiciously low.

    Args:
        df (pd.DataFrame): The DataFrame validated in Step 2.

    Returns:
        Tuple[int, int, int]: (row_count, unique_count, duplicate_count)
    """
    # 1. Define Composite Key
    key_cols = ["transactionHash", "blockNumber", "logIndex"]

    # 2. Calculate Counts
    row_count = len(df)

    # Count unique tuples.
    # drop_duplicates() returns a dataframe with unique rows based on subset.
    unique_count = len(df.drop_duplicates(subset=key_cols))

    duplicate_count = row_count - unique_count

    # 3. Heuristic Checks (Warnings)
    # If row count is very low (< 1000), it might indicate a test sample, not full data.
    # We won't raise an error, but we note it.
    if row_count < 1000:
        print(f"WARNING: Row count is low ({row_count}). Ensure this is the full dataset.")

    return row_count, unique_count, duplicate_count
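The composite key includes `logIndex` because a single transaction can emit several `Transfer` logs, so `transactionHash` alone is not unique. The validator above only counts duplicates. Here is a sketch of how one might inspect and then drop them; both helpers are hypothetical, and keeping the first occurrence is an assumed policy.

```python
import pandas as pd

KEY_COLS = ["transactionHash", "blockNumber", "logIndex"]


def duplicate_report(df: pd.DataFrame) -> pd.DataFrame:
    """Return every row whose composite key occurs more than once, sorted so copies sit together."""
    mask = df.duplicated(subset=KEY_COLS, keep=False)  # keep=False marks all copies
    return df.loc[mask].sort_values(KEY_COLS)


def deduplicate(df: pd.DataFrame) -> pd.DataFrame:
    """Keep the first occurrence of each (transactionHash, blockNumber, logIndex) key."""
    return df.drop_duplicates(subset=KEY_COLS, keep="first").reset_index(drop=True)


ledger = pd.DataFrame({
    "transactionHash": ["0xaa", "0xaa", "0xaa", "0xbb"],
    "blockNumber": [10, 10, 10, 11],
    "logIndex": [0, 0, 1, 0],  # same tx, two distinct logs (0 and 1)
    "value": ["5", "5", "7", "9"],
})
print(len(ledger) - len(deduplicate(ledger)))  # 1 duplicate, matching row_count - unique_count
```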

# -------------------------------------------------------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_df_chain_raw(df_raw: pd.DataFrame, schemas: Any) -> ChainValidationResult:
    """
    Orchestrates the validation of the raw on-chain DataFrame.

    Executes:
    1. Column presence and type validation.
    2. Value domain validation (regex, ranges, binary flags).
    3. Uniqueness checks on the composite primary key.
    4. Summary statistics calculation.

    Args:
        df_raw (pd.DataFrame): The raw input DataFrame.
        schemas (ValidatedSchemas): The schema configuration object from Task 1.

    Returns:
        ChainValidationResult: A dataclass containing the validated DataFrame and metadata.

    Raises:
        ValueError: If any validation step fails.
    """
    # Step 1: Column and Type Validation
    # We use the set of columns defined in the ValidatedSchemas object
    print("Task 2: Validating columns and types...")
    df_validated = validate_chain_columns(df_raw, schemas.chain_columns)

    # Step 2: Domain Validation
    print("Task 2: Validating value domains...")
    validate_chain_domains(df_validated)

    # Step 3: Uniqueness and Stats
    print("Task 2: Checking uniqueness...")
    row_count, unique_count, duplicate_count = validate_chain_uniqueness(df_validated)

    # Calculate additional stats for the result object
    distinct_tokens = df_validated["tokenAddress"].nunique()

    # Convert min/max timestamps to datetime for reporting
    min_ts = df_validated["timeStamp"].min()
    max_ts = df_validated["timeStamp"].max()
    date_start = pd.to_datetime(min_ts, unit='s', utc=True)
    date_end = pd.to_datetime(max_ts, unit='s', utc=True)

    print(f"Task 2 Complete. Rows: {row_count}, Unique: {unique_count}, Duplicates: {duplicate_count}")
    print(f"Date Range: {date_start.date()} to {date_end.date()}")

    return ChainValidationResult(
        validated_df=df_validated,
        row_count=row_count,
        unique_count=unique_count,
        duplicate_count=duplicate_count,
        distinct_tokens=distinct_tokens,
        date_range_start=date_start,
        date_range_end=date_end
    )
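`timeStamp` holds Unix seconds, so converting with `unit='s', utc=True` and flooring to the day yields the UTC calendar dates used for reporting. A small sketch with illustrative timestamps, showing that a transfer at 23:59:59 UTC still belongs to that day; `utc_day_span` is a hypothetical helper.

```python
from typing import Tuple

import pandas as pd


def utc_day_span(timestamps: pd.Series) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Convert Unix-second timestamps to UTC and return the first and last calendar day covered."""
    as_utc = pd.to_datetime(timestamps, unit="s", utc=True)
    days = as_utc.dt.floor("D")  # truncate to 00:00 UTC
    return days.min(), days.max()


# Illustrative values: 00:00:00 and 23:59:59 UTC on one day, then midnight of the next
first, last = utc_day_span(pd.Series([1730764800, 1730851199, 1730851200]))
print(first.date(), last.date())  # 2024-11-05 2024-11-06
```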


# Task 3 — Validate df_market_raw Structure and Coverage

# ==============================================================================
# Task 3: Validate df_market_raw Structure and Coverage
# ==============================================================================

@dataclass
class MarketValidationResult:
    """
    Container for the results of the market data validation process.

    Attributes:
        validated_df (pd.DataFrame): The DataFrame after type coercion and validation.
        symbol_counts (Dict[str, int]): Number of rows per symbol.
        missing_dates (Dict[str, List[pd.Timestamp]]): List of missing dates per symbol within the window.
        date_ranges (Dict[str, Tuple[pd.Timestamp, pd.Timestamp]]): Min and max dates per symbol.
        total_rows (int): Total number of rows in the dataset.
    """
    validated_df: pd.DataFrame
    symbol_counts: Dict[str, int]
    missing_dates: Dict[str, List[pd.Timestamp]]
    date_ranges: Dict[str, Tuple[pd.Timestamp, pd.Timestamp]]
    total_rows: int

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 1: Validate Column Presence and Data Types
# -------------------------------------------------------------------------------------------------------------------------------

def validate_market_columns(df: pd.DataFrame, required_columns: Set[str]) -> pd.DataFrame:
    """
    Validates that the market DataFrame contains required columns and enforces strict data types.

    Enforces:
    - Date: datetime64[ns, UTC]
    - Symbol: string
    - Close: float64
    - Volume: float64

    Args:
        df (pd.DataFrame): The raw market DataFrame.
        required_columns (Set[str]): Set of required column names.

    Returns:
        pd.DataFrame: The validated DataFrame with coerced types.

    Raises:
        ValueError: If columns are missing or type coercion fails.
    """
    # 1. Check for missing columns
    missing_cols = required_columns - set(df.columns)
    if missing_cols:
        raise ValueError(f"df_market_raw is missing required columns: {missing_cols}")

    # Work on a copy
    df_out = df.copy()

    # 2. Enforce Data Types
    try:
        # Date: Force UTC
        # errors='raise' ensures we don't silently ignore bad dates
        df_out["Date"] = pd.to_datetime(df_out["Date"], utc=True, errors='raise')

        # Symbol: String
        df_out["Symbol"] = df_out["Symbol"].astype("string")

        # Close and Volume: Float64
        # pd.to_numeric handles strings like "100.50" correctly, raises on "abc"
        df_out["Close"] = pd.to_numeric(df_out["Close"], errors='raise').astype("float64")
        df_out["Volume"] = pd.to_numeric(df_out["Volume"], errors='raise').astype("float64")

    except Exception as e:
        raise ValueError(f"Type coercion failed for market data: {str(e)}")

    # 3. Check for Nulls in Critical Columns
    # Nulls in Close/Volume are tolerated at this stage: they are reported below and
    # handled in the cleansing task. Date and Symbol, however, must never be null.
    if df_out["Date"].isnull().any():
        raise ValueError("Column 'Date' contains null values.")
    if df_out["Symbol"].isnull().any():
        raise ValueError("Column 'Symbol' contains null values.")

    # Log warning for nulls in numeric columns (handled in cleansing task)
    null_close = df_out["Close"].isnull().sum()
    null_vol = df_out["Volume"].isnull().sum()
    if null_close > 0 or null_vol > 0:
        print(f"WARNING: Found nulls in market data - Close: {null_close}, Volume: {null_vol}")

    return df_out
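With `errors='raise'`, `pd.to_numeric` aborts on the first unparseable entry. To list every bad value before failing, one could coerce instead and compare null masks, as in this sketch (`non_numeric_entries` is a hypothetical helper).

```python
import pandas as pd


def non_numeric_entries(series: pd.Series) -> pd.Series:
    """Return the original entries that are present but cannot be parsed as numbers."""
    parsed = pd.to_numeric(series, errors="coerce")  # unparseable values become NaN
    # Genuine nulls are excluded: they are missing data, not malformed data
    return series[parsed.isna() & series.notna()]


closes = pd.Series(["100.50", "abc", None, "7"])
print(non_numeric_entries(closes))  # index 1: 'abc'
```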

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 2: Validate Symbol Coverage
# -------------------------------------------------------------------------------------------------------------------------------

def validate_symbol_coverage(df: pd.DataFrame, required_symbols: List[str]) -> Dict[str, int]:
    """
    Verifies that all required symbols are present and have sufficient data points.

    Args:
        df (pd.DataFrame): The validated DataFrame from Step 1.
        required_symbols (List[str]): List of expected symbols (e.g., BTC/USD).

    Returns:
        Dict[str, int]: A dictionary mapping symbols to their row counts.

    Raises:
        ValueError: If any required symbol is completely missing.
    """
    # Get unique symbols present in data
    present_symbols = set(df["Symbol"].unique())
    required_set = set(required_symbols)

    # Check for missing symbols
    missing_symbols = required_set - present_symbols
    if missing_symbols:
        raise ValueError(f"Missing required symbols in market data: {missing_symbols}")

    # Count rows per symbol
    symbol_counts = df["Symbol"].value_counts().to_dict()

    # Check for sufficiency (heuristic: ~300 rows for a year)
    for sym in required_symbols:
        count = symbol_counts.get(sym, 0)
        if count < 300:
            print(f"WARNING: Symbol '{sym}' has low row count ({count}). Expected ~365 for daily data.")

    return symbol_counts
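`validate_symbol_coverage` prints its low-count warnings. If a caller needs the shortfalls as data, `value_counts` can be reindexed against the required list so that absent symbols appear with a count of zero. A sketch reusing the 300-row heuristic from above; `low_coverage_symbols` is hypothetical.

```python
from typing import Dict, List

import pandas as pd


def low_coverage_symbols(df: pd.DataFrame, required_symbols: List[str], min_rows: int = 300) -> Dict[str, int]:
    """Return {symbol: row_count} for required symbols with fewer than min_rows rows (0 if absent)."""
    counts = df["Symbol"].value_counts().reindex(required_symbols, fill_value=0)
    return {sym: int(n) for sym, n in counts.items() if n < min_rows}


market = pd.DataFrame({"Symbol": ["BTC/USD"] * 365 + ["ETH/USD"] * 120})
print(low_coverage_symbols(market, ["BTC/USD", "ETH/USD", "USDT/USD"]))
# {'ETH/USD': 120, 'USDT/USD': 0}
```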

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Step 3: Validate Temporal Coverage
# -------------------------------------------------------------------------------------------------------------------------------

def validate_temporal_coverage(
    df: pd.DataFrame,
    required_symbols: List[str],
    start_date: pd.Timestamp,
    end_date: pd.Timestamp
) -> Tuple[Dict[str, List[pd.Timestamp]], Dict[str, Tuple[pd.Timestamp, pd.Timestamp]]]:
    """
    Checks temporal coverage for each symbol against the observation window.

    Identifies missing dates within the [start_date, end_date] range.
    Computes the actual min/max dates for each symbol.

    Args:
        df (pd.DataFrame): The validated DataFrame.
        required_symbols (List[str]): List of symbols to check.
        start_date (pd.Timestamp): Start of observation window (UTC).
        end_date (pd.Timestamp): End of observation window (UTC).

    Returns:
        Tuple containing:
        - missing_dates: Dict mapping symbol to list of missing timestamps.
        - date_ranges: Dict mapping symbol to (min_date, max_date) tuple.
    """
    # Align the window bounds to UTC midnight. pd.to_datetime(..., utc=True) localizes naive
    # values as UTC and converts aware ones, so every comparison below is tz-consistent.
    start_utc = pd.to_datetime(start_date, utc=True).normalize()
    end_utc = pd.to_datetime(end_date, utc=True).normalize()

    # Generate the expected daily date range (inclusive on both ends)
    expected_range = pd.date_range(start=start_utc, end=end_utc, freq='D')
    expected_set = set(expected_range)

    missing_dates = {}
    date_ranges = {}

    for sym in required_symbols:
        # Filter data for this symbol
        sym_df = df[df["Symbol"] == sym]

        # Parse dates as UTC and normalize to midnight for set comparison
        actual_dates = pd.to_datetime(sym_df["Date"], utc=True).dt.normalize()
        actual_set = set(actual_dates)

        # Find missing dates
        missing = sorted(expected_set - actual_set)
        missing_dates[sym] = missing

        # Record actual range
        if not actual_dates.empty:
            min_date = actual_dates.min()
            max_date = actual_dates.max()
            date_ranges[sym] = (min_date, max_date)

            # Check bounds against the UTC-normalized window
            if min_date > start_utc:
                print(f"WARNING: {sym} starts late ({min_date.date()} > {start_utc.date()})")
            if max_date < end_utc:
                print(f"WARNING: {sym} ends early ({max_date.date()} < {end_utc.date()})")
        else:
            date_ranges[sym] = (pd.NaT, pd.NaT)

    return missing_dates, date_ranges
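
# Illustrative sketch (hypothetical helper): missing-day detection compares an
# inclusive daily UTC calendar against the observed dates. Parsing both sides with
# utc=True keeps the set difference tz-consistent; mixing naive and tz-aware
# timestamps would make every day look missing.
import pandas as pd
from typing import List


def example_missing_days(dates: List[str], start: str, end: str) -> List[pd.Timestamp]:
    """Return the UTC midnights in [start, end] that do not appear in `dates`."""
    start_utc = pd.to_datetime(start, utc=True).normalize()
    end_utc = pd.to_datetime(end, utc=True).normalize()
    expected = set(pd.date_range(start=start_utc, end=end_utc, freq="D"))
    observed = set(pd.to_datetime(pd.Series(dates), utc=True).dt.normalize())
    return sorted(expected - observed)


# Usage: Jan 3 is absent from the observations
# example_missing_days(["2024-01-01 00:00", "2024-01-02 15:00", "2024-01-04 09:30"], "2024-01-01", "2024-01-04")
# returns [Timestamp('2024-01-03 00:00:00+0000', tz='UTC')]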

# -------------------------------------------------------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_df_market_raw(df_raw: pd.DataFrame, schemas: Any, meta_config: Any) -> MarketValidationResult:
    """
    Orchestrates the validation of the raw market DataFrame.

    Executes:
    1. Column presence and type enforcement.
    2. Symbol coverage verification.
    3. Temporal coverage and missing date identification.

    Args:
        df_raw (pd.DataFrame): The raw market DataFrame.
        schemas (ValidatedSchemas): Schema configuration from Task 1.
        meta_config (ValidatedMetaConfig): Metadata configuration from Task 1.

    Returns:
        MarketValidationResult: Validated data and coverage statistics.

    Raises:
        ValueError: If validation fails.
    """
    print("Task 3: Validating market columns and types...")
    # Step 1: Column/Type Validation
    df_validated = validate_market_columns(df_raw, schemas.market_columns)

    print("Task 3: Validating symbol coverage...")
    # Step 2: Symbol Coverage
    symbol_counts = validate_symbol_coverage(df_validated, schemas.market_symbols)

    print("Task 3: Validating temporal coverage...")
    # Step 3: Temporal Coverage
    # Ensure start/end dates are Timestamp objects (Task 1 returns datetime, pandas handles conversion)
    start_ts = pd.Timestamp(meta_config.start_date)
    end_ts = pd.Timestamp(meta_config.end_date)

    missing_dates, date_ranges = validate_temporal_coverage(
        df_validated,
        schemas.market_symbols,
        start_ts,
        end_ts
    )

    total_rows = len(df_validated)
    print(f"Task 3 Complete. Total Rows: {total_rows}")
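    # Report only the required symbols; missing_dates has no entries for extra symbols in the data
    for sym in schemas.market_symbols:
        count = symbol_counts.get(sym, 0)
        missing_count = len(missing_dates.get(sym, []))
        print(f"  - {sym}: {count} rows, {missing_count} missing dates in window.")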

    return MarketValidationResult(
        validated_df=df_validated,
        symbol_counts=symbol_counts,
        missing_dates=missing_dates,
        date_ranges=date_ranges,
        total_rows=total_rows
    )


# Task 4 — Filter df_chain_raw by Time Window and Transaction Status

# ==============================================================================
# Task 4: Filter df_chain_raw by Time Window and Transaction Status
# ==============================================================================

@dataclass
class FilteredChainData:
    """
    Container for the results of the on-chain data filtering process.

    Attributes:
        filtered_df (pd.DataFrame): The DataFrame after temporal and status filtering.
        initial_rows (int): Row count before any filtering.
        rows_after_time_filter (int): Row count after applying the observation window.
        rows_after_status_filter (int): Row count after removing failed transactions.
        dropped_time_count (int): Number of rows dropped due to time window.
        dropped_status_count (int): Number of rows dropped due to transaction status.
    """
    filtered_df: pd.DataFrame
    initial_rows: int
    rows_after_time_filter: int
    rows_after_status_filter: int
    dropped_time_count: int
    dropped_status_count: int

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 1: Convert Timestamps to UTC Dates
# -------------------------------------------------------------------------------------------------------------------------------

def convert_timestamp_to_utc_date(df: pd.DataFrame) -> pd.DataFrame:
    """
    Converts the Unix epoch 'timeStamp' column to a normalized UTC datetime column 'txDate'.

    The resulting 'txDate' column will have a dtype of datetime64[ns, UTC] with
    the time component normalized to midnight (00:00:00).

    Args:
        df (pd.DataFrame): The validated on-chain DataFrame.

    Returns:
        pd.DataFrame: A copy of the DataFrame with the new 'txDate' column.
    """
    # Work on a copy to avoid side effects
    df_out = df.copy()

    # Convert int64 timestamp to UTC datetime
    # unit='s' assumes standard Unix epoch seconds
    df_out["txDate"] = pd.to_datetime(df_out["timeStamp"], unit='s', utc=True)

    # Normalize to midnight to facilitate daily grouping later
    # This keeps the dtype as datetime64[ns, UTC]
    df_out["txDate"] = df_out["txDate"].dt.normalize()

    return df_out
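
# Illustrative check (hypothetical helper): converting Unix epoch seconds with
# unit="s", utc=True and then normalizing maps each transaction to its UTC
# calendar day, the key later used for daily grouping.
import pandas as pd


def example_epoch_to_utc_day(epoch_seconds: int) -> pd.Timestamp:
    """Return the UTC midnight of the day containing `epoch_seconds`."""
    ts = pd.to_datetime(pd.Series([epoch_seconds]), unit="s", utc=True)
    return ts.dt.normalize().iloc[0]


# Usage: 1704067200 is 2024-01-01 00:00:00 UTC; adding 45296 s gives 12:34:56 the same day
# example_epoch_to_utc_day(1704067200 + 45296) returns Timestamp('2024-01-01 00:00:00+0000', tz='UTC')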

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 2: Apply Temporal Filter
# -------------------------------------------------------------------------------------------------------------------------------

def apply_temporal_filter(
    df: pd.DataFrame,
    start_date: pd.Timestamp,
    end_date: pd.Timestamp
) -> pd.DataFrame:
    """
    Filters the DataFrame to retain only rows within the observation window [start_date, end_date].

    Args:
        df (pd.DataFrame): The DataFrame containing the 'txDate' column.
        start_date (pd.Timestamp): The start of the observation window (UTC).
        end_date (pd.Timestamp): The end of the observation window (UTC).

    Returns:
        pd.DataFrame: A subset of the input DataFrame.
    """
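    # Coerce to UTC Timestamps normalized to midnight: Task 1 may supply datetime.datetime
    # (which has no .normalize()), and 'txDate' is tz-aware UTC
    start_norm = pd.to_datetime(start_date, utc=True).normalize()
    end_norm = pd.to_datetime(end_date, utc=True).normalize()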

    # Create boolean mask
    # Inclusive boundaries: start_date <= txDate <= end_date
    mask = (df["txDate"] >= start_norm) & (df["txDate"] <= end_norm)

    # Return filtered copy
    return df.loc[mask].copy()
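
# Illustrative sketch (hypothetical helper): an inclusive [start, end] window on
# UTC-normalized dates. pd.to_datetime(..., utc=True) accepts datetime.datetime,
# strings, or Timestamps and returns a UTC Timestamp, so naive config dates do not
# raise tz-aware vs tz-naive comparison errors against a UTC date column.
import datetime as dt
import pandas as pd


def example_window_mask(tx_dates: pd.Series, start: dt.datetime, end: dt.datetime) -> pd.Series:
    """Boolean mask that is True where start <= tx_date <= end (both days included)."""
    start_utc = pd.to_datetime(start, utc=True).normalize()
    end_utc = pd.to_datetime(end, utc=True).normalize()
    return (tx_dates >= start_utc) & (tx_dates <= end_utc)


# Usage: both boundary days are kept, the day after `end` is dropped
# days = pd.Series(pd.to_datetime(["2024-01-01", "2024-01-31", "2024-02-01"], utc=True))
# example_window_mask(days, dt.datetime(2024, 1, 1), dt.datetime(2024, 1, 31)).tolist()
# returns [True, True, False]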

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Step 3: Remove Failed Transactions
# -------------------------------------------------------------------------------------------------------------------------------

def filter_transaction_status(
    df: pd.DataFrame,
    status_col: str,
    accepted_codes: List[int]
) -> pd.DataFrame:
    """
    Filters the DataFrame to retain only successful transactions.

    Args:
        df (pd.DataFrame): The DataFrame to filter.
        status_col (str): The name of the status column (e.g., 'txStatus').
        accepted_codes (List[int]): List of status codes indicating success (e.g., [0]).

    Returns:
        pd.DataFrame: A subset of the input DataFrame containing only accepted transactions.
    """
    # Check if column exists
    if status_col not in df.columns:
        print(f"WARNING: Status column '{status_col}' not found. Skipping status filter.")
        return df

    # Create boolean mask
    # We use isin() to support multiple accepted codes if necessary
    mask = df[status_col].isin(accepted_codes)

    # Return filtered copy
    return df.loc[mask].copy()
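
# Illustrative check (hypothetical data): Series.isin() keeps rows whose status is in
# the accepted list, and missing (NaN) statuses never match, so they are dropped too.
# Treating 0 as success mirrors the default `accepted_status` used in Task 4; the
# meaning of each status code depends on the data source.
import numpy as np
import pandas as pd
from typing import List, Sequence


def example_status_mask(statuses: Sequence[float], accepted: Sequence[int]) -> List[bool]:
    """Return the boolean keep-mask produced by isin() as a plain list."""
    return pd.Series(statuses).isin(accepted).tolist()


# Usage: example_status_mask([0, 1, np.nan, 0], [0]) returns [True, False, False, True]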

# -------------------------------------------------------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def filter_chain_data(
    df_validated: pd.DataFrame,
    meta_config: Any,
    preprocessing_config: Any
) -> FilteredChainData:
    """
    Orchestrates the temporal and status filtering of the on-chain data.

    Executes:
    1. Conversion of timestamps to UTC dates.
    2. Filtering rows outside the observation window.
    3. Filtering rows with failed transaction status.

    Args:
        df_validated (pd.DataFrame): The validated on-chain DataFrame from Task 2.
        meta_config (ValidatedMetaConfig): Metadata configuration from Task 1.
        preprocessing_config (Dict): The 'preprocessing' section of the raw config.

    Returns:
        FilteredChainData: Result object containing the filtered DataFrame and counts.
    """
    initial_rows = len(df_validated)
    print(f"Task 4: Starting filtering on {initial_rows} rows...")

    # Step 1: Date Conversion
    print("Task 4: Converting timestamps to UTC dates...")
    df_dated = convert_timestamp_to_utc_date(df_validated)

    # Step 2: Temporal Filter
    print(f"Task 4: Applying temporal filter ({meta_config.start_date.date()} to {meta_config.end_date.date()})...")
    # Ensure we use the Timestamp objects from the validated config
    df_time_filtered = apply_temporal_filter(
        df_dated,
        meta_config.start_date,
        meta_config.end_date
    )

    rows_after_time = len(df_time_filtered)
    dropped_time = initial_rows - rows_after_time
    print(f"  - Dropped {dropped_time} rows outside observation window.")

    # Step 3: Status Filter
    print("Task 4: Applying transaction status filter...")
    handling_config = preprocessing_config.get("handling", {})

    if handling_config.get("drop_failed_tx", True):
        status_col = handling_config.get("tx_status_column", "txStatus")
        accepted_codes = handling_config.get("accepted_status", [0])

        df_final = filter_transaction_status(
            df_time_filtered,
            status_col,
            accepted_codes
        )

        rows_after_status = len(df_final)
        dropped_status = rows_after_time - rows_after_status
        print(f"  - Dropped {dropped_status} failed transactions.")
    else:
        print("  - Skipping status filter (drop_failed_tx=False).")
        df_final = df_time_filtered
        rows_after_status = rows_after_time
        dropped_status = 0

    print(f"Task 4 Complete. Final Rows: {rows_after_status}")

    return FilteredChainData(
        filtered_df=df_final,
        initial_rows=initial_rows,
        rows_after_time_filter=rows_after_time,
        rows_after_status_filter=rows_after_status,
        dropped_time_count=dropped_time,
        dropped_status_count=dropped_status
    )
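
# Illustrative sanity check (hypothetical helper, not called by the pipeline): the
# counts stored in FilteredChainData should satisfy
#   initial_rows - dropped_time_count = rows_after_time_filter
#   rows_after_time_filter - dropped_status_count = rows_after_status_filter
# Only the attribute names are relied on, so any object exposing them works.
from typing import Any


def example_check_filter_accounting(result: Any) -> bool:
    """Return True if every removed row is attributed to exactly one of the two filters."""
    return (
        result.initial_rows - result.dropped_time_count == result.rows_after_time_filter
        and result.rows_after_time_filter - result.dropped_status_count == result.rows_after_status_filter
    )


# Usage: example_check_filter_accounting(filter_chain_data(df_validated, meta_config, preprocessing_config))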


# Task 5 — Filter df_chain_raw by Token and Normalize Values

# ==============================================================================
# Task 5: Filter df_chain_raw by Token and Normalize Values
# ==============================================================================

@dataclass
class NormalizedChainData:
    """
    Container for the results of the token filtering and value normalization process.

    Attributes:
        normalized_df (pd.DataFrame): The DataFrame containing only USDT/USDC transfers with normalized USD values.
        rows_after_token_filter (int): Row count after keeping only target tokens.
        rows_after_zero_filter (int): Row count after removing zero-value transfers.
        dropped_token_count (int): Number of rows dropped because they were not USDT/USDC.
        dropped_zero_value_count (int): Number of rows dropped due to zero or negative value.
    """
    normalized_df: pd.DataFrame
    rows_after_token_filter: int
    rows_after_zero_filter: int
    dropped_token_count: int
    dropped_zero_value_count: int

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 1: Apply Token Address Filter
# -------------------------------------------------------------------------------------------------------------------------------

def filter_by_token_address(df: pd.DataFrame, assets_config: Dict[str, Any]) -> pd.DataFrame:
    """
    Filters the DataFrame to retain only USDT and USDC transfers and assigns a 'token' label.

    Args:
        df (pd.DataFrame): The filtered on-chain DataFrame from Task 4.
        assets_config (Dict[str, Any]): The 'assets' dictionary from the schema config.

    Returns:
        pd.DataFrame: A subset of the input DataFrame with a new 'token' column.
    """
    # Extract canonical addresses and normalize to lowercase
    usdt_addr = assets_config["USDT"]["address"].lower()
    usdc_addr = assets_config["USDC"]["address"].lower()

    # Lowercase the DataFrame addresses so the comparison is case-insensitive
    # (EIP-55 checksummed addresses mix upper and lower case)
    addr_series = df["tokenAddress"].str.lower()

    # Create masks (disjoint, since the two token addresses differ)
    is_usdt = addr_series == usdt_addr
    is_usdc = addr_series == usdc_addr

    # Combine masks
    mask = is_usdt | is_usdc

    # Filter DataFrame
    df_out = df.loc[mask].copy()

    # Assign 'token' label: a kept row is USDT if it matched the USDT address, otherwise USDC
    df_out["token"] = np.where(is_usdt[mask].to_numpy(), "USDT", "USDC")

    return df_out
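
# Illustrative sketch (hypothetical short addresses, not the real token contracts):
# case-insensitive address matching followed by labeling with np.where. Because the
# two masks are disjoint, every kept row is labeled either USDT or USDC.
import numpy as np
import pandas as pd


def example_label_tokens(addresses: pd.Series, usdt_addr: str, usdc_addr: str) -> pd.Series:
    """Return a token label for rows matching either address; other rows are dropped."""
    lowered = addresses.str.lower()
    is_usdt = lowered == usdt_addr.lower()
    is_usdc = lowered == usdc_addr.lower()
    keep = is_usdt | is_usdc
    return pd.Series(np.where(is_usdt[keep].to_numpy(), "USDT", "USDC"), index=addresses.index[keep])


# Usage: example_label_tokens(pd.Series(["0xAA", "0xbb", "0xCC"]), "0xaa", "0xBB").tolist()
# returns ['USDT', 'USDC']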

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 2: Cast and Normalize the `value` Field
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_token_values(df: pd.DataFrame) -> pd.DataFrame:
    """
    Converts the raw 'value' string to a normalized float 'usd_value'.

    Implements Equation (1): USD Value = value / 10^6.
    Assumes both USDT and USDC have 6 decimals (verified in Task 1).

    Args:
        df (pd.DataFrame): The DataFrame with 'token' and 'value' columns.

    Returns:
        pd.DataFrame: The DataFrame with a new 'usd_value' column.
    """
    # Work on a copy
    df_out = df.copy()

    # 1. Parse the raw 'value' strings (uint256 base-unit amounts) as Python ints.
    #    Python ints have arbitrary precision, so no uint256 amount overflows here,
    #    whereas int64 parsing can overflow and float64 parsing rounds before scaling.
    #    Task 2 validation should already have rejected non-numeric strings.
    try:
        int_values = df_out["value"].apply(int)
    except (ValueError, TypeError) as e:
        raise ValueError(f"Failed to convert 'value' column to integers: {e}") from e

    # 2. Normalize (Equation 1): both USDT and USDC use 6 decimals.
    #    Dividing by the integer 10**6 (not 1e6) avoids converting very large Python
    #    ints to float before the division; the result is stored as float64.
    df_out["usd_value"] = (int_values / 1_000_000).astype("float64")

    return df_out
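
# Illustrative check (hypothetical helper): Equation (1) applied to a single raw
# uint256 string. int() keeps full precision for any uint256 amount, and the
# int/int division rounds only once, to the final float.
def example_usd_value(raw_value: str, decimals: int = 6) -> float:
    """Convert a base-unit token amount (decimal string) to USD units."""
    return int(raw_value) / 10**decimals


# Usage: example_usd_value("2500000") returns 2.5; example_usd_value("1") returns 1e-06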

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Step 3: Remove Zero-Value Transfers
# -------------------------------------------------------------------------------------------------------------------------------

def remove_zero_value_transfers(df: pd.DataFrame) -> pd.DataFrame:
    """
    Removes rows where the normalized USD value is zero or negative.

    Args:
        df (pd.DataFrame): The DataFrame with 'usd_value'.

    Returns:
        pd.DataFrame: A subset of the input DataFrame.
    """
    # Filter condition: usd_value > 0
    # This implicitly removes 0 and any negative values (data errors)
    mask = df["usd_value"] > 0

    # Check for negative values to log a warning
    neg_count = (df["usd_value"] < 0).sum()
    if neg_count > 0:
        print(f"WARNING: Found {neg_count} negative value transactions. Removing them.")

    return df.loc[mask].copy()

# -------------------------------------------------------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_chain_data(
    filtered_data: Any,
    schemas: Any,
    preprocessing_config: Any
) -> NormalizedChainData:
    """
    Orchestrates the token filtering and value normalization.

    Executes:
    1. Filtering for USDT/USDC addresses.
    2. Normalization of raw values to USD units.
    3. Removal of zero-value transfers.

    Args:
        filtered_data (FilteredChainData): Result object from Task 4.
        schemas (ValidatedSchemas): Schema configuration from Task 1.
        preprocessing_config (Dict): Preprocessing configuration dictionary.

    Returns:
        NormalizedChainData: Result object containing the normalized DataFrame and counts.
    """
    df_input = filtered_data.filtered_df
    initial_rows = len(df_input)
    print(f"Task 5: Starting normalization on {initial_rows} rows...")

    # Step 1: Token Filter
    print("Task 5: Filtering for USDT and USDC...")
    df_token_filtered = filter_by_token_address(df_input, schemas.assets)

    rows_after_token = len(df_token_filtered)
    dropped_token = initial_rows - rows_after_token
    print(f"  - Dropped {dropped_token} rows (non-target tokens).")

    # Step 2: Value Normalization
    print("Task 5: Normalizing raw values (dividing by 10^6)...")
    df_normalized = normalize_token_values(df_token_filtered)

    # Step 3: Zero Value Filter
    print("Task 5: Removing zero-value transfers...")
    handling_config = preprocessing_config.get("handling", {})

    if handling_config.get("drop_zero_value", True):
        df_final = remove_zero_value_transfers(df_normalized)
        rows_after_zero = len(df_final)
        dropped_zero = rows_after_token - rows_after_zero
        print(f"  - Dropped {dropped_zero} zero-value rows.")
    else:
        print("  - Skipping zero-value filter (drop_zero_value=False).")
        df_final = df_normalized
        rows_after_zero = rows_after_token
        dropped_zero = 0

    print(f"Task 5 Complete. Final Rows: {rows_after_zero}")

    return NormalizedChainData(
        normalized_df=df_final,
        rows_after_token_filter=rows_after_token,
        rows_after_zero_filter=rows_after_zero,
        dropped_token_count=dropped_token,
        dropped_zero_value_count=dropped_zero
    )
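
# Illustrative configuration fragment: the 'handling' keys that the Task 4 and Task 5
# orchestrators read from preprocessing_config. The values shown are the defaults the
# code falls back to when a key is absent; the rest of the raw config is not shown.
EXAMPLE_PREPROCESSING_CONFIG = {
    "handling": {
        "drop_failed_tx": True,  # Task 4: apply the transaction status filter
        "tx_status_column": "txStatus",  # Task 4: column holding the status code
        "accepted_status": [0],  # Task 4: status codes treated as successful
        "drop_zero_value": True,  # Task 5: drop transfers with usd_value <= 0
    }
}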


# Task 6 — Deduplicate df_chain_raw

# ==============================================================================
# Task 6: Deduplicate df_chain_raw
# ==============================================================================

@dataclass
class DeduplicatedChainData:
    """
    Container for the results of the deduplication process.

    Attributes:
        deduplicated_df (pd.DataFrame): The DataFrame with duplicate logs removed.
        rows_before (int): Row count before deduplication.
        rows_after (int): Row count after deduplication.
        duplicates_removed (int): Number of duplicate rows removed.
        internal_tx_count (Optional[int]): Number of internal transactions (if flag present).
        top_level_tx_count (Optional[int]): Number of top-level transactions (if flag present).
    """
    deduplicated_df: pd.DataFrame
    rows_before: int
    rows_after: int
    duplicates_removed: int
    internal_tx_count: Optional[int]
    top_level_tx_count: Optional[int]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 1: Define Uniqueness Key
# -------------------------------------------------------------------------------------------------------------------------------

def check_uniqueness_stats(df: pd.DataFrame) -> Tuple[int, int, int]:
    """
    Calculates uniqueness statistics based on the composite primary key.

    Key: (transactionHash, blockNumber, logIndex)

    Args:
        df (pd.DataFrame): The normalized DataFrame.

    Returns:
        Tuple[int, int, int]: (total_rows, unique_rows, duplicate_count)
    """
    key_cols = ["transactionHash", "blockNumber", "logIndex"]

    total_rows = len(df)

    # Count unique combinations
    # drop_duplicates returns the unique rows, so len() gives unique count
    unique_rows = len(df.drop_duplicates(subset=key_cols))

    duplicate_count = total_rows - unique_rows

    return total_rows, unique_rows, duplicate_count

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 2: Remove Duplicate Rows
# -------------------------------------------------------------------------------------------------------------------------------

def remove_duplicates(df: pd.DataFrame) -> pd.DataFrame:
    """
    Removes duplicate rows based on the composite primary key.

    Sorts by blockNumber and logIndex so the output is in chain order, then keeps the first record per key.

    Args:
        df (pd.DataFrame): The DataFrame containing duplicates.

    Returns:
        pd.DataFrame: A deduplicated DataFrame with a reset index.
    """
    key_cols = ["transactionHash", "blockNumber", "logIndex"]

    # Sort into chain order by (blockNumber, logIndex). Rows sharing the full key are
    # assumed to be identical copies of the same log, so keeping the first one per key
    # loses no information.
    df_sorted = df.sort_values(by=["blockNumber", "logIndex"], ascending=True)

    # Drop duplicates, keeping the first occurrence
    df_deduped = df_sorted.drop_duplicates(subset=key_cols, keep="first")

    # Reset index for cleanliness
    df_final = df_deduped.reset_index(drop=True)

    return df_final
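
# Illustrative check (hypothetical data): deduplicating on the composite key
# (transactionHash, blockNumber, logIndex). Two logs in the same transaction with
# different logIndex values are distinct transfers and both survive; only an exact
# key repeat is dropped.
import pandas as pd


def example_dedupe(df: pd.DataFrame) -> pd.DataFrame:
    """Keep the first row per composite key after sorting by (blockNumber, logIndex)."""
    key_cols = ["transactionHash", "blockNumber", "logIndex"]
    return (
        df.sort_values(by=["blockNumber", "logIndex"])
        .drop_duplicates(subset=key_cols, keep="first")
        .reset_index(drop=True)
    )


# Usage: rows (0xa, 1, 0), (0xa, 1, 1), (0xa, 1, 0) collapse to the first two
# example_dedupe(pd.DataFrame({"transactionHash": ["0xa"] * 3, "blockNumber": [1, 1, 1], "logIndex": [0, 1, 0]}))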

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Step 3: Handle `isInternal` Flag (If Present)
# -------------------------------------------------------------------------------------------------------------------------------

def analyze_internal_transactions(df: pd.DataFrame) -> Tuple[Optional[int], Optional[int]]:
    """
    Analyzes the distribution of internal vs. top-level transactions if the flag exists.

    Args:
        df (pd.DataFrame): The deduplicated DataFrame.

    Returns:
        Tuple[Optional[int], Optional[int]]: (internal_count, top_level_count).
        Returns (None, None) if 'isInternal' column is missing.
    """
    if "isInternal" in df.columns:
        # Assuming 1 = Internal, 0 = Top-level (or boolean)
        # We cast to int to be safe if it's boolean
        try:
            is_internal = df["isInternal"].astype(int)
            internal_count = int((is_internal == 1).sum())
            top_level_count = int((is_internal == 0).sum())
            return internal_count, top_level_count
        except Exception:
            print("WARNING: Could not cast 'isInternal' to integer for analysis.")
            return None, None
    else:
        return None, None

# -------------------------------------------------------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def deduplicate_chain_data(normalized_data: Any) -> DeduplicatedChainData:
    """
    Orchestrates the deduplication of the on-chain data.

    Executes:
    1. Calculation of duplication statistics.
    2. Removal of duplicate logs based on unique key.
    3. Analysis of internal transaction flags (if available).

    Args:
        normalized_data (NormalizedChainData): Result object from Task 5.

    Returns:
        DeduplicatedChainData: Result object containing the clean DataFrame and stats.
    """
    df_input = normalized_data.normalized_df
    print("Task 6: Checking for duplicates...")

    # Step 1: Check Stats
    total, unique, dups = check_uniqueness_stats(df_input)

    if dups > 0:
        print(f"  - Found {dups} duplicate rows ({dups/total:.2%}). Removing...")
        # Step 2: Remove Duplicates
        df_deduped = remove_duplicates(df_input)
    else:
        print("  - No duplicates found.")
        df_deduped = df_input.copy().reset_index(drop=True)

    # Verify count
    rows_after = len(df_deduped)
    if rows_after != unique:
        print(f"WARNING: Post-deduplication count ({rows_after}) does not match expected unique count ({unique}).")

    # Step 3: Internal Tx Analysis
    print("Task 6: Analyzing internal transaction flags...")
    internal_count, top_level_count = analyze_internal_transactions(df_deduped)

    if internal_count is not None:
        print(f"  - Internal Tx: {internal_count}, Top-Level Tx: {top_level_count}")
    else:
        print("  - 'isInternal' flag not present. Assuming finalized log data.")

    print(f"Task 6 Complete. Final Rows: {rows_after}")

    return DeduplicatedChainData(
        deduplicated_df=df_deduped,
        rows_before=total,
        rows_after=rows_after,
        duplicates_removed=dups,
        internal_tx_count=internal_count,
        top_level_tx_count=top_level_count
    )


# Task 7 — Classify Transactions by Behavioral Topology

# ==============================================================================
# Task 7: Classify Transactions by Behavioral Topology
# ==============================================================================

@dataclass
class ClassifiedChainData:
    """
    Container for the results of the behavioral topology classification.

    Attributes:
        classified_df (pd.DataFrame): The DataFrame with the new 'topology' column.
        count_eoa_eoa (int): Number of human-driven transactions.
        count_sc_sc (int): Number of algorithmic transactions.
        count_other (int): Number of other transactions (mixed topology).
        total_rows (int): Total number of rows processed.
    """
    classified_df: pd.DataFrame
    count_eoa_eoa: int
    count_sc_sc: int
    count_other: int
    total_rows: int

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 1 & 2: Define Rules and Apply Classification
# -------------------------------------------------------------------------------------------------------------------------------

def apply_topology_classification(df: pd.DataFrame) -> pd.DataFrame:
    """
    Classifies transactions into behavioral topologies based on sender/receiver contract status.

    Rules:
    - EOA-EOA (Human): fromIsContract == 0 AND toIsContract == 0
    - SC-SC (Algo): fromIsContract == 1 AND toIsContract == 1
    - OTHER: All other combinations

    Args:
        df (pd.DataFrame): The deduplicated on-chain DataFrame.

    Returns:
        pd.DataFrame: A copy of the DataFrame with a new 'topology' column (categorical).
    """
    # Work on a copy
    df_out = df.copy()

    # Define conditions
    # We assume columns are int8/int64 as validated in Task 2
    cond_human = (df_out["fromIsContract"] == 0) & (df_out["toIsContract"] == 0)
    cond_algo = (df_out["fromIsContract"] == 1) & (df_out["toIsContract"] == 1)

    # Define choices corresponding to conditions
    choices = ["EOA-EOA", "SC-SC"]

    # Apply vectorized selection
    # default="OTHER" covers (0,1) and (1,0) cases
    df_out["topology"] = np.select([cond_human, cond_algo], choices, default="OTHER")

    # Convert to categorical for memory efficiency and performance
    df_out["topology"] = df_out["topology"].astype("category")

    return df_out
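
# Illustrative check (hypothetical data): the np.select rule applied to all four
# (fromIsContract, toIsContract) combinations. Conditions are evaluated in order,
# and rows matching neither take the default label.
import numpy as np
from typing import List, Sequence


def example_topology(from_is_contract: Sequence[int], to_is_contract: Sequence[int]) -> List[str]:
    """Label each (from, to) pair as EOA-EOA, SC-SC, or OTHER."""
    frm = np.asarray(from_is_contract)
    to = np.asarray(to_is_contract)
    labels = np.select([(frm == 0) & (to == 0), (frm == 1) & (to == 1)], ["EOA-EOA", "SC-SC"], default="OTHER")
    return labels.tolist()


# Usage: example_topology([0, 1, 0, 1], [0, 1, 1, 0]) returns ['EOA-EOA', 'SC-SC', 'OTHER', 'OTHER']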

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Step 3: Validate Classification Integrity
# -------------------------------------------------------------------------------------------------------------------------------

def validate_topology_counts(df: pd.DataFrame) -> Dict[str, int]:
    """
    Validates the integrity of the topology classification.

    Checks that all rows are classified and counts sum to total.

    Args:
        df (pd.DataFrame): The classified DataFrame.

    Returns:
        Dict[str, int]: Dictionary of counts per topology.

    Raises:
        ValueError: If classification resulted in nulls or count mismatch.
    """
    # Check for nulls
    if df["topology"].isnull().any():
        raise ValueError("Topology classification resulted in null values.")

    # Get value counts
    counts = df["topology"].value_counts().to_dict()

    # Ensure all keys exist in dict even if 0
    for key in ["EOA-EOA", "SC-SC", "OTHER"]:
        if key not in counts:
            counts[key] = 0

    # Verify sum
    total_classified = sum(counts.values())
    total_rows = len(df)

    if total_classified != total_rows:
        raise ValueError(f"Mismatch in classification counts: {total_classified} vs {total_rows}")

    return counts

# -------------------------------------------------------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def classify_chain_topology(deduplicated_data: Any) -> ClassifiedChainData:
    """
    Orchestrates the classification of transactions into behavioral topologies.

    Executes:
    1. Application of classification rules (EOA-EOA vs SC-SC).
    2. Validation of classification integrity.

    Args:
        deduplicated_data (DeduplicatedChainData): Result object from Task 6.

    Returns:
        ClassifiedChainData: Result object containing the classified DataFrame and stats.
    """
    df_input = deduplicated_data.deduplicated_df
    print("Task 7: Classifying behavioral topology...")

    # Step 1 & 2: Apply Classification
    df_classified = apply_topology_classification(df_input)

    # Step 3: Validate
    counts = validate_topology_counts(df_classified)

    count_eoa = counts["EOA-EOA"]
    count_sc = counts["SC-SC"]
    count_other = counts["OTHER"]
    total = len(df_classified)

    # Guard against division by zero when no rows survive the earlier filters
    denom = total if total > 0 else 1
    print(f"Task 7 Complete. Total Rows: {total}")
    print(f"  - EOA-EOA (Human): {count_eoa} ({count_eoa/denom:.1%})")
    print(f"  - SC-SC (Algo):    {count_sc} ({count_sc/denom:.1%})")
    print(f"  - OTHER:           {count_other} ({count_other/denom:.1%})")

    return ClassifiedChainData(
        classified_df=df_classified,
        count_eoa_eoa=count_eoa,
        count_sc_sc=count_sc,
        count_other=count_other,
        total_rows=total
    )


# Task 8 — Aggregate Daily Volumes by Token and Topology

# ==============================================================================
# Task 8: Aggregate Daily Volumes by Token and Topology
# ==============================================================================

@dataclass
class AggregatedChainData:
    """
    Container for the aggregated daily on-chain volume series.

    Attributes:
        daily_df (pd.DataFrame): DataFrame with a 'Date' column (UTC, one row per day) and the 4 volume columns.
        start_date (pd.Timestamp): Start of the aggregation window.
        end_date (pd.Timestamp): End of the aggregation window.
        total_days (int): Number of days in the sequence.
    """
    daily_df: pd.DataFrame
    start_date: pd.Timestamp
    end_date: pd.Timestamp
    total_days: int

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 1: Define Aggregation Groups
# -------------------------------------------------------------------------------------------------------------------------------

def group_and_sum_volumes(df: pd.DataFrame) -> pd.DataFrame:
    """
    Groups transactions by Date, Token, and Topology and sums the USD value.

    Args:
        df (pd.DataFrame): The classified DataFrame with 'txDate', 'token', 'topology', 'usd_value'.

    Returns:
        pd.DataFrame: A DataFrame with a MultiIndex (txDate, token, topology) and summed 'usd_value'.
    """
    # Group by the three dimensions and sum the USD value.
    # observed=True restricts categorical groupers to combinations present in the data;
    # days or categories with no activity are zero-filled in later steps.
    grouped = df.groupby(["txDate", "token", "topology"], observed=True)["usd_value"].sum()

    return grouped.to_frame(name="volume")

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 2: Pivot to Wide Format
# -------------------------------------------------------------------------------------------------------------------------------

def pivot_to_wide_format(grouped_df: pd.DataFrame) -> pd.DataFrame:
    """
    Pivots the grouped data into a wide format with specific column names.

    Target Columns:
    - V_EOA_EOA_USDT
    - V_EOA_EOA_USDC
    - V_SC_SC_USDT
    - V_SC_SC_USDC

    Args:
        grouped_df (pd.DataFrame): The grouped DataFrame from Step 1.

    Returns:
        pd.DataFrame: A wide DataFrame indexed by Date.
    """
    # Reset index to make columns accessible for pivoting
    df_flat = grouped_df.reset_index()

    # Keep only the two topologies that form the study's time series (EOA-EOA, SC-SC);
    # the OTHER category is excluded by design.
    target_topologies = ["EOA-EOA", "SC-SC"]
    df_filtered = df_flat[df_flat["topology"].isin(target_topologies)].copy()

    # Build the output column name V_{TOPOLOGY}_{TOKEN}, replacing the hyphen in the
    # topology label with an underscore so that "EOA-EOA" becomes e.g. "V_EOA_EOA_USDT".

    # Ensure topology is string for manipulation
    topo_str = df_filtered["topology"].astype(str).str.replace("-", "_")
    token_str = df_filtered["token"].astype(str)

    df_filtered["col_name"] = "V_" + topo_str + "_" + token_str

    # Pivot
    # Index: txDate
    # Columns: col_name
    # Values: volume
    wide_df = df_filtered.pivot(index="txDate", columns="col_name", values="volume")

    # Fill NaNs with 0 (days where a specific category had no volume)
    wide_df = wide_df.fillna(0.0)

    return wide_df

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Step 3: Fill Missing Dates
# -------------------------------------------------------------------------------------------------------------------------------

def reindex_to_full_window(
    df: pd.DataFrame,
    start_date: pd.Timestamp,
    end_date: pd.Timestamp
) -> pd.DataFrame:
    """
    Reindexes the DataFrame to ensure a complete daily sequence from start_date to end_date.

    Args:
        df (pd.DataFrame): The wide DataFrame.
        start_date (pd.Timestamp): Start of window (UTC).
        end_date (pd.Timestamp): End of window (UTC).

    Returns:
        pd.DataFrame: The reindexed DataFrame with 0-filling for missing days.
    """
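    # Generate the full daily range at UTC midnight
    full_idx = pd.date_range(start=start_date, end=end_date, freq='D', tz='UTC').normalize()

    # Coerce the input index to UTC midnight too: a tz-naive or intraday index would
    # match no labels below, and every day would be silently zero-filled
    df = df.copy()
    df.index = pd.to_datetime(df.index, utc=True).normalize()

    # Reindex; fill_value=0.0 assigns zero volume to days with no transactions
    df_reindexed = df.reindex(full_idx, fill_value=0.0)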

    # Rename index to 'Date' for consistency with market data
    df_reindexed.index.name = "Date"

    # Ensure all 4 expected columns exist, even if data was completely missing for one
    expected_cols = [
        "V_EOA_EOA_USDT", "V_EOA_EOA_USDC",
        "V_SC_SC_USDT", "V_SC_SC_USDC"
    ]
    for col in expected_cols:
        if col not in df_reindexed.columns:
            df_reindexed[col] = 0.0

    # Enforce a fixed column order
    df_reindexed = df_reindexed[expected_cols]

    return df_reindexed

# -------------------------------------------------------------------------------------------------------------------------------
# Task 8, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def aggregate_chain_volumes(
    classified_data: Any,
    meta_config: Any
) -> AggregatedChainData:
    """
    Orchestrates the aggregation of daily on-chain volumes.

    Executes:
    1. Grouping and summing by date/token/topology.
    2. Pivoting to wide format.
    3. Reindexing to the full observation window.

    Args:
        classified_data (ClassifiedChainData): Result object from Task 7.
        meta_config (ValidatedMetaConfig): Metadata configuration from Task 1.

    Returns:
        AggregatedChainData: Result object containing the daily time series.
    """
    df_input = classified_data.classified_df
    print("Task 8: Aggregating daily volumes...")

    # Step 1: Group and Sum
    grouped_df = group_and_sum_volumes(df_input)

    # Step 2: Pivot
    wide_df = pivot_to_wide_format(grouped_df)

    # Step 3: Reindex
    # Ensure we use the validated timestamps
    start_ts = pd.Timestamp(meta_config.start_date)
    end_ts = pd.Timestamp(meta_config.end_date)

    final_df = reindex_to_full_window(wide_df, start_ts, end_ts)

    # Expose Date as a column, as Task 8 specifies ("a daily DataFrame with columns: Date, ...");
    # downstream tasks (9 and 12) access df["Date"] directly.
    final_df_reset = final_df.reset_index()

    total_days = len(final_df_reset)
    print(f"Task 8 Complete. Generated {total_days} daily observations.")
    print(f"Columns: {list(final_df_reset.columns)}")

    return AggregatedChainData(
        daily_df=final_df_reset,
        start_date=start_ts,
        end_date=end_ts,
        total_days=total_days
    )
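The three steps of Task 8 can be checked on a toy input. The sketch below (toy values, not data from the study) mirrors them with plain pandas calls and exercises both zero-filling cases handled in `reindex_to_full_window`: a calendar day with no qualifying transfers, and a topology/token column that never appears. It uses a single `reindex(index=..., columns=...)` call in place of the explicit column loop; that substitution is a stylistic choice for the sketch, not the notebook's code.

```python
import pandas as pd

# Toy classified transfers (illustrative values, not data from the study)
tx = pd.DataFrame({
    "txDate": pd.to_datetime(["2024-11-01", "2024-11-01", "2024-11-01", "2024-11-03"], utc=True),
    "token": ["USDT", "USDT", "USDC", "USDC"],
    "topology": ["EOA-EOA", "EOA-EOA", "SC-SC", "OTHER"],
    "usd_value": [100.0, 50.0, 20.0, 999.0],
})

# Step 1: sum USD value per (day, token, topology)
grouped = (
    tx.groupby(["txDate", "token", "topology"])["usd_value"]
    .sum()
    .rename("volume")
    .reset_index()
)

# Step 2: keep EOA-EOA and SC-SC only, then pivot to V_{TOPOLOGY}_{TOKEN} columns
grouped = grouped[grouped["topology"].isin(["EOA-EOA", "SC-SC"])].copy()
grouped["col_name"] = "V_" + grouped["topology"].str.replace("-", "_") + "_" + grouped["token"]
wide = grouped.pivot(index="txDate", columns="col_name", values="volume")

# Step 3: one row per calendar day and one column per expected series, zero-filled
expected = ["V_EOA_EOA_USDT", "V_EOA_EOA_USDC", "V_SC_SC_USDT", "V_SC_SC_USDC"]
days = pd.date_range("2024-11-01", "2024-11-03", freq="D", tz="UTC")
daily = wide.reindex(index=days, columns=expected, fill_value=0.0).fillna(0.0)
daily.index.name = "Date"
print(daily)
```

In this toy case 2024-11-02 has no transfers at all and 2024-11-03 has only an OTHER transfer, so both rows come out as zeros, while `V_EOA_EOA_USDC` and `V_SC_SC_USDT` are created as all-zero columns.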


# Task 9 — Validate Daily On-Chain Series Quality

# ==============================================================================
# Task 9: Validate Daily On-Chain Series Quality
# ==============================================================================

@dataclass
class ChainSeriesValidationResult:
    """
    Container for the validation results of the daily on-chain time series.

    Attributes:
        is_valid (bool): True if all critical checks passed.
        stats_df (pd.DataFrame): Descriptive statistics for each volume series.
        outliers (Dict[str, List[pd.Timestamp]]): Dates where volume exceeded mean + 4 sigma.
        zero_counts (Dict[str, int]): Number of days with zero volume per series.
        plot_specifications (List[Dict[str, Any]]): Descriptions of required visualizations.
    """
    is_valid: bool
    stats_df: pd.DataFrame
    outliers: Dict[str, List[pd.Timestamp]]
    zero_counts: Dict[str, int]
    plot_specifications: List[Dict[str, Any]]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 1: Check for Non-Negativity and Missing Values
# -------------------------------------------------------------------------------------------------------------------------------

def check_series_integrity(df: pd.DataFrame, columns: List[str]) -> bool:
    """
    Validates that the specified columns contain no negative values and no NaNs.

    Args:
        df (pd.DataFrame): The daily aggregated DataFrame.
        columns (List[str]): List of volume column names to check.

    Returns:
        bool: True if all checks pass.

    Raises:
        ValueError: If any column contains NaN or negative values.
    """
    for col in columns:
        # Check for NaNs
        if df[col].isnull().any():
            raise ValueError(f"Series '{col}' contains NaN values.")

        # Check for Negatives
        if (df[col] < 0).any():
            neg_count = (df[col] < 0).sum()
            raise ValueError(f"Series '{col}' contains {neg_count} negative values.")

    return True

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 2: Compute Descriptive Statistics
# -------------------------------------------------------------------------------------------------------------------------------

def compute_series_statistics(df: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
    """
    Computes descriptive statistics for the volume series.

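    Metrics: Mean, Median, Std (sample, ddof=1), Min, Max, Skewness, and excess Kurtosis
    (pandas' bias-corrected estimators; a normal distribution has excess kurtosis 0).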

    Args:
        df (pd.DataFrame): The daily aggregated DataFrame.
        columns (List[str]): List of volume column names.

    Returns:
        pd.DataFrame: A DataFrame with statistics as rows and series as columns.
    """
    stats_dict = {}

    for col in columns:
        series = df[col]
        stats_dict[col] = {
            "mean": series.mean(),
            "median": series.median(),
            "std": series.std(),
            "min": series.min(),
            "max": series.max(),
            "skew": series.skew(),
            "kurtosis": series.kurtosis()
        }

    return pd.DataFrame(stats_dict)

def detect_outliers(df: pd.DataFrame, columns: List[str]) -> Dict[str, List[pd.Timestamp]]:
    """
    Identifies dates where volume exceeds the mean + 4 sigma threshold.

    Args:
        df (pd.DataFrame): The daily aggregated DataFrame.
        columns (List[str]): List of volume column names.

    Returns:
        Dict[str, List[pd.Timestamp]]: Dictionary mapping series name to list of outlier dates.
    """
    outliers = {}

    for col in columns:
        series = df[col]
        mean = series.mean()
        std = series.std()
        threshold = mean + 4 * std

        # Identify outliers
        outlier_mask = series > threshold
        outlier_dates = df.loc[outlier_mask, "Date"].tolist()
        outliers[col] = outlier_dates

    return outliers

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Step 3: Visual Inspection Points (Specification Only)
# -------------------------------------------------------------------------------------------------------------------------------

def generate_plot_specs(columns: List[str], meta_config: Any) -> List[Dict[str, Any]]:
    """
    Generates specifications for required time-series visualizations.

    Args:
        columns (List[str]): List of volume column names.
        meta_config (ValidatedMetaConfig): Metadata containing critical event dates.

    Returns:
        List[Dict[str, Any]]: List of plot specification dictionaries.
    """
    specs = []

    # Extract event dates as strings for the spec
    election_date = meta_config.election_day.date().isoformat()
    human_signal_date = meta_config.human_signal_date.date().isoformat()

    for col in columns:
        spec = {
            "title": f"Daily Volume: {col}",
            "x_axis": "Date",
            "y_axis": col,
            "annotations": [
                {"date": election_date, "label": "Election Day", "color": "red", "style": "dashed"},
                {"date": human_signal_date, "label": "Human Signal", "color": "blue", "style": "dotted"}
            ],
            "description": f"Time series plot of {col} with critical political events marked."
        }
        specs.append(spec)

    return specs

# -------------------------------------------------------------------------------------------------------------------------------
# Task 9, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def validate_daily_chain_series(
    aggregated_data: Any,
    meta_config: Any
) -> ChainSeriesValidationResult:
    """
    Orchestrates the validation and statistical analysis of the daily on-chain series.

    Executes:
    1. Integrity checks (non-negativity, no NaNs).
    2. Computation of descriptive statistics and outlier detection.
    3. Generation of visualization specifications.

    Args:
        aggregated_data (AggregatedChainData): Result object from Task 8.
        meta_config (ValidatedMetaConfig): Metadata configuration from Task 1.

    Returns:
        ChainSeriesValidationResult: Result object containing stats and validation status.
    """
    df = aggregated_data.daily_df

    # Identify the 4 volume columns
    # We look for columns starting with "V_"
    vol_cols = [c for c in df.columns if c.startswith("V_")]

    print(f"Task 9: Validating series: {vol_cols}")

    # Step 1: Integrity Check
    check_series_integrity(df, vol_cols)
    print("  - Integrity check passed (no NaNs, no negatives).")

    # Step 2: Statistics & Outliers
    stats_df = compute_series_statistics(df, vol_cols)
    outliers = detect_outliers(df, vol_cols)

    # Count zero-volume days per series and warn when they exceed 5% of the window
    zero_counts = {}
    n_days = len(df)
    for col in vol_cols:
        zeros = int((df[col] == 0).sum())
        zero_counts[col] = zeros
        if n_days > 0 and zeros / n_days > 0.05:
            print(f"WARNING: Series '{col}' has {zeros} zero-volume days (>5%).")

    # Step 3: Plot Specs
    plot_specs = generate_plot_specs(vol_cols, meta_config)

    print("Task 9 Complete. Statistics computed.")

    return ChainSeriesValidationResult(
        is_valid=True,
        stats_df=stats_df,
        outliers=outliers,
        zero_counts=zero_counts,
        plot_specifications=plot_specs
    )
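`detect_outliers` flags days above mean + 4σ, using pandas' default sample standard deviation (ddof=1). Because the spike itself inflates σ, a single point among n observations can sit at most (n - 1)/√n sample standard deviations above the mean (Samuelson's inequality), so the 4σ rule can never fire on a window shorter than 18 days. The sketch below reproduces the rule on illustrative values; both helper names are hypothetical.

```python
import math

import pandas as pd


def four_sigma_outliers(series: pd.Series) -> list:
    """Index labels whose value exceeds mean + 4 * std, with pandas' default ddof=1."""
    threshold = series.mean() + 4 * series.std()
    return series[series > threshold].index.tolist()


def max_single_spike_z(n: int) -> float:
    """Samuelson bound: the largest z-score any one of n points can reach (sample std)."""
    return (n - 1) / math.sqrt(n)


# 30 quiet days at 100 followed by one spike at 10,000 (illustrative values)
long_window = pd.Series([100.0] * 30 + [10_000.0])
print(four_sigma_outliers(long_window))   # [30]

# With only 10 days, even an extreme spike cannot clear the 4-sigma bar
short_window = pd.Series([100.0] * 9 + [1e9])
print(four_sigma_outliers(short_window))  # []
print(round(max_single_spike_z(10), 3))   # 2.846
```

For the long window the spike's z-score is 30/√31 ≈ 5.39, so it is flagged; for ten days the ceiling is about 2.85 regardless of the spike's size.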


# Task 10 — Cleanse and Normalize df_market_raw

# ==============================================================================
# Task 10: Cleanse and Normalize df_market_raw
# ==============================================================================

@dataclass
class CleanedMarketData:
    """
    Container for the cleansed market data.

    Attributes:
        cleaned_df (pd.DataFrame): The DataFrame with normalized dates and valid numeric values.
        initial_rows (int): Row count before cleansing.
        rows_after_time_filter (int): Row count after applying observation window.
        rows_after_numeric_clean (int): Row count after removing invalid/missing numeric data.
        dropped_time_count (int): Number of rows dropped due to time window.
        dropped_numeric_count (int): Number of rows dropped due to missing/invalid values.
    """
    cleaned_df: pd.DataFrame
    initial_rows: int
    rows_after_time_filter: int
    rows_after_numeric_clean: int
    dropped_time_count: int
    dropped_numeric_count: int

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 1: Normalize Date Column to UTC
# -------------------------------------------------------------------------------------------------------------------------------

def normalize_market_dates(df: pd.DataFrame) -> pd.DataFrame:
    """
    Normalizes the 'Date' column to UTC midnight.

    Args:
        df (pd.DataFrame): The validated market DataFrame.

    Returns:
        pd.DataFrame: A copy of the DataFrame with normalized dates.
    """
    df_out = df.copy()

    # Ensure UTC and normalize to midnight
    df_out["Date"] = pd.to_datetime(df_out["Date"], utc=True).dt.normalize()

    return df_out

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 2: Apply Temporal Filter
# -------------------------------------------------------------------------------------------------------------------------------

def filter_market_window(
    df: pd.DataFrame,
    start_date: pd.Timestamp,
    end_date: pd.Timestamp
) -> pd.DataFrame:
    """
    Filters the market data to the observation window.

    Args:
        df (pd.DataFrame): The DataFrame with normalized dates.
        start_date (pd.Timestamp): Start of window (UTC).
        end_date (pd.Timestamp): End of window (UTC).

    Returns:
        pd.DataFrame: A subset of the input DataFrame.
    """
    # Ensure start/end are normalized
    s_norm = start_date.normalize()
    e_norm = end_date.normalize()

    mask = (df["Date"] >= s_norm) & (df["Date"] <= e_norm)

    return df.loc[mask].copy()

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Step 3: Validate Numeric Columns
# -------------------------------------------------------------------------------------------------------------------------------

def clean_numeric_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Validates and cleans numeric columns (Close, Volume).

    - Drops rows with NaN in Close or Volume.
    - Drops rows with Close <= 0 or Volume < 0.

    Args:
        df (pd.DataFrame): The temporally filtered DataFrame.

    Returns:
        pd.DataFrame: The cleaned DataFrame.
    """
    # Check for NaNs
    # We drop rows with any missing critical data
    df_clean = df.dropna(subset=["Close", "Volume"]).copy()

    # Check for invalid values
    # Close must be > 0
    # Volume must be >= 0
    valid_mask = (df_clean["Close"] > 0) & (df_clean["Volume"] >= 0)

    invalid_count = (~valid_mask).sum()
    if invalid_count > 0:
        print(f"WARNING: Dropping {invalid_count} rows with invalid numeric values (Close<=0 or Volume<0).")

    return df_clean.loc[valid_mask].copy()

# -------------------------------------------------------------------------------------------------------------------------------
# Task 10, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def cleanse_market_data(
    validation_result: Any,
    meta_config: Any
) -> CleanedMarketData:
    """
    Orchestrates the cleansing of market data.

    Executes:
    1. Date normalization.
    2. Temporal filtering.
    3. Numeric validation and cleaning.

    Args:
        validation_result (MarketValidationResult): Result object from Task 3.
        meta_config (ValidatedMetaConfig): Metadata configuration from Task 1.

    Returns:
        CleanedMarketData: Result object containing the clean DataFrame and counts.
    """
    df_input = validation_result.validated_df
    initial_rows = len(df_input)
    print(f"Task 10: Cleansing market data ({initial_rows} rows)...")

    # Step 1: Normalize Dates
    df_dated = normalize_market_dates(df_input)

    # Step 2: Temporal Filter
    print(f"Task 10: Applying temporal filter ({meta_config.start_date.date()} to {meta_config.end_date.date()})...")
    df_time_filtered = filter_market_window(
        df_dated,
        meta_config.start_date,
        meta_config.end_date
    )

    rows_after_time = len(df_time_filtered)
    dropped_time = initial_rows - rows_after_time
    print(f"  - Dropped {dropped_time} rows outside window.")

    # Step 3: Numeric Cleaning
    print("Task 10: Cleaning numeric columns...")
    df_final = clean_numeric_columns(df_time_filtered)

    rows_after_numeric = len(df_final)
    dropped_numeric = rows_after_time - rows_after_numeric
    print(f"  - Dropped {dropped_numeric} rows with missing/invalid data.")

    print(f"Task 10 Complete. Final Rows: {rows_after_numeric}")

    return CleanedMarketData(
        cleaned_df=df_final,
        initial_rows=initial_rows,
        rows_after_time_filter=rows_after_time,
        rows_after_numeric_clean=rows_after_numeric,
        dropped_time_count=dropped_time,
        dropped_numeric_count=dropped_numeric
    )
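A small worked case confirms the boundary behavior of Task 10. Because both the data and the window bounds are normalized to UTC midnight, a bar stamped late on the end date is kept, while rows just outside the window, a row with a missing Close, and a row with Close = 0 are dropped; Volume = 0 is still valid. The toy rows and window below are illustrative, not market data:

```python
import numpy as np
import pandas as pd

# Toy market rows (illustrative values only)
raw = pd.DataFrame({
    "Date": ["2024-10-31 23:00:00", "2024-11-01 00:00:00", "2024-11-03 12:00:00",
             "2024-11-04 09:00:00", "2024-11-05 18:30:00", "2024-11-06 00:00:00"],
    "Close": [99.0, 100.0, np.nan, 0.0, 101.0, 102.0],
    "Volume": [1.0, 5.0, 3.0, 2.0, 0.0, 4.0],
})
start = pd.Timestamp("2024-11-01", tz="UTC")
end = pd.Timestamp("2024-11-05 23:59:59", tz="UTC")

# Step 1: parse as UTC and truncate to midnight
df = raw.assign(Date=pd.to_datetime(raw["Date"], utc=True).dt.normalize())

# Step 2: inclusive window on normalized dates (an intraday end time keeps its whole day)
df = df[(df["Date"] >= start.normalize()) & (df["Date"] <= end.normalize())]

# Step 3: drop missing values, then non-positive prices and negative volumes
df = df.dropna(subset=["Close", "Volume"])
df = df[(df["Close"] > 0) & (df["Volume"] >= 0)]

print(df["Date"].dt.strftime("%Y-%m-%d").tolist())  # ['2024-11-01', '2024-11-05']
```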


# Task 11 — Extract Asset-Specific Market Series

# ==============================================================================
# Task 11: Extract Asset-Specific Market Series
# ==============================================================================

@dataclass
class MarketSeriesData:
    """
    Container for the extracted and combined market time series.

    Attributes:
        market_daily_df (pd.DataFrame): The combined DataFrame with Date and asset-specific columns.
        btc_price_series (pd.Series): Daily BTC closing prices.
        eth_price_series (pd.Series): Daily ETH closing prices.
        usdt_volume_series (pd.Series): Daily USDT exchange volume.
        usdc_volume_series (pd.Series): Daily USDC exchange volume.
        common_dates_count (int): Number of days where all assets have data.
    """
    market_daily_df: pd.DataFrame
    btc_price_series: pd.Series
    eth_price_series: pd.Series
    usdt_volume_series: pd.Series
    usdc_volume_series: pd.Series
    common_dates_count: int

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 1: Filter and Extract BTC and ETH Price Series
# -------------------------------------------------------------------------------------------------------------------------------

def extract_price_series(df: pd.DataFrame, symbol: str, col_name: str) -> pd.Series:
    """
    Extracts the closing price series for a specific symbol.

    Args:
        df (pd.DataFrame): The cleaned market DataFrame.
        symbol (str): The trading symbol (e.g., "BTC/USD").
        col_name (str): The name for the output series (e.g., "Close_BTC_USD").

    Returns:
        pd.Series: Time series of closing prices indexed by Date.
    """
    # Filter by symbol
    mask = df["Symbol"] == symbol
    subset = df.loc[mask].copy()

    # Check for duplicate dates
    if subset["Date"].duplicated().any():
        raise ValueError(f"Duplicate dates found for symbol {symbol}.")

    # Set index and select Close
    series = subset.set_index("Date")["Close"]
    series.name = col_name

    return series

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 2: Filter and Extract Stablecoin Volume Series
# -------------------------------------------------------------------------------------------------------------------------------

def extract_volume_series(df: pd.DataFrame, symbol: str, col_name: str) -> pd.Series:
    """
    Extracts the volume series for a specific symbol.

    Args:
        df (pd.DataFrame): The cleaned market DataFrame.
        symbol (str): The trading symbol (e.g., "USDT/USD").
        col_name (str): The name for the output series (e.g., "Volume_USDT_USD").

    Returns:
        pd.Series: Time series of volume indexed by Date.
    """
    # Filter by symbol
    mask = df["Symbol"] == symbol
    subset = df.loc[mask].copy()

    # Check for duplicate dates
    if subset["Date"].duplicated().any():
        raise ValueError(f"Duplicate dates found for symbol {symbol}.")

    # Set index and select Volume
    series = subset.set_index("Date")["Volume"]
    series.name = col_name

    return series

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Step 3: Combine into Market Daily DataFrame
# -------------------------------------------------------------------------------------------------------------------------------

def combine_market_series(series_dict: Dict[str, pd.Series]) -> pd.DataFrame:
    """
    Combines individual time series into a single DataFrame aligned by Date.

    Uses inner join to ensure only dates with complete data across all assets are retained.

    Args:
        series_dict (Dict[str, pd.Series]): Dictionary of named time series.

    Returns:
        pd.DataFrame: Combined DataFrame with 'Date' as a column.
    """
    # Concatenate with inner join to align dates
    df_combined = pd.concat(series_dict.values(), axis=1, join="inner")

    # Sort by date
    df_combined = df_combined.sort_index()

    # Reset index to make Date a column
    df_final = df_combined.reset_index()

    return df_final

# -------------------------------------------------------------------------------------------------------------------------------
# Task 11, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def extract_market_series(cleaned_data: Any) -> MarketSeriesData:
    """
    Orchestrates the extraction and combination of market time series.

    Executes:
    1. Extraction of BTC/ETH prices.
    2. Extraction of USDT/USDC volumes.
    3. Combination into a unified daily DataFrame.

    Args:
        cleaned_data (CleanedMarketData): Result object from Task 10.

    Returns:
        MarketSeriesData: Result object containing the series and combined DataFrame.
    """
    df_input = cleaned_data.cleaned_df
    print("Task 11: Extracting asset-specific series...")

    # Step 1: Prices
    btc_series = extract_price_series(df_input, "BTC/USD", "Close_BTC_USD")
    eth_series = extract_price_series(df_input, "ETH/USD", "Close_ETH_USD")

    # Step 2: Volumes
    usdt_series = extract_volume_series(df_input, "USDT/USD", "Volume_USDT_USD")
    usdc_series = extract_volume_series(df_input, "USDC/USD", "Volume_USDC_USD")

    # Step 3: Combine
    series_map = {
        "Close_BTC_USD": btc_series,
        "Close_ETH_USD": eth_series,
        "Volume_USDT_USD": usdt_series,
        "Volume_USDC_USD": usdc_series
    }

    df_combined = combine_market_series(series_map)

    count = len(df_combined)
    print(f"Task 11 Complete. Combined {count} common trading days.")
    print(f"Columns: {list(df_combined.columns)}")

    return MarketSeriesData(
        market_daily_df=df_combined,
        btc_price_series=btc_series,
        eth_price_series=eth_series,
        usdt_volume_series=usdt_series,
        usdc_volume_series=usdc_series,
        common_dates_count=count
    )
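`combine_market_series` uses an inner concat, so a date missing for any one asset is dropped from every column without a warning; the printed row count is the only trace. The toy example below (illustrative prices) shows the effect, plus a per-series missing-date count that is a hypothetical diagnostic, not part of the notebook:

```python
import pandas as pd

idx = pd.to_datetime(["2024-11-01", "2024-11-02", "2024-11-03"], utc=True).rename("Date")

# Illustrative series; the ETH series is missing 2024-11-02
btc = pd.Series([70_000.0, 71_000.0, 72_000.0], index=idx, name="Close_BTC_USD")
eth = pd.Series([2_500.0, 2_550.0], index=idx[[0, 2]], name="Close_ETH_USD")

# join="inner" keeps only dates present in every series
combined = pd.concat([btc, eth], axis=1, join="inner").sort_index().reset_index()
print(combined["Date"].dt.strftime("%Y-%m-%d").tolist())  # ['2024-11-01', '2024-11-03']
print(list(combined.columns))  # ['Date', 'Close_BTC_USD', 'Close_ETH_USD']

# Counting each series' gaps against the union of dates makes the row loss visible
union = btc.index.union(eth.index)
missing = {s.name: len(union.difference(s.index)) for s in (btc, eth)}
print(missing)  # {'Close_BTC_USD': 0, 'Close_ETH_USD': 1}
```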


# Task 12 — Merge On-Chain and Market Data

# ==============================================================================
# Task 12: Merge On-Chain and Market Data
# ==============================================================================

@dataclass
class MergedPanelData:
    """
    Container for the final merged daily panel dataset.

    Attributes:
        df_panel (pd.DataFrame): The merged DataFrame containing aligned on-chain and market series.
        chain_rows (int): Number of rows in the on-chain input.
        market_rows (int): Number of rows in the market input.
        merged_rows (int): Number of rows after inner join.
        start_date (pd.Timestamp): Earliest date in the panel.
        end_date (pd.Timestamp): Latest date in the panel.
    """
    df_panel: pd.DataFrame
    chain_rows: int
    market_rows: int
    merged_rows: int
    start_date: pd.Timestamp
    end_date: pd.Timestamp

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 1: Perform Inner Join on Date
# -------------------------------------------------------------------------------------------------------------------------------

def join_datasets(df_chain: pd.DataFrame, df_market: pd.DataFrame) -> pd.DataFrame:
    """
    Performs an inner join between on-chain and market data on the 'Date' column.

    Args:
        df_chain (pd.DataFrame): Daily on-chain volumes.
        df_market (pd.DataFrame): Daily market prices and volumes.

    Returns:
        pd.DataFrame: The merged DataFrame.
    """
    # Tasks 8 and 11 both expose Date as a column; verify its dtype here so that
    # upstream changes fail loudly instead of producing an empty or misaligned join.
    if not pd.api.types.is_datetime64_any_dtype(df_chain["Date"]):
        raise ValueError("df_chain 'Date' column is not datetime.")
    if not pd.api.types.is_datetime64_any_dtype(df_market["Date"]):
        raise ValueError("df_market 'Date' column is not datetime.")

    # Perform Inner Join
    df_merged = pd.merge(
        df_chain,
        df_market,
        on="Date",
        how="inner",
        validate="1:1"  # Ensure uniqueness on both sides
    )

    return df_merged

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 2: Verify Merged DataFrame Completeness
# -------------------------------------------------------------------------------------------------------------------------------

def verify_panel_completeness(df: pd.DataFrame) -> pd.DataFrame:
    """
    Verifies that the merged DataFrame contains all required columns and no NaNs.

    Required Columns:
    - Date
    - V_EOA_EOA_USDT, V_EOA_EOA_USDC
    - V_SC_SC_USDT, V_SC_SC_USDC
    - Volume_USDT_USD, Volume_USDC_USD
    - Close_BTC_USD, Close_ETH_USD

    Args:
        df (pd.DataFrame): The merged DataFrame.

    Returns:
        pd.DataFrame: The verified DataFrame (rows with NaNs dropped).
    """
    required_cols = [
        "Date",
        "V_EOA_EOA_USDT", "V_EOA_EOA_USDC",
        "V_SC_SC_USDT", "V_SC_SC_USDC",
        "Volume_USDT_USD", "Volume_USDC_USD",
        "Close_BTC_USD", "Close_ETH_USD"
    ]

    # Check column presence
    missing = set(required_cols) - set(df.columns)
    if missing:
        raise ValueError(f"Merged panel is missing columns: {missing}")

    # Check for NaNs
    if df[required_cols].isnull().any().any():
        nan_count = df[required_cols].isnull().any(axis=1).sum()
        print(f"WARNING: Found {nan_count} rows with NaNs in merged panel. Dropping them.")
        df_clean = df.dropna(subset=required_cols).copy()
    else:
        df_clean = df.copy()

    return df_clean

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Step 3: Finalize Panel Structure
# -------------------------------------------------------------------------------------------------------------------------------

def finalize_panel(df: pd.DataFrame) -> pd.DataFrame:
    """
    Finalizes the panel structure: sorts by Date and resets index.

    Args:
        df (pd.DataFrame): The verified DataFrame.

    Returns:
        pd.DataFrame: The final panel DataFrame.
    """
    # Sort by Date
    df_sorted = df.sort_values("Date", ascending=True)

    # Reset Index
    df_final = df_sorted.reset_index(drop=True)

    return df_final

# -------------------------------------------------------------------------------------------------------------------------------
# Task 12, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def merge_data_sources(
    chain_data: Any,
    market_data: Any
) -> MergedPanelData:
    """
    Orchestrates the merging of on-chain and market data into a unified panel.

    Executes:
    1. Inner join on Date.
    2. Completeness verification (dropping NaNs).
    3. Final sorting and indexing.

    Args:
        chain_data (AggregatedChainData): Result object from Task 8.
        market_data (MarketSeriesData): Result object from Task 11.

    Returns:
        MergedPanelData: Result object containing the final panel.
    """
    df_chain = chain_data.daily_df
    df_market = market_data.market_daily_df

    print("Task 12: Merging on-chain and market data...")

    # Step 1: Join
    df_merged_raw = join_datasets(df_chain, df_market)

    # Step 2: Verify
    df_verified = verify_panel_completeness(df_merged_raw)

    # Step 3: Finalize
    df_panel = finalize_panel(df_verified)

    rows = len(df_panel)
    start = df_panel["Date"].min()
    end = df_panel["Date"].max()

    print(f"Task 12 Complete. Final Panel: {rows} rows ({start.date()} to {end.date()}).")

    return MergedPanelData(
        df_panel=df_panel,
        chain_rows=len(df_chain),
        market_rows=len(df_market),
        merged_rows=rows,
        start_date=start,
        end_date=end
    )
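
# Illustrative sketch (hypothetical helper, not called by the pipeline): the three
# Task 12 steps on toy frames. It assumes join_datasets (defined earlier) behaves
# like a pandas inner merge on "Date"; for brevity it screens NaNs in every column,
# whereas verify_panel_completeness only screens the required columns.
import pandas as pd

def _sketch_merge_panel(df_chain: pd.DataFrame, df_market: pd.DataFrame) -> pd.DataFrame:
    # Step 1: inner join keeps only dates present in both sources
    merged = pd.merge(df_chain, df_market, on="Date", how="inner")
    # Step 2: drop incomplete rows; Step 3: chronological order with a clean RangeIndex
    return merged.dropna().sort_values("Date").reset_index(drop=True)

# Usage: chain data covers Jan 1-3 and market data Jan 2-4 with a missing price on
# Jan 3, so only 2024-01-02 survives both the join and the NaN screen.
# _sketch_merge_panel(
#     pd.DataFrame({"Date": pd.to_datetime(["2024-01-03", "2024-01-01", "2024-01-02"]), "V": [3.0, 1.0, 2.0]}),
#     pd.DataFrame({"Date": pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-04"]), "P": [10.0, None, 12.0]}),
# )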


# Task 13 — Construct Log-Transformed Series

# ==============================================================================
# Task 13: Construct Log-Transformed Series
# ==============================================================================

@dataclass
class LogTransformedData:
    """
    Container for the panel data with log-transformed series.

    Attributes:
        df_log (pd.DataFrame): The DataFrame containing original and log-transformed series.
        log_columns (List[str]): List of names of the newly created log columns.
        transformation_map (Dict[str, str]): Mapping from original column to log column.
    """
    df_log: pd.DataFrame
    log_columns: List[str]
    transformation_map: Dict[str, str]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 1: Identify Series for Log Transformation
# -------------------------------------------------------------------------------------------------------------------------------

def identify_target_series() -> Dict[str, str]:
    """
    Identifies the series requiring log transformation and defines their types.

    Returns:
        Dict[str, str]: Mapping of column name to transformation type ('log1p' for volumes, 'log' for prices).
    """
    # Volume series (can be 0) -> log1p
    volumes = [
        "V_EOA_EOA_USDT", "V_EOA_EOA_USDC",
        "V_SC_SC_USDT", "V_SC_SC_USDC",
        "Volume_USDT_USD", "Volume_USDC_USD"
    ]

    # Price series (strictly positive) -> log
    prices = [
        "Close_BTC_USD", "Close_ETH_USD"
    ]

    targets = {col: "log1p" for col in volumes}
    targets.update({col: "log" for col in prices})

    return targets

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Step 2 & 3: Handle Zero Values and Create Log Columns
# -------------------------------------------------------------------------------------------------------------------------------

def apply_log_transformations(df: pd.DataFrame, targets: Dict[str, str]) -> pd.DataFrame:
    """
    Applies logarithmic transformations to specified columns.

    - Uses np.log1p(x) = log(x + 1) for volume series (handling zeros).
    - Uses np.log(x) for price series (strictly positive).

    Args:
        df (pd.DataFrame): The merged panel DataFrame.
        targets (Dict[str, str]): Mapping of column names to transformation type.

    Returns:
        pd.DataFrame: A copy of the DataFrame with new 'log_' columns.
    """
    df_out = df.copy()

    for col, method in targets.items():
        if col not in df_out.columns:
            raise ValueError(f"Column '{col}' missing from DataFrame.")

        new_col = f"log_{col}"

        if method == "log1p":
            # Ensure non-negative
            if (df_out[col] < 0).any():
                raise ValueError(f"Column '{col}' contains negative values, cannot apply log1p.")
            df_out[new_col] = np.log1p(df_out[col])

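        elif method == "log":
            # Ensure strictly positive
            if (df_out[col] <= 0).any():
                raise ValueError(f"Column '{col}' contains non-positive values, cannot apply log.")
            df_out[new_col] = np.log(df_out[col])

        else:
            # Fail loudly instead of silently skipping an unrecognized transformation
            raise ValueError(f"Unknown transformation '{method}' for column '{col}'.")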

    return df_out
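
# Illustrative check (not part of the pipeline): why volumes use log1p while prices use log.
# A zero-volume day maps to 0 under log1p but to -inf under log, and np.expm1 inverts
# log1p, so original volumes can be recovered from the log_ columns if needed.
# The helper name _sketch_log_roundtrip is hypothetical.
import numpy as np

def _sketch_log_roundtrip(volumes):
    # Forward transform, as in the 'log1p' branch of apply_log_transformations
    logged = np.log1p(np.asarray(volumes, dtype=float))
    # Inverse transform: expm1(log1p(x)) == x up to floating-point rounding
    recovered = np.expm1(logged)
    return logged, recovered

# Usage: _sketch_log_roundtrip([0.0, 1.0, 1e6]) maps the zero-volume day to 0.0.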

# -------------------------------------------------------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def construct_log_series(merged_data: Any) -> LogTransformedData:
    """
    Orchestrates the creation of log-transformed series.

    Executes:
    1. Identification of target columns.
    2. Application of log/log1p transformations.

    Args:
        merged_data (MergedPanelData): Result object from Task 12.

    Returns:
        LogTransformedData: Result object containing the augmented DataFrame.
    """
    df_input = merged_data.df_panel
    print("Task 13: Constructing log-transformed series...")

    # Step 1: Identify
    targets = identify_target_series()

    # Step 2 & 3: Transform
    df_log = apply_log_transformations(df_input, targets)

    # Metadata
    log_cols = [f"log_{c}" for c in targets.keys()]
    trans_map = {c: f"log_{c}" for c in targets.keys()}

    print(f"Task 13 Complete. Created {len(log_cols)} log columns.")

    return LogTransformedData(
        df_log=df_log,
        log_columns=log_cols,
        transformation_map=trans_map
    )


# Task 14 — Perform ADF Stationarity Tests

# ==============================================================================
# Task 14: Perform ADF Stationarity Tests
# ==============================================================================

@dataclass
class ADFResult:
    """
    Container for a single Augmented Dickey-Fuller test result.

    Attributes:
        series_name (str): Name of the time series tested.
        test_statistic (float): The computed ADF statistic.
        p_value (float): The p-value associated with the test statistic.
        used_lag (int): The number of lags used in the regression.
        n_obs (int): The number of observations used for the ADF regression.
        critical_values (Dict[str, float]): Critical values for 1%, 5%, and 10%.
        is_stationary (bool): True if null hypothesis (unit root) is rejected at 5%.
        integration_order (str): 'I(0)' if stationary, 'I(1)' if non-stationary (provisional).
    """
    series_name: str
    test_statistic: float
    p_value: float
    used_lag: int
    n_obs: int
    critical_values: Dict[str, float]
    is_stationary: bool
    integration_order: str

@dataclass
class StationarityTestResults:
    """
    Container for the results of stationarity tests across all series.

    Attributes:
        results (List[ADFResult]): List of individual test results.
        summary_df (pd.DataFrame): Summary table of statistics and decisions.
        i1_series (List[str]): List of series names identified as I(1) (non-stationary).
    """
    results: List[ADFResult]
    summary_df: pd.DataFrame
    i1_series: List[str]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 1: Specify ADF Test Configuration
# -------------------------------------------------------------------------------------------------------------------------------

def get_adf_config(preprocessing_config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extracts and formats the ADF test configuration.

    Args:
        preprocessing_config (Dict[str, Any]): The 'preprocessing' section of STUDY_CONFIG.

    Returns:
        Dict[str, Any]: Configuration dictionary for adfuller.
    """
    stat_config = preprocessing_config.get("stationarity_test", {})

    return {
        "regression": stat_config.get("regression", "ct"),  # Constant + Trend
        "autolag": stat_config.get("autolag", "AIC")        # Akaike Information Criterion
    }

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 2: Execute ADF Test on Each Log-Level Series
# -------------------------------------------------------------------------------------------------------------------------------

def run_adf_test(series: pd.Series, config: Dict[str, Any]) -> ADFResult:
    """
    Executes the Augmented Dickey-Fuller test on a single time series.

    Hypotheses:
    H0: The series has a unit root (Non-Stationary).
    H1: The series is stationary.

    Args:
        series (pd.Series): The time series to test.
        config (Dict[str, Any]): Configuration for adfuller (regression, autolag).

    Returns:
        ADFResult: Structured test results.
    """
    # Drop NaNs (e.g., from differencing or missing data) before testing
    clean_series = series.dropna()

    # Run ADF
    # adfuller returns: (adf_stat, pvalue, usedlag, nobs, critical_values, icbest)
    result = adfuller(
        clean_series,
        regression=config["regression"],
        autolag=config["autolag"]
    )

    adf_stat = result[0]
    p_value = result[1]
    used_lag = result[2]
    n_obs = result[3]
    crit_values = result[4]

    # Decision rule: reject H0 (unit root) at the 5% level when p < 0.05.
    # This is (nearly) equivalent to adf_stat < critical_values["5%"].
    is_stationary = p_value < 0.05
    integration_order = "I(0)" if is_stationary else "I(1)"

    return ADFResult(
        series_name=series.name,
        test_statistic=adf_stat,
        p_value=p_value,
        used_lag=used_lag,
        n_obs=n_obs,
        critical_values=crit_values,
        is_stationary=is_stationary,
        integration_order=integration_order
    )
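
# Illustrative sketch (simplified, not the test used above): the regression behind
# adfuller(..., regression="ct") with zero augmentation lags, i.e. the plain
# Dickey-Fuller specification
#     dy_t = a + b*t + g*y_{t-1} + e_t,
# whose t-ratio on g is compared with Dickey-Fuller (not Student-t) critical values;
# the asymptotic 5% value with constant and trend is about -3.41. The lag augmentation
# chosen by autolag="AIC" is deliberately omitted. _sketch_df_tstat is hypothetical.
import numpy as np

def _sketch_df_tstat(y) -> float:
    y = np.asarray(y, dtype=float)
    dy = np.diff(y)                                   # dependent variable: first differences
    y_lag = y[:-1]                                    # lagged level y_{t-1}
    trend = np.arange(1, len(y), dtype=float)         # deterministic trend t
    X = np.column_stack([np.ones_like(y_lag), trend, y_lag])
    beta, *_ = np.linalg.lstsq(X, dy, rcond=None)     # OLS coefficients (a, b, g)
    resid = dy - X @ beta
    sigma2 = resid @ resid / (len(dy) - X.shape[1])   # residual variance
    cov = sigma2 * np.linalg.inv(X.T @ X)             # OLS covariance matrix
    # t-ratio for the coefficient on y_{t-1}
    return float(beta[2] / np.sqrt(cov[2, 2]))

# Usage: for white noise the statistic is far below -3.41 (unit root rejected);
# for a random walk it typically lies above it.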

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Step 3: Document Integration Order
# -------------------------------------------------------------------------------------------------------------------------------

def summarize_stationarity(results: List[ADFResult]) -> Tuple[pd.DataFrame, List[str]]:
    """
    Summarizes ADF test results into a DataFrame and identifies I(1) series.

    Args:
        results (List[ADFResult]): List of ADF test results.

    Returns:
        Tuple[pd.DataFrame, List[str]]: Summary DataFrame and list of I(1) series names.
    """
    summary_data = []
    i1_series = []

    # Iterate through 'results' object and extract key outputs
    for res in results:
        summary_data.append({
            "Series": res.series_name,
            "ADF Statistic": res.test_statistic,
            "p-value": res.p_value,
            "Critical Value (5%)": res.critical_values["5%"],
            "Result": "Stationary" if res.is_stationary else "Non-Stationary",
            "Order": res.integration_order
        })

        if not res.is_stationary:
            i1_series.append(res.series_name)

    summary_df = pd.DataFrame(summary_data)
    return summary_df, i1_series

# -------------------------------------------------------------------------------------------------------------------------------
# Task 14, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def perform_stationarity_tests(
    log_data: Any,
    preprocessing_config: Dict[str, Any]
) -> StationarityTestResults:
    """
    Orchestrates the ADF stationarity testing for all log-transformed series.

    Executes:
    1. Configuration extraction.
    2. ADF testing for each series.
    3. Summarization and identification of non-stationary series.

    Args:
        log_data (LogTransformedData): Result object from Task 13.
        preprocessing_config (Dict[str, Any]): Preprocessing configuration.

    Returns:
        StationarityTestResults: Result object containing detailed and summary results.
    """
    df = log_data.df_log
    log_cols = log_data.log_columns

    print(f"Task 14: Running ADF tests on {len(log_cols)} series...")

    # Step 1: Config
    adf_config = get_adf_config(preprocessing_config)

    # Step 2: Execute Tests
    results = []
    for col in log_cols:
        series = df[col]
        res = run_adf_test(series, adf_config)
        results.append(res)

    # Step 3: Summarize
    summary_df, i1_series = summarize_stationarity(results)

    print("Task 14 Complete. Summary:")
    print(summary_df[["Series", "p-value", "Order"]])
    print(f"Identified {len(i1_series)} non-stationary (I(1)) series.")

    return StationarityTestResults(
        results=results,
        summary_df=summary_df,
        i1_series=i1_series
    )


# Task 15 — Construct Differenced Series and Finalize Integration Order

import pandas as pd
from dataclasses import dataclass
from typing import List, Dict, Any
from statsmodels.tsa.stattools import adfuller

# ==============================================================================
# Task 15: Construct Differenced Series and Finalize Integration Order
# ==============================================================================

@dataclass
class AnalysisSeriesMapping:
    """
    Container mapping specific time series to their designated analysis methods.

    Attributes:
        bai_perron_series (List[str]): Series for structural break detection (Log-Levels).
        hht_series (List[str]): Series for Hilbert-Huang Transform (Log-Levels).
        svar_series (List[str]): Series for SVAR (Differenced/Stationary).
    """
    bai_perron_series: List[str]
    hht_series: List[str]
    svar_series: List[str]

@dataclass
class FinalizedSeriesData:
    """
    Container for the final dataset ready for econometric analysis.

    Attributes:
        df_final (pd.DataFrame): DataFrame containing raw, log, and differenced series.
        diff_columns (List[str]): List of newly created differenced columns.
        stationarity_summary (pd.DataFrame): Summary of ADF tests on differenced series.
        series_mapping (AnalysisSeriesMapping): Mapping of series to methods.
    """
    df_final: pd.DataFrame
    diff_columns: List[str]
    stationarity_summary: pd.DataFrame
    series_mapping: AnalysisSeriesMapping

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 1: Compute First Differences
# -------------------------------------------------------------------------------------------------------------------------------

def compute_differences(df: pd.DataFrame, i1_series: List[str]) -> pd.DataFrame:
    """
    Computes the first difference for series identified as I(1).

    Args:
        df (pd.DataFrame): The DataFrame with log-transformed series.
        i1_series (List[str]): List of column names identified as non-stationary.

    Returns:
        pd.DataFrame: A copy of the DataFrame with new 'diff_' columns.
    """
    df_out = df.copy()

    for col in i1_series:
        new_col = f"diff_{col}"
        # Compute difference
        df_out[new_col] = df_out[col].diff()

    return df_out
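
# Illustrative check (not part of the pipeline): first-differencing a random walk
# recovers its i.i.d. increments, which is why I(1) log-levels become I(0) after
# compute_differences. pandas' .diff() leaves the first row as NaN, which
# verify_diff_stationarity removes with dropna() before testing.
# _sketch_difference_random_walk is a hypothetical helper.
import numpy as np
import pandas as pd

def _sketch_difference_random_walk(n: int = 200, seed: int = 0):
    rng = np.random.default_rng(seed)
    steps = rng.standard_normal(n)          # stationary increments
    walk = pd.Series(np.cumsum(steps))      # I(1) level series
    return steps, walk.diff()               # increments and their reconstruction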

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 2: Run ADF on Differenced Series
# -------------------------------------------------------------------------------------------------------------------------------

def verify_diff_stationarity(
    df: pd.DataFrame,
    diff_cols: List[str],
    adf_config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Runs ADF tests on the differenced series to confirm stationarity.

    Args:
        df (pd.DataFrame): The DataFrame with differenced series.
        diff_cols (List[str]): List of differenced column names.
        adf_config (Dict[str, Any]): Configuration for adfuller.

    Returns:
        pd.DataFrame: Summary of ADF results for differenced series.
    """
    results = []

    # Same ADF logic as run_adf_test (Task 14), re-implemented inline so this function does not depend on it
    for col in diff_cols:
        series = df[col].dropna()

        # Run ADF
        res = adfuller(series, regression=adf_config["regression"], autolag=adf_config["autolag"])

        adf_stat = res[0]
        p_value = res[1]
        crit_5 = res[4]["5%"]
        is_stationary = p_value < 0.05

        results.append({
            "Series": col,
            "ADF Statistic": adf_stat,
            "p-value": p_value,
            "Critical Value (5%)": crit_5,
            "Result": "Stationary (I(0))" if is_stationary else "Non-Stationary (I(1))"
        })

        if not is_stationary:
            print(f"WARNING: Series '{col}' is still non-stationary after differencing (p={p_value:.4f}).")

    return pd.DataFrame(results)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Step 3: Assign Series to Analysis Methods
# -------------------------------------------------------------------------------------------------------------------------------

def map_series_to_methods(
    log_cols: List[str],
    diff_cols: List[str]
) -> AnalysisSeriesMapping:
    """
    Maps the available series to the specific analysis methods based on the study design.

    - Bai-Perron: Uses Log-Levels (mean-shift model).
    - HHT: Uses Log-Levels (BTC/ETH prices).
    - SVAR: Uses Differenced Log-Levels (Stationary volumes).

    Args:
        log_cols (List[str]): List of log-level columns.
        diff_cols (List[str]): List of differenced columns.

    Returns:
        AnalysisSeriesMapping: Structured mapping.
    """
    # 1. Bai-Perron: All log-level series (Volumes + Prices)
    # The paper applies structural break tests to the levels (regime shift in mean volume/price level)
    bp_series = log_cols

    # 2. HHT: Only BTC and ETH prices (Log-Levels)
    # We look for columns containing "Close"
    hht_series = [c for c in log_cols if "Close" in c]

    # 3. SVAR: Only Stablecoin Volumes (Differenced)
    # We look for columns containing "V_" or "Volume_" in the differenced list
    # The SVAR focuses on flow dynamics, so we use stationary inputs.
    svar_series = [c for c in diff_cols if "V_" in c or "Volume_" in c]

    return AnalysisSeriesMapping(
        bai_perron_series=bp_series,
        hht_series=hht_series,
        svar_series=svar_series
    )
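
# Illustrative check (hypothetical column lists): the substring filters in
# map_series_to_methods applied to the column names produced by Tasks 13 and 15,
# assuming every log-level series tests as I(1) and is therefore differenced.
# Under this rule, a volume series that tests as I(0) in levels is not differenced
# and so does not enter the SVAR.
_sketch_log_cols = [f"log_{c}" for c in [
    "V_EOA_EOA_USDT", "V_EOA_EOA_USDC", "V_SC_SC_USDT", "V_SC_SC_USDC",
    "Volume_USDT_USD", "Volume_USDC_USD", "Close_BTC_USD", "Close_ETH_USD",
]]
_sketch_diff_cols = [f"diff_{c}" for c in _sketch_log_cols]

# HHT: price log-levels only; SVAR: differenced volume series only
_sketch_hht = [c for c in _sketch_log_cols if "Close" in c]
_sketch_svar = [c for c in _sketch_diff_cols if "V_" in c or "Volume_" in c]
# _sketch_hht  -> ['log_Close_BTC_USD', 'log_Close_ETH_USD']
# _sketch_svar -> the six diff_log_V_* / diff_log_Volume_* columns; the two price
#                 differences are excluded because neither "V_" nor "Volume_" occurs in them.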

# -------------------------------------------------------------------------------------------------------------------------------
# Task 15, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def finalize_integration_order(
    log_data: Any,
    stationarity_results: Any,
    preprocessing_config: Dict[str, Any]
) -> FinalizedSeriesData:
    """
    Orchestrates the creation of differenced series and finalizes the dataset for analysis.

    Executes:
    1. Differencing of I(1) series.
    2. Verification of stationarity for differenced series.
    3. Mapping of series to analysis methods.

    Args:
        log_data (LogTransformedData): Result object from Task 13.
        stationarity_results (StationarityTestResults): Result object from Task 14.
        preprocessing_config (Dict[str, Any]): Preprocessing configuration.

    Returns:
        FinalizedSeriesData: The final dataset and metadata.
    """
    df_input = log_data.df_log
    i1_series = stationarity_results.i1_series

    print(f"Task 15: Differencing {len(i1_series)} non-stationary series...")

    # Step 1: Difference
    df_diff = compute_differences(df_input, i1_series)
    diff_cols = [f"diff_{c}" for c in i1_series]

    # Step 2: Verify Stationarity
    # Reconstruct ADF config
    adf_config = {
        "regression": preprocessing_config.get("stationarity_test", {}).get("regression", "ct"),
        "autolag": preprocessing_config.get("stationarity_test", {}).get("autolag", "AIC")
    }

    print("Task 15: Verifying stationarity of differenced series...")
    summary_df = verify_diff_stationarity(df_diff, diff_cols, adf_config)

    # Step 3: Map
    mapping = map_series_to_methods(log_data.log_columns, diff_cols)

    print("Task 15 Complete. Series mapped to methods:")
    print(f"  - Bai-Perron: {len(mapping.bai_perron_series)} series")
    print(f"  - HHT: {len(mapping.hht_series)} series")
    print(f"  - SVAR: {len(mapping.svar_series)} series")

    return FinalizedSeriesData(
        df_final=df_diff,
        diff_columns=diff_cols,
        stationarity_summary=summary_df,
        series_mapping=mapping
    )


# Task 16 — Apply Bai–Perron to EOA–EOA Blockchain Series

# ==============================================================================
# Task 16: Apply Bai–Perron to EOA–EOA Blockchain Series
# ==============================================================================

@dataclass
class BreakPointResult:
    """
    Container for the results of a structural break test on a single series.

    Attributes:
        series_name (str): Name of the series.
        optimal_k (int): The optimal number of breaks selected by BIC.
        break_indices (List[int]): Indices of the detected break points (0-based).
        break_dates (List[pd.Timestamp]): Calendar dates corresponding to the break indices.
        global_ssr (float): Sum of Squared Residuals for the optimal partition.
        bic (float): Bayesian Information Criterion for the optimal model.
        sup_f_stat (float): The F-statistic testing 0 breaks vs optimal_k breaks.
        regime_means (List[float]): The mean values for each regime.
    """
    series_name: str
    optimal_k: int
    break_indices: List[int]
    break_dates: List[pd.Timestamp]
    global_ssr: float
    bic: float
    sup_f_stat: float
    regime_means: List[float]

@dataclass
class BaiPerronResults:
    """
    Container for the results of Bai-Perron tests on EOA-EOA series.

    Attributes:
        usdt_result (BreakPointResult): Results for USDT EOA-EOA volume.
        usdc_result (BreakPointResult): Results for USDC EOA-EOA volume.
    """
    usdt_result: BreakPointResult
    usdc_result: BreakPointResult

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 1: Specify Mean-Shift Model (Dynamic Programming Implementation)
# -------------------------------------------------------------------------------------------------------------------------------

def compute_ssr_matrix(y: np.ndarray, min_size: int) -> np.ndarray:
    """
    Precomputes the Sum of Squared Residuals (SSR) for all admissible segments.

    This function calculates the SSR for every possible contiguous segment y[i:j]
    where the segment length (j-i) is at least `min_size`. It uses vectorized
    cumulative sums to achieve O(T^2) computational complexity, which is essential
    for the dynamic programming step in structural break detection.

    The SSR for a segment is calculated as:
    SSR(i, j) = sum(y[i:j]^2) - (sum(y[i:j])^2) / (j - i)

    Args:
        y (np.ndarray): The time series array of shape (T,).
        min_size (int): Minimum segment length (h).

    Returns:
        np.ndarray: A (T+1, T+1) matrix where entry [i, j] is the SSR for segment y[i:j].
                    Entries where segment length < min_size are infinity.
    """
    T = len(y)
    # Initialize matrix with infinity to represent invalid segments
    ssr_mat = np.full((T + 1, T + 1), np.inf)

    # Precompute cumulative sums of y and y^2 for O(1) segment sum retrieval
    # cumsum[k] is sum(y[0:k])
    # We prepend 0.0 to make indexing easier: cumsum[j] - cumsum[i] = sum(y[i:j])
    cum_y = np.concatenate(([0.0], np.cumsum(y)))
    cum_y2 = np.concatenate(([0.0], np.cumsum(y ** 2)))

    # Iterate over all possible start points i
    for i in range(T):
        # Vectorized over all end points j with segment length j - i >= min_size,
        # i.e. j ranges from i + min_size to T (inclusive)
        j_indices = np.arange(i + min_size, T + 1)

        if len(j_indices) == 0:
            continue

        # Calculate sums for segments y[i:j] using precomputed cumulative sums
        sum_y = cum_y[j_indices] - cum_y[i]
        sum_y2 = cum_y2[j_indices] - cum_y2[i]
        n = j_indices - i

        # Calculate SSR using the variance formula: sum(x^2) - (sum(x)^2)/n
        # SSR = sum(y^2) - (sum(y)^2) / n
        ssr_vals = sum_y2 - (sum_y ** 2) / n

        # Store in matrix
        # Note: ssr_vals corresponds to j_indices
        ssr_mat[i, j_indices] = ssr_vals

    return ssr_mat
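
# Illustrative check (not part of the pipeline): the cumulative-sum identity used in
# compute_ssr_matrix, SSR(i, j) = sum(y[i:j]**2) - sum(y[i:j])**2 / (j - i),
# agrees with the direct definition sum((y[i:j] - mean(y[i:j]))**2).
# For very long or large-valued series the subtraction can lose precision;
# centering y before taking cumulative sums mitigates this.
# _sketch_ssr_both_ways is a hypothetical helper.
import numpy as np

def _sketch_ssr_both_ways(y, i: int, j: int):
    y = np.asarray(y, dtype=float)
    cum_y = np.concatenate(([0.0], np.cumsum(y)))
    cum_y2 = np.concatenate(([0.0], np.cumsum(y ** 2)))
    s, s2, n = cum_y[j] - cum_y[i], cum_y2[j] - cum_y2[i], j - i
    via_cumsum = s2 - s ** 2 / n                        # O(1) per segment
    segment = y[i:j]
    direct = np.sum((segment - segment.mean()) ** 2)    # O(j - i) per segment
    return via_cumsum, direct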

def bai_perron_dp(
    y: np.ndarray,
    max_breaks: int,
    min_size: int
) -> Dict[int, Dict[str, Any]]:
    """
    Executes the Bai-Perron dynamic programming algorithm to find optimal partitions.

    This function implements the core dynamic programming recursion described in
    Bai & Perron (2003). It finds the global minimum Sum of Squared Residuals (SSR)
    for every number of breaks k from 0 to `max_breaks`.

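    The recursion (for k breaks and minimum segment length h) is:
    SSR_k(T) = min_{k*h <= j <= T-h} [ SSR_{k-1}(j) + SSR(j, T) ]
    where SSR_{k-1}(j) is the optimal SSR of y[0:j] with k-1 breaks and
    SSR(j, T) is the SSR of the final segment y[j:T].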

    Args:
        y (np.ndarray): The time series array of shape (T,).
        max_breaks (int): Maximum number of breaks allowed (m).
        min_size (int): Minimum segment size (h).

    Returns:
        Dict[int, Dict]: A dictionary mapping k (number of breaks) to results:
            {
                k: {
                    'ssr': float,       # The global minimum SSR for k breaks
                    'breaks': List[int] # The list of break indices (0-based)
                }
            }
    """
    T = len(y)
    # Precompute the SSR matrix for all segments
    ssr_mat = compute_ssr_matrix(y, min_size)

    # dp[k][t] stores the min SSR for partitioning y[0:t] with k breaks
    # partition_map[k][t] stores the index of the *last* break for backtracking
    dp = np.full((max_breaks + 1, T + 1), np.inf)
    partition_map = np.zeros((max_breaks + 1, T + 1), dtype=int)

    # Base case: 0 breaks (1 segment)
    # dp[0][t] is simply SSR(0, t) for the single segment y[0:t]
    for t in range(min_size, T + 1):
        dp[0][t] = ssr_mat[0, t]

    # Recursive step: 1 to max_breaks
    for k in range(1, max_breaks + 1):
        # t must be at least (k+1)*min_size to accommodate k+1 segments
        for t in range((k + 1) * min_size, T + 1):

            # We want to find split point j such that:
            # 1. Previous partition y[0:j] has k-1 breaks (valid if j >= k*min_size)
            # 2. Last segment y[j:t] has length >= min_size (valid if j <= t - min_size)

            # Search range for j (the location of the k-th break)
            j_start = k * min_size
            j_end = t - min_size # inclusive

            if j_start > j_end:
                continue

            # Vectorized search for optimal j
            j_candidates = np.arange(j_start, j_end + 1)

            # Cost = SSR of previous partition (k-1 breaks up to j) + SSR of new segment (j to t)
            costs = dp[k-1, j_candidates] + ssr_mat[j_candidates, t]

            # Find the j that minimizes the total SSR
            min_idx = np.argmin(costs)
            min_cost = costs[min_idx]
            best_j = j_candidates[min_idx]

            dp[k][t] = min_cost
            partition_map[k][t] = best_j

    # Reconstruct optimal partitions for each k by backtracking
    results = {}
    for k in range(max_breaks + 1):
        ssr = dp[k][T]
        if np.isinf(ssr):
            continue

        breaks = []
        if k > 0:
            curr = T
            for level in range(k, 0, -1):
                # Last break of the optimal level-break partition of y[0:curr]
                prev = partition_map[level][curr]
                breaks.append(int(prev))
                curr = prev
            breaks.reverse()

        results[k] = {
            'ssr': ssr,
            'breaks': breaks
        }

    return results
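
# Illustrative sketch (loop-based restatement, not a replacement for bai_perron_dp):
# the same recursion SSR_k(t) = min_j [SSR_{k-1}(j) + SSR(j, t)] for a fixed k, handy
# for checking break locations on a toy series whose breaks are known.
# _sketch_optimal_breaks is a hypothetical helper.
import numpy as np

def _sketch_optimal_breaks(y, k: int, h: int):
    y = np.asarray(y, dtype=float)
    T = len(y)
    cum_y = np.concatenate(([0.0], np.cumsum(y)))
    cum_y2 = np.concatenate(([0.0], np.cumsum(y ** 2)))

    def seg_ssr(i: int, j: int) -> float:
        # SSR of y[i:j] around its own mean, via cumulative sums
        s = cum_y[j] - cum_y[i]
        return (cum_y2[j] - cum_y2[i]) - s * s / (j - i)

    # best[m, t]: minimal SSR of y[0:t] with m breaks; last[m, t]: position of the m-th break
    best = np.full((k + 1, T + 1), np.inf)
    last = np.zeros((k + 1, T + 1), dtype=int)
    for t in range(h, T + 1):
        best[0, t] = seg_ssr(0, t)
    for m in range(1, k + 1):
        for t in range((m + 1) * h, T + 1):
            for j in range(m * h, t - h + 1):
                cost = best[m - 1, j] + seg_ssr(j, t)
                if cost < best[m, t]:
                    best[m, t] = cost
                    last[m, t] = j

    # Backtrack from the full sample to recover the break positions
    breaks, t = [], T
    for m in range(k, 0, -1):
        t = int(last[m, t])
        breaks.append(t)
    return float(best[k, T]), breaks[::-1]

# Usage: three noiseless regimes with breaks at 30 and 60
# _sketch_optimal_breaks([0.0] * 30 + [2.0] * 30 + [-1.0] * 40, k=2, h=10)  -> (0.0, [30, 60])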

def select_optimal_model(
    y: np.ndarray,
    dp_results: Dict[int, Dict[str, Any]]
) -> int:
    """
    Selects the optimal number of breaks using the Bayesian Information Criterion (BIC).

    The BIC penalizes model complexity to prevent overfitting. For a structural change
    model with k breaks (k+1 regimes) and q coefficients per regime, the parameter
    count is (k+1)*q + k (regime coefficients plus break dates). For a pure
    mean-shift model q = 1, giving 2k + 1 parameters:

    BIC(k) = T * ln(SSR(k)/T) + (2k + 1) * ln(T)

    Args:
        y (np.ndarray): The time series array.
        dp_results (Dict): Results from the DP algorithm containing SSRs for each k.

    Returns:
        int: The optimal number of breaks (k) that minimizes the BIC.
    """
    T = len(y)
    best_k = 0
    min_bic = np.inf

    for k, res in dp_results.items():
        ssr = res['ssr']
        if ssr <= 1e-9: # Avoid log(0) or negative SSR due to precision
            continue

        # Number of parameters: (k+1) means + k break dates
        n_params = (k + 1) + k

        # BIC formula: T * ln(MSE) + penalty
        bic = T * np.log(ssr / T) + n_params * np.log(T)

        if bic < min_bic:
            min_bic = bic
            best_k = k

    return best_k
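
# Worked example (hypothetical SSR values): the BIC rule in select_optimal_model with
# 2k + 1 parameters for k breaks. With T = 100 and SSRs of 200, 50 and 49 for
# k = 0, 1, 2, the second break lowers the SSR too little to pay its ln(T) penalty.
import numpy as np

def _sketch_bic(ssr: float, k: int, T: int) -> float:
    return T * np.log(ssr / T) + (2 * k + 1) * np.log(T)

_sketch_ssrs = {0: 200.0, 1: 50.0, 2: 49.0}
_sketch_bics = {k: _sketch_bic(s, k, 100) for k, s in _sketch_ssrs.items()}
# Approximate values: BIC(0) = 73.9, BIC(1) = -55.5, BIC(2) = -48.3, so k = 1 is selected.
_sketch_best_k = min(_sketch_bics, key=_sketch_bics.get)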

def compute_f_statistic(
    y: np.ndarray,
    ssr_restricted: float,
    ssr_unrestricted: float,
    k: int
) -> float:
    """
    Computes the F-statistic for testing the null hypothesis of 0 breaks against
    the alternative of k breaks.

    The statistic is defined as:
    F = ((SSR_0 - SSR_k) / k) / (SSR_k / (T - (k+1) - k))

    Degrees of freedom:
    - Numerator: k * q, where q = 1 is the number of coefficients that shift at
      each break in a mean-shift model, so numerator df = k.
    - Denominator: T - n_params_unrestricted = T - (2k + 1).

    Args:
        y (np.ndarray): The time series array.
        ssr_restricted (float): SSR for the restricted model (0 breaks).
        ssr_unrestricted (float): SSR for the unrestricted model (k breaks).
        k (int): Number of breaks in the alternative hypothesis.

    Returns:
        float: The computed F-statistic. Returns 0.0 if k=0 or denominator is 0.
    """
    if k == 0:
        return 0.0

    T = len(y)
    # Number of parameters in unrestricted model: (k+1) means + k break dates
    n_params = 2 * k + 1
    df_resid = T - n_params

    # F-statistic calculation
    numerator = (ssr_restricted - ssr_unrestricted) / k # q=1
    denominator = ssr_unrestricted / df_resid

    if denominator == 0:
        return 0.0

    return numerator / denominator
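
# Worked example (hypothetical inputs): compute_f_statistic with T = 100, SSR_0 = 200,
# SSR_1 = 50 and k = 1. The residual degrees of freedom are T - (2k + 1) = 97, so
#     F = ((200 - 50) / 1) / (50 / 97) = 150 * 97 / 50 = 291.0.
# Because the break date is estimated, this statistic is compared with the tabulated
# sup-F critical values of Bai and Perron rather than a standard F distribution.
# _sketch_f_stat is a hypothetical restatement without the k = 0 guard.
def _sketch_f_stat(T: int, ssr_0: float, ssr_k: float, k: int) -> float:
    df_resid = T - (2 * k + 1)
    return ((ssr_0 - ssr_k) / k) / (ssr_k / df_resid)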

def compute_regime_means(y: np.ndarray, breaks: List[int]) -> List[float]:
    """
    Computes the mean value for each regime defined by the break indices.

    Args:
        y (np.ndarray): The time series array.
        breaks (List[int]): List of break indices (start of new regimes).

    Returns:
        List[float]: List of mean values for each segment [break_i, break_{i+1}).
    """
    means = []
    start = 0
    # Add T to breaks to handle the last segment uniformly
    boundaries = breaks + [len(y)]

    for end in boundaries:
        segment = y[start:end]
        if len(segment) > 0:
            means.append(np.mean(segment))
        else:
            means.append(0.0) # Should not happen with min_size constraint
        start = end

    return means
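
# Illustrative alternative (not used by the pipeline): np.split cuts an array at a list
# of indices, producing exactly the segments [0, b1), [b1, b2), ..., [bk, T) that
# compute_regime_means iterates over. _sketch_regime_means is a hypothetical helper.
import numpy as np

def _sketch_regime_means(y, breaks):
    return [float(seg.mean()) for seg in np.split(np.asarray(y, dtype=float), breaks)]

# Usage: _sketch_regime_means([1.0, 1.0, 5.0, 5.0, 5.0], [2]) -> [1.0, 5.0]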

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Step 2 & 3: Apply to EOA-EOA Series
# -------------------------------------------------------------------------------------------------------------------------------

def analyze_series_breaks_robust(
    series: pd.Series,
    config: Any
) -> BreakPointResult:
    """
    Runs the robust Bai-Perron analysis on a single pandas Series.

    Args:
        series (pd.Series): The time series, indexed by date (break dates are read from series.index).
        config (ValidatedAnalysisConfig): Analysis configuration.

    Returns:
        BreakPointResult: The detected breaks and statistics.
    """
    y = series.values
    T = len(y)

    # Parameters
    max_breaks = config.bp_max_breaks
    epsilon = config.bp_trimming
    min_size = max(1, int(np.floor(epsilon * T)))  # at least one observation per regime

    # 1. Run Dynamic Programming
    dp_results = bai_perron_dp(y, max_breaks, min_size)

    # 2. Select Optimal Model
    best_k = select_optimal_model(y, dp_results)

    # 3. Extract Results for Optimal k
    best_res = dp_results[best_k]
    breaks = best_res['breaks']
    ssr_k = best_res['ssr']

    # 4. Compute Statistics
    ssr_0 = dp_results[0]['ssr']

    # SupF for the selected model: the F-statistic for 0 breaks vs. best_k breaks.
    # Because the DP minimizes SSR_k over all admissible partitions, this is the
    # supremum of F over break dates for that k, matching the paper, which
    # reports SupF for the detected break(s). If best_k == 0, the statistic is 0.

    f_stat = compute_f_statistic(y, ssr_0, ssr_k, best_k)

    # 5. Compute Regime Means
    means = compute_regime_means(y, breaks)

    # 6. Map break indices to dates
    # Break index b is the first observation of the new regime, so the reported
    # break date is series.index[b] (the first date of the new regime).
    break_dates = [series.index[b] for b in breaks]

    # Recalculate BIC for reporting
    n_params = 2 * best_k + 1
    bic = T * np.log(ssr_k / T) + n_params * np.log(T)

    return BreakPointResult(
        series_name=series.name,
        optimal_k=best_k,
        break_indices=breaks,
        break_dates=break_dates,
        global_ssr=ssr_k,
        bic=bic,
        sup_f_stat=f_stat,
        regime_means=means
    )
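
# For k = 1 the dynamic program reduces to an exhaustive search, which makes the
# trimming rule and the index-to-date convention easy to check. A minimal
# sketch (hypothetical helper `sketch_single_break`, not the pipeline's
# bai_perron_dp; the default epsilon = 0.15 is an assumption): candidate breaks b
# keep at least min_size = floor(epsilon * T) observations in both regimes, and
# the reported date is series.index[b], the first date of the new regime.
import numpy as np
import pandas as pd


def sketch_single_break(series: pd.Series, epsilon: float = 0.15):
    y = series.to_numpy(dtype=float)
    T = len(y)
    min_size = max(1, int(np.floor(epsilon * T)))
    best_b, best_ssr = None, np.inf
    # Both regimes must keep at least min_size observations
    for b in range(min_size, T - min_size + 1):
        left, right = y[:b], y[b:]
        ssr = np.sum((left - left.mean()) ** 2) + np.sum((right - right.mean()) ** 2)
        if ssr < best_ssr:
            best_b, best_ssr = b, ssr
    if best_b is None:
        raise ValueError("Series too short for the requested trimming")
    # Break index b is the first observation of the new regime
    return best_b, series.index[best_b], float(best_ssr)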

# -------------------------------------------------------------------------------------------------------------------------------
# Task 16, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_bai_perron_eoa(
    finalized_data: Any,
    analysis_config: Any
) -> BaiPerronResults:
    """
    Orchestrates the Bai-Perron structural break detection for EOA-EOA series.

    Executes:
    1. Extraction of USDT and USDC EOA-EOA log-volume series.
    2. Application of the mean-shift break detection algorithm.
    3. Computation of SupF statistics.

    Args:
        finalized_data (FinalizedSeriesData): The dataset from Task 15.
        analysis_config (ValidatedAnalysisConfig): Configuration from Task 1.

    Returns:
        BaiPerronResults: Container with results for both tokens.
    """
    df = finalized_data.df_final

    # Ensure Date is index for easy mapping
    if "Date" in df.columns:
        df_indexed = df.set_index("Date")
    else:
        df_indexed = df

    print("Task 16: Running Bai-Perron on EOA-EOA series...")

    # 1. USDT
    col_usdt = "log_V_EOA_EOA_USDT"
    print(f"  - Analyzing {col_usdt}...")
    res_usdt = analyze_series_breaks_robust(df_indexed[col_usdt].dropna(), analysis_config)
    print(f"    -> Optimal k: {res_usdt.optimal_k}")
    print(f"    -> Breaks at: {[d.date() for d in res_usdt.break_dates]}")
    print(f"    -> SupF: {res_usdt.sup_f_stat:.4f}")

    # 2. USDC
    col_usdc = "log_V_EOA_EOA_USDC"
    print(f"  - Analyzing {col_usdc}...")
    res_usdc = analyze_series_breaks_robust(df_indexed[col_usdc].dropna(), analysis_config)
    print(f"    -> Optimal k: {res_usdc.optimal_k}")
    print(f"    -> Breaks at: {[d.date() for d in res_usdc.break_dates]}")
    print(f"    -> SupF: {res_usdc.sup_f_stat:.4f}")

    return BaiPerronResults(
        usdt_result=res_usdt,
        usdc_result=res_usdc
    )
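
# Tasks 16-19 repeat the same extract/analyze/print block for each column. A
# sketch of a generic helper that could replace that repetition (the name
# `sketch_run_columns` is hypothetical; `analyze_fn` stands in for
# analyze_series_breaks_robust).
from typing import Any, Callable, Dict, Iterable
import pandas as pd


def sketch_run_columns(
    df: pd.DataFrame,
    columns: Iterable[str],
    analyze_fn: Callable[[pd.Series, Any], Any],
    config: Any,
) -> Dict[str, Any]:
    # Mirror the orchestrators: index by Date when that column exists
    df_indexed = df.set_index("Date") if "Date" in df.columns else df
    results = {}
    for col in columns:
        # Drop missing values before analysis, as the orchestrators do
        results[col] = analyze_fn(df_indexed[col].dropna(), config)
    return results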


# Task 17 — Apply Bai–Perron to SC–SC Blockchain Series

# ==============================================================================
# Task 17: Apply Bai–Perron to SC–SC Blockchain Series
# ==============================================================================

@dataclass
class SCBaiPerronResults:
    """
    Container for the results of Bai-Perron tests on SC-SC series.

    Attributes:
        usdt_result (BreakPointResult): Results for USDT SC-SC volume.
        usdc_result (BreakPointResult): Results for USDC SC-SC volume.
    """
    usdt_result: Any # Type: BreakPointResult
    usdc_result: Any # Type: BreakPointResult

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Step 1 & 2: Apply to SC-SC Series
# -------------------------------------------------------------------------------------------------------------------------------

# This task reuses analyze_series_breaks_robust from Task 16; no new low-level
# logic is needed, only a different set of input series.

# -------------------------------------------------------------------------------------------------------------------------------
# Task 17, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_bai_perron_sc(
    finalized_data: Any,
    analysis_config: Any
) -> SCBaiPerronResults:
    """
    Orchestrates the Bai-Perron structural break detection for SC-SC series.

    Executes:
    1. Extraction of USDT and USDC SC-SC log-volume series.
    2. Application of the mean-shift break detection algorithm.
    3. Computation of SupF statistics.

    Args:
        finalized_data (FinalizedSeriesData): The dataset from Task 15.
        analysis_config (ValidatedAnalysisConfig): Configuration from Task 1.

    Returns:
        SCBaiPerronResults: Container with results for both tokens.
    """
    df = finalized_data.df_final

    # Ensure Date is index for easy mapping
    if "Date" in df.columns:
        df_indexed = df.set_index("Date")
    else:
        df_indexed = df

    print("Task 17: Running Bai-Perron on SC-SC series...")

    # 1. USDT
    col_usdt = "log_V_SC_SC_USDT"
    print(f"  - Analyzing {col_usdt}...")
    # Reuse the robust analysis function from Task 16
    res_usdt = analyze_series_breaks_robust(df_indexed[col_usdt].dropna(), analysis_config)
    print(f"    -> Optimal k: {res_usdt.optimal_k}")
    print(f"    -> Breaks at: {[d.date() for d in res_usdt.break_dates]}")
    print(f"    -> SupF: {res_usdt.sup_f_stat:.4f}")

    # 2. USDC
    col_usdc = "log_V_SC_SC_USDC"
    print(f"  - Analyzing {col_usdc}...")
    res_usdc = analyze_series_breaks_robust(df_indexed[col_usdc].dropna(), analysis_config)
    print(f"    -> Optimal k: {res_usdc.optimal_k}")
    print(f"    -> Breaks at: {[d.date() for d in res_usdc.break_dates]}")
    print(f"    -> SupF: {res_usdc.sup_f_stat:.4f}")

    return SCBaiPerronResults(
        usdt_result=res_usdt,
        usdc_result=res_usdc
    )


# Task 18 — Apply Bai–Perron to Exchange Volumes

# ==============================================================================
# Task 18: Apply Bai–Perron to Exchange Volumes
# ==============================================================================

@dataclass
class ExchangeBaiPerronResults:
    """
    Container for the results of Bai-Perron tests on Exchange Volume series.

    Attributes:
        usdt_result (BreakPointResult): Results for USDT Exchange volume.
        usdc_result (BreakPointResult): Results for USDC Exchange volume.
    """
    usdt_result: Any # Type: BreakPointResult
    usdc_result: Any # Type: BreakPointResult

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Step 1 & 2: Apply to Exchange Series
# -------------------------------------------------------------------------------------------------------------------------------

# This task reuses analyze_series_breaks_robust from Task 16; no new low-level
# logic is needed, only a different set of input series.

# -------------------------------------------------------------------------------------------------------------------------------
# Task 18, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_bai_perron_exchange(
    finalized_data: Any,
    analysis_config: Any
) -> ExchangeBaiPerronResults:
    """
    Orchestrates the Bai-Perron structural break detection for Exchange Volume series.

    Executes:
    1. Extraction of USDT and USDC Exchange log-volume series.
    2. Application of the mean-shift break detection algorithm.
    3. Computation of SupF statistics.

    Args:
        finalized_data (FinalizedSeriesData): The dataset from Task 15.
        analysis_config (ValidatedAnalysisConfig): Configuration from Task 1.

    Returns:
        ExchangeBaiPerronResults: Container with results for both tokens.
    """
    df = finalized_data.df_final

    # Ensure Date is index for easy mapping
    if "Date" in df.columns:
        df_indexed = df.set_index("Date")
    else:
        df_indexed = df

    print("Task 18: Running Bai-Perron on Exchange Volume series...")

    # 1. USDT
    col_usdt = "log_Volume_USDT_USD"
    print(f"  - Analyzing {col_usdt}...")
    # Reuse the robust analysis function from Task 16
    res_usdt = analyze_series_breaks_robust(df_indexed[col_usdt].dropna(), analysis_config)
    print(f"    -> Optimal k: {res_usdt.optimal_k}")
    print(f"    -> Breaks at: {[d.date() for d in res_usdt.break_dates]}")
    print(f"    -> SupF: {res_usdt.sup_f_stat:.4f}")

    # 2. USDC
    col_usdc = "log_Volume_USDC_USD"
    print(f"  - Analyzing {col_usdc}...")
    res_usdc = analyze_series_breaks_robust(df_indexed[col_usdc].dropna(), analysis_config)
    print(f"    -> Optimal k: {res_usdc.optimal_k}")
    print(f"    -> Breaks at: {[d.date() for d in res_usdc.break_dates]}")
    print(f"    -> SupF: {res_usdc.sup_f_stat:.4f}")

    return ExchangeBaiPerronResults(
        usdt_result=res_usdt,
        usdc_result=res_usdc
    )


# Task 19 — Apply Bai–Perron to BTC and ETH Prices

# ==============================================================================
# Task 19: Apply Bai–Perron to BTC and ETH Prices
# ==============================================================================

@dataclass
class PriceBaiPerronResults:
    """
    Container for the results of Bai-Perron tests on Cryptocurrency Price series.

    Attributes:
        btc_result (BreakPointResult): Results for BTC Price.
        eth_result (BreakPointResult): Results for ETH Price.
    """
    btc_result: Any # Type: BreakPointResult
    eth_result: Any # Type: BreakPointResult

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Step 1 & 2: Apply to Price Series
# -------------------------------------------------------------------------------------------------------------------------------

# This task reuses analyze_series_breaks_robust from Task 16; no new low-level
# logic is needed, only a different set of input series.

# -------------------------------------------------------------------------------------------------------------------------------
# Task 19, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def run_bai_perron_prices(
    finalized_data: Any,
    analysis_config: Any
) -> PriceBaiPerronResults:
    """
    Orchestrates the Bai-Perron structural break detection for BTC and ETH Price series.

    Executes:
    1. Extraction of BTC and ETH log-price series.
    2. Application of the mean-shift break detection algorithm.
    3. Computation of SupF statistics.

    Args:
        finalized_data (FinalizedSeriesData): The dataset from Task 15.
        analysis_config (ValidatedAnalysisConfig): Configuration from Task 1.

    Returns:
        PriceBaiPerronResults: Container with results for both assets.
    """
    df = finalized_data.df_final

    # Ensure Date is index for easy mapping
    if "Date" in df.columns:
        df_indexed = df.set_index("Date")
    else:
        df_indexed = df

    print("Task 19: Running Bai-Perron on Price series...")

    # 1. BTC
    col_btc = "log_Close_BTC_USD"
    print(f"  - Analyzing {col_btc}...")
    # Reuse the robust analysis function from Task 16
    res_btc = analyze_series_breaks_robust(df_indexed[col_btc].dropna(), analysis_config)
    print(f"    -> Optimal k: {res_btc.optimal_k}")
    print(f"    -> Breaks at: {[d.date() for d in res_btc.break_dates]}")
    print(f"    -> SupF: {res_btc.sup_f_stat:.4f}")

    # 2. ETH
    col_eth = "log_Close_ETH_USD"
    print(f"  - Analyzing {col_eth}...")
    res_eth = analyze_series_breaks_robust(df_indexed[col_eth].dropna(), analysis_config)
    print(f"    -> Optimal k: {res_eth.optimal_k}")
    print(f"    -> Breaks at: {[d.date() for d in res_eth.break_dates]}")
    print(f"    -> SupF: {res_eth.sup_f_stat:.4f}")

    return PriceBaiPerronResults(
        btc_result=res_btc,
        eth_result=res_eth
    )


# Task 20 — Design Structural Break Orchestrator

# ==============================================================================
# Task 20: Design Structural Break Orchestrator
# ==============================================================================

@dataclass
class StructuralBreakOrchestratorResult:
    """
    Container for the results of structural break detection across all series.

    Attributes:
        results (Dict[str, BreakPointResult]): Dictionary mapping series name to break results.
        failed_series (Dict[str, str]): Dictionary mapping failed series names to error messages.
    """
    results: Dict[str, Any] # Type: BreakPointResult
    failed_series: Dict[str, str]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 1 & 2: Define Orchestrator Inputs and Internal Callables
# -------------------------------------------------------------------------------------------------------------------------------

def callable_bai_perron(series: pd.Series, config: Any) -> Any:
    """
    Internal callable to execute Bai-Perron analysis on a single series.

    Wraps the robust analysis function from Task 16.

    Args:
        series (pd.Series): The time series.
        config (ValidatedAnalysisConfig): Analysis configuration.

    Returns:
        BreakPointResult: The analysis result.
    """
    # Reuse the robust implementation from Task 16
    # We assume analyze_series_breaks_robust is available in the environment
    return analyze_series_breaks_robust(series, config)

# -------------------------------------------------------------------------------------------------------------------------------
# Task 20, Step 3: Define Orchestrator Control Flow
# -------------------------------------------------------------------------------------------------------------------------------

def orchestrate_structural_breaks(
    finalized_data: Any,
    analysis_config: Any
) -> StructuralBreakOrchestratorResult:
    """
    Orchestrates the structural break detection for all configured series.

    Iterates through the series mapped for Bai-Perron analysis in Task 15,
    executes the detection algorithm, and aggregates results.

    Args:
        finalized_data (FinalizedSeriesData): The dataset from Task 15.
        analysis_config (ValidatedAnalysisConfig): Configuration from Task 1.

    Returns:
        StructuralBreakOrchestratorResult: Aggregated results.
    """
    df = finalized_data.df_final

    # Ensure Date index
    if "Date" in df.columns:
        df_indexed = df.set_index("Date")
    else:
        df_indexed = df

    # Get target series from mapping
    target_series = finalized_data.series_mapping.bai_perron_series

    results = {}
    failures = {}

    print(f"Task 20: Orchestrating structural breaks for {len(target_series)} series...")

    for col in target_series:
        if col not in df_indexed.columns:
            failures[col] = "Column missing from DataFrame"
            print(f"  - WARNING: Series '{col}' missing.")
            continue

        try:
            print(f"  - Processing {col}...")
            series = df_indexed[col].dropna()

            if len(series) == 0:
                failures[col] = "Series is empty after dropping NaNs"
                continue

            # Execute Analysis
            res = callable_bai_perron(series, analysis_config)
            results[col] = res

            # Log brief result
            breaks_str = ", ".join([str(d.date()) for d in res.break_dates])
            print(f"    -> Found {len(res.break_dates)} breaks: {breaks_str}")

        except Exception as e:
            failures[col] = str(e)
            print(f"    -> FAILED: {e}")

    print(f"Task 20 Complete. Successful: {len(results)}, Failed: {len(failures)}")

    return StructuralBreakOrchestratorResult(
        results=results,
        failed_series=failures
    )
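
# A sketch for tabulating the orchestrator output (the helper name
# `sketch_breaks_summary` is hypothetical). It relies only on the
# BreakPointResult fields used above (optimal_k, break_dates, sup_f_stat) and on
# the failed_series messages.
from typing import Any, Dict
import pandas as pd


def sketch_breaks_summary(results: Dict[str, Any], failed: Dict[str, str]) -> pd.DataFrame:
    rows = []
    for name, res in results.items():
        rows.append({
            "series": name,
            "optimal_k": res.optimal_k,
            "break_dates": ", ".join(str(pd.Timestamp(d).date()) for d in res.break_dates),
            "sup_f": res.sup_f_stat,
            "error": None,
        })
    # Failed series get a row with the error message and no statistics
    for name, msg in failed.items():
        rows.append({"series": name, "optimal_k": None, "break_dates": "", "sup_f": None, "error": msg})
    return pd.DataFrame(rows).set_index("series")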


# Task 21 — Execute AAFT Robustness for Structural Breaks

# ==============================================================================
# Task 21: Execute AAFT Robustness for Structural Breaks
# ==============================================================================

@dataclass
class RobustnessResult:
    """
    Container for the results of AAFT surrogate testing on a single series.

    Attributes:
        series_name (str): Name of the series tested.
        observed_sup_f (float): The SupF statistic from the original series.
        surrogate_sup_f_stats (List[float]): List of SupF statistics from surrogates.
        p_value (float): Empirical p-value.
        n_surrogates (int): Number of surrogates generated.
    """
    series_name: str
    observed_sup_f: float
    surrogate_sup_f_stats: List[float]
    p_value: float
    n_surrogates: int

@dataclass
class AAFTRobustnessResults:
    """
    Container for the results of AAFT robustness testing across all series.

    Attributes:
        results (Dict[str, RobustnessResult]): Dictionary mapping series name to robustness results.
    """
    results: Dict[str, RobustnessResult]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 1: Define AAFT Surrogate Generation Algorithm
# -------------------------------------------------------------------------------------------------------------------------------

def generate_aaft_surrogates(series: np.ndarray, n_surrogates: int, seed: Optional[int] = None) -> np.ndarray:
    """
    Generates Amplitude Adjusted Fourier Transform (AAFT) surrogates.

    Algorithm:
    1. Create a Gaussian time series y(t) with the same rank structure as the original series x(t).
    2. Compute the FFT of y(t).
    3. Randomize the phases of the FFT coefficients while preserving the power spectrum.
    4. Perform inverse FFT to get y'(t).
    5. Remap the values of x(t) to the ranks of y'(t) to preserve the amplitude distribution.

    Args:
        series (np.ndarray): The original time series (1D array).
        n_surrogates (int): Number of surrogates to generate.
        seed (Optional[int]): Random seed for reproducibility.

    Returns:
        np.ndarray: A 2D array of shape (n_surrogates, len(series)) containing the surrogates.
    """
    # Use a local generator so seeding does not alter NumPy's global random state
    rng = np.random.default_rng(seed)

    n = len(series)
    surrogates = np.zeros((n_surrogates, n))

    # Sorted original values (used in step 3) and ranks of the original series
    # (used in step 1) are fixed across surrogates, so compute them once.
    sorted_original = np.sort(series)
    ranks_original = np.argsort(np.argsort(series))  # ranks 0..n-1

    for i in range(n_surrogates):
        # AAFT (Theiler et al. 1992); the iterative refinement of
        # Schreiber & Schmitz (1996) is not applied here.
        # 1. Gaussianize: r(t) = sort(gaussian)[rank(x(t))]
        # 2. Phase-randomize r(t) -> s(t)
        # 3. Remap: x'(t) = sort(x)[rank(s(t))]

        # 1. Gaussianize: a Gaussian sample reordered to the ranks of x(t)
        gaussian_sorted = np.sort(rng.standard_normal(n))
        y_t = gaussian_sorted[ranks_original]

        # 2. Phase randomization of y_t
        # rfft returns only the non-negative frequencies of a real signal
        rfft_coeffs = np.fft.rfft(y_t)

        # Uniform random phases in [0, 2*pi); magnitudes (power spectrum) are kept
        random_phases = rng.uniform(0, 2 * np.pi, len(rfft_coeffs))
        new_coeffs = np.abs(rfft_coeffs) * np.exp(1j * random_phases)

        # The DC term (and the Nyquist term when n is even) must stay real;
        # keep their original values so the mean of y_t is preserved.
        new_coeffs[0] = rfft_coeffs[0]
        if n % 2 == 0:
            new_coeffs[-1] = rfft_coeffs[-1]

        # Inverse FFT back to a real series of length n
        s_t = np.fft.irfft(new_coeffs, n=n)

        # 3. Remap to the original amplitude distribution
        ranks_s = np.argsort(np.argsort(s_t))
        surrogates[i, :] = sorted_original[ranks_s]

    return surrogates
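
# Why step 2 keeps the linear correlation structure: phase randomization leaves
# every |rfft| coefficient unchanged, so the periodogram (and hence the circular
# sample autocorrelation) of the Gaussianized series survives. A minimal sketch
# with a hypothetical helper `sketch_phase_randomize`, keeping the original DC
# and Nyquist coefficients so the inverse transform is real and the mean is kept.
import numpy as np


def sketch_phase_randomize(y: np.ndarray, rng: np.random.Generator) -> np.ndarray:
    n = len(y)
    coeffs = np.fft.rfft(y)
    phases = rng.uniform(0.0, 2.0 * np.pi, len(coeffs))
    new_coeffs = np.abs(coeffs) * np.exp(1j * phases)
    new_coeffs[0] = coeffs[0]  # DC term: keeps the mean
    if n % 2 == 0:
        new_coeffs[-1] = coeffs[-1]  # Nyquist term must be real for even n
    return np.fft.irfft(new_coeffs, n=n)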

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Step 2 & 3: Generate Surrogates and Compute Statistics
# -------------------------------------------------------------------------------------------------------------------------------

def compute_surrogate_stats(
    series: pd.Series,
    observed_sup_f: float,
    config: Any,
    n_surrogates: int
) -> RobustnessResult:
    """
    Generates surrogates and computes the empirical p-value for the SupF statistic.

    Args:
        series (pd.Series): The original time series.
        observed_sup_f (float): The observed SupF statistic.
        config (ValidatedAnalysisConfig): Analysis configuration (for Bai-Perron params).
        n_surrogates (int): Number of surrogates to generate.

    Returns:
        RobustnessResult: The robustness test results.
    """
    y = series.values
    T = len(y)

    # Parameters for Bai-Perron
    max_breaks = config.bp_max_breaks
    epsilon = config.bp_trimming
    min_size = max(1, int(np.floor(epsilon * T)))  # at least one observation per regime

    # Generate Surrogates
    surrogates = generate_aaft_surrogates(y, n_surrogates, seed=42)

    surrogate_stats = []

    # Relies on bai_perron_dp, select_optimal_model and compute_f_statistic
    # from Task 16 being defined earlier in the notebook.
    for i in range(n_surrogates):
        y_surr = surrogates[i]

        # Run DP
        dp_res = bai_perron_dp(y_surr, max_breaks, min_size)

        # Select optimal k (using same BIC logic)
        best_k = select_optimal_model(y_surr, dp_res)

        # Compute SupF (F-stat for 0 vs best_k)
        ssr_0 = dp_res[0]['ssr']
        ssr_k = dp_res[best_k]['ssr']

        f_stat = compute_f_statistic(y_surr, ssr_0, ssr_k, best_k)
        surrogate_stats.append(f_stat)

    # Compute p-value
    # p = count(surr >= obs) / N
    count_exceed = sum(1 for s in surrogate_stats if s >= observed_sup_f)
    p_value = count_exceed / n_surrogates

    return RobustnessResult(
        series_name=series.name,
        observed_sup_f=observed_sup_f,
        surrogate_sup_f_stats=surrogate_stats,
        p_value=p_value,
        n_surrogates=n_surrogates
    )
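
# The empirical p-value above is count / N. A common Monte Carlo alternative
# (named here as a swapped-in convention, not the paper's choice) is
# (count + 1) / (N + 1), which counts the observed series as one draw and can
# never return exactly 0. A hypothetical helper for comparing the two:
import numpy as np
from typing import Sequence, Tuple


def sketch_empirical_p_values(surrogate_stats: Sequence[float], observed: float) -> Tuple[float, float]:
    stats = np.asarray(surrogate_stats, dtype=float)
    count = int(np.sum(stats >= observed))  # surrogates at least as extreme
    n = stats.size
    return count / n, (count + 1) / (n + 1)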

# -------------------------------------------------------------------------------------------------------------------------------
# Task 21, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def execute_aaft_robustness(
    finalized_data: Any,
    break_results: Any,
    analysis_config: Any
) -> AAFTRobustnessResults:
    """
    Orchestrates the AAFT robustness testing for every series with a result from Task 20.

    Iterates through the results from Task 20 (StructuralBreakOrchestratorResult),
    and for each series, runs the surrogate analysis.

    Args:
        finalized_data (FinalizedSeriesData): The dataset.
        break_results (StructuralBreakOrchestratorResult): Results from Task 20.
        analysis_config (ValidatedAnalysisConfig): Configuration.

    Returns:
        AAFTRobustnessResults: Container with robustness results.
    """
    df = finalized_data.df_final
    if "Date" in df.columns:
        df_indexed = df.set_index("Date")
    else:
        df_indexed = df

    results = {}
    n_surrogates = 1000  # Hard-coded (not read from analysis_config); as per the paper

    print(f"Task 21: Executing AAFT robustness tests ({n_surrogates} iterations)...")

    for series_name, res in break_results.results.items():
        print(f"  - Testing {series_name} (Observed SupF: {res.sup_f_stat:.4f})...")

        series = df_indexed[series_name].dropna()

        # Execute AAFT
        robustness = compute_surrogate_stats(
            series,
            res.sup_f_stat,
            analysis_config,
            n_surrogates
        )

        results[series_name] = robustness
        print(f"    -> p-value: {robustness.p_value:.4f}")

    return AAFTRobustnessResults(results=results)


# Task 22 — Apply EMD to Decompose BTC and ETH Log Prices

# ==============================================================================
# Task 22: Apply EMD to Decompose BTC and ETH Log Prices
# ==============================================================================

@dataclass
class EMDResult:
    """
    Container for the results of Empirical Mode Decomposition on a single series.

    Attributes:
        series_name (str): Name of the decomposed series.
        imfs (List[np.ndarray]): List of Intrinsic Mode Functions (arrays).
        residue (np.ndarray): The final residue (monotonic unless max_imfs was reached first).
        n_imfs (int): Number of IMFs extracted.
        reconstruction_error (float): Max absolute error between original and sum(IMFs)+residue.
    """
    series_name: str
    imfs: List[np.ndarray]
    residue: np.ndarray
    n_imfs: int
    reconstruction_error: float

@dataclass
class EMDDecompositionResults:
    """
    Container for the EMD results of BTC and ETH prices.

    Attributes:
        btc_result (EMDResult): EMD results for BTC.
        eth_result (EMDResult): EMD results for ETH.
    """
    btc_result: EMDResult
    eth_result: EMDResult

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 1: Specify EMD Algorithm (Self-Contained Implementation)
# -------------------------------------------------------------------------------------------------------------------------------

def get_envelopes(y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Constructs upper and lower envelopes using cubic spline interpolation of extrema.

    Args:
        y (np.ndarray): The input signal.

    Returns:
        Tuple[np.ndarray, np.ndarray]: (upper_envelope, lower_envelope).
    """
    n = len(y)
    x = np.arange(n)

    # Find local maxima and minima
    # argrelextrema returns indices
    max_idx = argrelextrema(y, np.greater)[0]
    min_idx = argrelextrema(y, np.less)[0]

    # Treat both endpoints as extrema (a simple boundary rule) so that the
    # splines span the full index range
    if len(max_idx) == 0 or max_idx[0] != 0:
        max_idx = np.r_[0, max_idx]
    if len(max_idx) == 0 or max_idx[-1] != n - 1:
        max_idx = np.r_[max_idx, n - 1]

    if len(min_idx) == 0 or min_idx[0] != 0:
        min_idx = np.r_[0, min_idx]
    if len(min_idx) == 0 or min_idx[-1] != n - 1:
        min_idx = np.r_[min_idx, n - 1]

    # Interpolate
    # CubicSpline requires strictly increasing x
    upper_spline = CubicSpline(max_idx, y[max_idx], bc_type='natural')
    lower_spline = CubicSpline(min_idx, y[min_idx], bc_type='natural')

    return upper_spline(x), lower_spline(x)
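
# Minimal sketch of the envelope step on a toy signal (hypothetical helper
# `sketch_envelope_mean`). It finds extrema with scipy.signal.argrelextrema and
# fits natural cubic splines with scipy.interpolate.CubicSpline, as
# get_envelopes does, but also returns the extrema indices so the interpolation
# property (envelopes pass through the extrema) can be checked.
import numpy as np
from scipy.interpolate import CubicSpline
from scipy.signal import argrelextrema


def sketch_envelope_mean(y: np.ndarray):
    n = len(y)
    x = np.arange(n)
    max_idx = argrelextrema(y, np.greater)[0]
    min_idx = argrelextrema(y, np.less)[0]
    # Pin both envelopes to the endpoints; np.unique keeps knots strictly increasing
    max_idx = np.unique(np.r_[0, max_idx, n - 1])
    min_idx = np.unique(np.r_[0, min_idx, n - 1])
    upper = CubicSpline(max_idx, y[max_idx], bc_type="natural")(x)
    lower = CubicSpline(min_idx, y[min_idx], bc_type="natural")(x)
    # The sifting step subtracts this mean envelope
    return (upper + lower) / 2.0, upper, lower, max_idx, min_idx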

def is_monotonic(y: np.ndarray) -> bool:
    """
    Checks if a signal is monotonic (no local extrema).

    Args:
        y (np.ndarray): The signal.

    Returns:
        bool: True if monotonic.
    """
    max_idx = argrelextrema(y, np.greater)[0]
    min_idx = argrelextrema(y, np.less)[0]
    return (len(max_idx) + len(min_idx)) == 0

def sift_one_imf(
    signal: np.ndarray,
    sd_threshold: float = 0.2,
    max_sifts: int = 100
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Performs the sifting process to extract one IMF.

    Args:
        signal (np.ndarray): The current residual signal.
        sd_threshold (float): Threshold on the normalized squared difference between successive sifts (SD stopping criterion).
        max_sifts (int): Maximum number of sifts allowed.

    Returns:
        Tuple[np.ndarray, np.ndarray]: (extracted_imf, remaining_signal).
    """
    h = signal.copy()

    for _ in range(max_sifts):
        # Check if h is monotonic (residue)
        if is_monotonic(h):
            break

        # Get envelopes
        upper, lower = get_envelopes(h)
        mean_env = (upper + lower) / 2.0

        # Update
        h_prev = h.copy()
        h = h - mean_env

        # Check convergence (Cauchy-type convergence)
        # SD = sum((h_prev - h)^2) / sum(h_prev^2)
        # Avoid division by zero
        denom = np.sum(h_prev ** 2)
        if denom == 0:
            break

        sd = np.sum((h_prev - h) ** 2) / denom

        if sd < sd_threshold:
            break

    return h, signal - h
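
# Worked comparison of two forms of the SD stopping statistic (a sketch; helper
# names are hypothetical). sift_one_imf uses a ratio of sums; the original EMD
# paper (Huang et al., 1998) states SD as a sum of pointwise ratios, which
# grows sharply wherever h_prev is close to zero.
import numpy as np


def sketch_sd_ratio_of_sums(h_prev: np.ndarray, h: np.ndarray) -> float:
    # Form used in sift_one_imf: sum((h_prev - h)^2) / sum(h_prev^2)
    return float(np.sum((h_prev - h) ** 2) / np.sum(h_prev ** 2))


def sketch_sd_pointwise(h_prev: np.ndarray, h: np.ndarray, eps: float = 1e-12) -> float:
    # Pointwise form: sum((h_prev - h)^2 / h_prev^2); eps guards exact zeros
    return float(np.sum((h_prev - h) ** 2 / (h_prev ** 2 + eps)))

# Usage: with h_prev = [1.0, 2.0, 0.1] and h = [0.9, 2.0, 0.0], the ratio of
# sums is 0.02 / 5.01 (about 0.004) while the pointwise form is about 1.01,
# driven almost entirely by the small third sample.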

def empirical_mode_decomposition(
    series: np.ndarray,
    max_imfs: int,
    stop_sd: float
) -> Tuple[List[np.ndarray], np.ndarray]:
    """
    Executes the Empirical Mode Decomposition algorithm.

    Args:
        series (np.ndarray): The input time series.
        max_imfs (int): Maximum number of IMFs to extract.
        stop_sd (float): Sifting stopping criterion.

    Returns:
        Tuple[List[np.ndarray], np.ndarray]: (List of IMFs, Residue).
    """
    residue = series.copy()
    imfs = []

    for _ in range(max_imfs):
        if is_monotonic(residue):
            break

        # Sift
        imf, residue = sift_one_imf(residue, sd_threshold=stop_sd)

        # Check if IMF is trivial (too small or constant)
        if np.allclose(imf, 0):
            break

        imfs.append(imf)

    return imfs, residue
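
# The sifting loop stops on the SD criterion alone, so an extracted component is
# not guaranteed to meet the IMF condition that the numbers of extrema and zero
# crossings are equal or differ by at most one. A hypothetical diagnostic
# (`sketch_imf_condition`) for checking the components returned above:
import numpy as np
from scipy.signal import argrelextrema


def sketch_imf_condition(imf: np.ndarray) -> bool:
    n_extrema = len(argrelextrema(imf, np.greater)[0]) + len(argrelextrema(imf, np.less)[0])
    # A zero crossing is a sign change between consecutive samples
    signs = np.signbit(imf)
    n_zero_crossings = int(np.count_nonzero(signs[1:] != signs[:-1]))
    return abs(n_extrema - n_zero_crossings) <= 1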

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Step 2 & 3: Apply EMD to BTC and ETH
# -------------------------------------------------------------------------------------------------------------------------------

def decompose_series(
    series: pd.Series,
    config: Any
) -> EMDResult:
    """
    Wraps the EMD algorithm for a specific pandas Series.

    Args:
        series (pd.Series): The log-price series.
        config (ValidatedAnalysisConfig): Analysis configuration.

    Returns:
        EMDResult: The decomposition results.
    """
    y = series.values

    # Parameters
    max_imfs = config.hht_max_imfs
    # stop_sd is hardcoded to 0.2 (the value used in the paper); it is not read from config.
    stop_sd = 0.2

    # Run EMD
    imfs, residue = empirical_mode_decomposition(y, max_imfs, stop_sd)

    # Verify reconstruction
    reconstructed = sum(imfs) + residue
    error = np.max(np.abs(y - reconstructed))

    return EMDResult(
        series_name=series.name,
        imfs=imfs,
        residue=residue,
        n_imfs=len(imfs),
        reconstruction_error=error
    )

# -------------------------------------------------------------------------------------------------------------------------------
# Task 22, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def decompose_prices_emd(
    finalized_data: Any,
    analysis_config: Any
) -> EMDDecompositionResults:
    """
    Orchestrates the EMD decomposition for BTC and ETH log-price series.

    Executes:
    1. Extraction of log-price series.
    2. Application of EMD algorithm.
    3. Validation of reconstruction.

    Args:
        finalized_data (FinalizedSeriesData): The dataset from Task 15.
        analysis_config (ValidatedAnalysisConfig): Configuration from Task 1.

    Returns:
        EMDDecompositionResults: Container with results for both assets.
    """
    df = finalized_data.df_final

    # Ensure Date index
    if "Date" in df.columns:
        df_indexed = df.set_index("Date")
    else:
        df_indexed = df

    print("Task 22: Running EMD decomposition on Price series...")

    # 1. BTC
    col_btc = "log_Close_BTC_USD"
    print(f"  - Decomposing {col_btc}...")
    res_btc = decompose_series(df_indexed[col_btc].dropna(), analysis_config)
    print(f"    -> Extracted {res_btc.n_imfs} IMFs. Reconstruction Error: {res_btc.reconstruction_error:.2e}")

    # 2. ETH
    col_eth = "log_Close_ETH_USD"
    print(f"  - Decomposing {col_eth}...")
    res_eth = decompose_series(df_indexed[col_eth].dropna(), analysis_config)
    print(f"    -> Extracted {res_eth.n_imfs} IMFs. Reconstruction Error: {res_eth.reconstruction_error:.2e}")

    return EMDDecompositionResults(
        btc_result=res_btc,
        eth_result=res_eth
    )


# Task 23 — Compute Hilbert Transform and Spectrum

# ==============================================================================
# Task 23: Compute Hilbert Transform and Spectrum
# ==============================================================================

@dataclass
class IMFAnalyticData:
    """
    Container for the analytic signal properties of a single IMF.

    Attributes:
        imf_index (int): Index of the IMF (0-based).
        analytic_signal (np.ndarray): The complex analytic signal z(t).
        instantaneous_phase (np.ndarray): Unwrapped phase phi(t).
        instantaneous_frequency (np.ndarray): Angular frequency omega(t) = dphi/dt.
        instantaneous_amplitude (np.ndarray): Envelope A(t) = |z(t)|.
    """
    imf_index: int
    analytic_signal: np.ndarray
    instantaneous_phase: np.ndarray
    instantaneous_frequency: np.ndarray
    instantaneous_amplitude: np.ndarray

@dataclass
class HilbertSpectrumResult:
    """
    Container for the Hilbert Spectrum of a single series.

    Attributes:
        series_name (str): Name of the series.
        imf_data (List[IMFAnalyticData]): Analytic properties for each IMF.
        hilbert_spectrum (np.ndarray): 2D array H(t, omega) representing amplitude.
        time_axis (np.ndarray): Time indices.
        frequency_axis (np.ndarray): Frequency bin centers.
    """
    series_name: str
    imf_data: List[IMFAnalyticData]
    hilbert_spectrum: np.ndarray
    time_axis: np.ndarray
    frequency_axis: np.ndarray

@dataclass
class HilbertTransformResults:
    """
    Container for Hilbert Transform results for BTC and ETH.

    Attributes:
        btc_spectrum (HilbertSpectrumResult): Results for BTC.
        eth_spectrum (HilbertSpectrumResult): Results for ETH.
    """
    btc_spectrum: HilbertSpectrumResult
    eth_spectrum: HilbertSpectrumResult

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 1 & 2: Apply Hilbert Transform and Compute Instantaneous Attributes
# -------------------------------------------------------------------------------------------------------------------------------

def compute_analytic_properties(imf: np.ndarray, idx: int) -> IMFAnalyticData:
    """
    Computes the analytic signal, phase, frequency, and amplitude for a single IMF.

    Uses scipy.signal.hilbert for the analytic signal.
    Frequency is computed as the gradient of the unwrapped phase.

    Args:
        imf (np.ndarray): The Intrinsic Mode Function array.
        idx (int): The index of the IMF.

    Returns:
        IMFAnalyticData: Container with computed properties.
    """
    # 1. Analytic Signal
    # scipy.signal.hilbert returns the analytic signal x(t) + iH(x(t))
    analytic = hilbert(imf)

    # 2. Instantaneous Amplitude
    amplitude = np.abs(analytic)

    # 3. Instantaneous Phase
    # angle returns values in [-pi, pi]
    phase = np.angle(analytic)
    # Unwrap to avoid discontinuities
    unwrapped_phase = np.unwrap(phase)

    # 4. Instantaneous Frequency
    # omega = d(phase)/dt
    # We use central difference for gradient
    frequency = np.gradient(unwrapped_phase)

    # No normalization is applied: the derivative of the unwrapped phase is an
    # angular frequency in radians per sample (radians per day for daily data).
    return IMFAnalyticData(
        imf_index=idx,
        analytic_signal=analytic,
        instantaneous_phase=unwrapped_phase,
        instantaneous_frequency=frequency,
        instantaneous_amplitude=amplitude
    )
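
# Units check for the frequencies produced above: they are angular, in radians
# per sample, so for daily data a component with omega(t) = w has a period of
# 2 * pi / w days. A minimal sketch with hypothetical helper names, using a
# pure cosine that completes exactly 25 cycles in 1000 samples (a 40-day period).
import numpy as np
from scipy.signal import hilbert


def sketch_instantaneous_frequency(x: np.ndarray) -> np.ndarray:
    """Hypothetical helper: radians per sample, same steps as compute_analytic_properties."""
    return np.gradient(np.unwrap(np.angle(hilbert(x))))


def sketch_period_in_samples(omega: np.ndarray) -> np.ndarray:
    """Hypothetical helper: angular frequency (rad/sample) to period (samples, i.e. days)."""
    return 2.0 * np.pi / omega

# Usage:
# n = np.arange(1000); x = np.cos(2 * np.pi * 25 * n / 1000)
# sketch_period_in_samples(sketch_instantaneous_frequency(x))  # about 40 at every sample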

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Step 3: Construct Hilbert Spectrum
# -------------------------------------------------------------------------------------------------------------------------------

def construct_spectrum(
    imf_data_list: List[IMFAnalyticData],
    n_time_points: int,
    n_freq_bins: int = 100,
    max_freq: float = np.pi
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Constructs the Hilbert Spectrum H(t, omega).

    Aggregates the amplitude of all IMFs into time-frequency bins.

    Args:
        imf_data_list (List[IMFAnalyticData]): List of analytic data for all IMFs.
        n_time_points (int): Length of the time series.
        n_freq_bins (int): Number of frequency bins.
        max_freq (float): Maximum frequency to consider (default Nyquist = pi).

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]:
            (spectrum_matrix, time_axis, freq_axis)
            spectrum_matrix shape is (n_freq_bins, n_time_points).
    """
    # Define frequency bins
    freq_edges = np.linspace(0, max_freq, n_freq_bins + 1)
    freq_centers = (freq_edges[:-1] + freq_edges[1:]) / 2

    # Initialize spectrum (Frequency x Time).
    # H(t, omega) accumulates instantaneous amplitude; energy (H^2) is computed in Task 24.
    spectrum = np.zeros((n_freq_bins, n_time_points))
    time_idx = np.arange(n_time_points)

    for imf_data in imf_data_list:
        freq = imf_data.instantaneous_frequency
        amp = imf_data.instantaneous_amplitude

        # Keep only frequencies in [0, max_freq); negative frequencies and NaNs are dropped
        valid = (freq >= 0) & (freq < max_freq)
        bin_idx = (freq[valid] / max_freq * n_freq_bins).astype(int)
        # Guard against floating-point rounding producing index n_freq_bins
        in_range = bin_idx < n_freq_bins

        # Accumulate amplitudes; contributions from different IMFs in the same (bin, t) cell add up
        np.add.at(spectrum, (bin_idx[in_range], time_idx[valid][in_range]), amp[valid][in_range])

    return spectrum, np.arange(n_time_points), freq_centers
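
# Worked example of the binning rule used above: bin = floor(omega / max_freq
# * n_freq_bins), with frequencies outside [0, max_freq) discarded. With the
# defaults (100 bins over [0, pi)), omega = 0.5 rad/sample maps to
# floor(0.5 / pi * 100) = floor(15.92) = 15, whose center is 15.5 * pi / 100,
# about 0.487. Negative instantaneous frequencies (the phase locally running
# backwards, which can occur for imperfect IMFs) are dropped rather than folded.
# The vectorized helper below is hypothetical and returns -1 for dropped values.
import numpy as np


def sketch_frequency_bin(freq: np.ndarray, max_freq: float = np.pi, n_freq_bins: int = 100) -> np.ndarray:
    """Hypothetical helper: bin index per frequency, or -1 if outside [0, max_freq)."""
    freq = np.asarray(freq, dtype=float)
    valid = (freq >= 0) & (freq < max_freq)
    bins = np.full(freq.shape, -1, dtype=int)
    bins[valid] = np.minimum((freq[valid] / max_freq * n_freq_bins).astype(int), n_freq_bins - 1)
    return bins

# Usage: sketch_frequency_bin(np.array([0.5, 0.0, np.pi, -0.1])) gives [15, 0, -1, -1].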

# -------------------------------------------------------------------------------------------------------------------------------
# Task 23, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def compute_hilbert_spectrum(
    emd_results: Any
) -> HilbertTransformResults:
    """
    Orchestrates the computation of Hilbert Transforms and Spectra for BTC and ETH.

    Executes:
    1. Analytic signal computation for each IMF.
    2. Instantaneous phase/frequency/amplitude derivation.
    3. Aggregation into Hilbert Spectrum.

    Args:
        emd_results (EMDDecompositionResults): Results from Task 22.

    Returns:
        HilbertTransformResults: Container with spectra for both assets.
    """
    print("Task 23: Computing Hilbert Spectra...")

    def process_asset(emd_res: Any) -> HilbertSpectrumResult:
        imfs = emd_res.imfs
        # Use the residue length so a series with no IMFs still yields a zero spectrum of length T
        n_samples = len(emd_res.residue)

        # 1. Compute properties for each IMF
        imf_data_list = []
        for i, imf in enumerate(imfs):
            data = compute_analytic_properties(imf, i)
            imf_data_list.append(data)

        # 2. Construct Spectrum
        spectrum, t_axis, f_axis = construct_spectrum(imf_data_list, n_samples)

        return HilbertSpectrumResult(
            series_name=emd_res.series_name,
            imf_data=imf_data_list,
            hilbert_spectrum=spectrum,
            time_axis=t_axis,
            frequency_axis=f_axis
        )

    # Process BTC
    print("  - Processing BTC...")
    btc_spec = process_asset(emd_results.btc_result)

    # Process ETH
    print("  - Processing ETH...")
    eth_spec = process_asset(emd_results.eth_result)

    return HilbertTransformResults(
        btc_spectrum=btc_spec,
        eth_spectrum=eth_spec
    )


# Task 24 — Compute Instantaneous Energy and Detect Extreme Events

# ==============================================================================
# Task 24: Compute Instantaneous Energy and Detect Extreme Events
# ==============================================================================

@dataclass
class ExtremeEventDetectionResult:
    """
    Container for the results of extreme event detection on a single series.

    Attributes:
        series_name (str): Name of the series.
        instantaneous_energy (np.ndarray): The unnormalized energy IE(t).
        normalized_energy (np.ndarray): The normalized energy IE_N(t).
        mean_energy (float): E_mu.
        std_energy (float): sigma.
        threshold (float): E_th = E_mu + B * sigma.
        extreme_event_dates (List[pd.Timestamp]): Dates where IE(t) > E_th.
        extreme_event_indices (List[int]): Indices where IE(t) > E_th.
    """
    series_name: str
    instantaneous_energy: np.ndarray
    normalized_energy: np.ndarray
    mean_energy: float
    std_energy: float
    threshold: float
    extreme_event_dates: List[pd.Timestamp]
    extreme_event_indices: List[int]

@dataclass
class ExtremeEventResults:
    """
    Container for extreme event results for BTC and ETH.

    Attributes:
        btc_events (ExtremeEventDetectionResult): Results for BTC.
        eth_events (ExtremeEventDetectionResult): Results for ETH.
    """
    btc_events: ExtremeEventDetectionResult
    eth_events: ExtremeEventDetectionResult

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 1: Compute Instantaneous Energy
# -------------------------------------------------------------------------------------------------------------------------------

def compute_instantaneous_energy(
    spectrum: np.ndarray,
    freq_axis: np.ndarray
) -> np.ndarray:
    """
    Computes the instantaneous energy IE(t) from the Hilbert Spectrum.

    IE(t) = sum(H(t, w)^2 * dw)

    Args:
        spectrum (np.ndarray): The Hilbert Spectrum H(t, w) (Freq x Time).
        freq_axis (np.ndarray): The frequency bin centers.

    Returns:
        np.ndarray: The instantaneous energy array (1D, length T).
    """
    # Calculate bin width (assuming uniform spacing)
    if len(freq_axis) > 1:
        dw = freq_axis[1] - freq_axis[0]
    else:
        dw = 1.0 # Fallback if single bin

    # Square the amplitude spectrum to get energy density
    energy_density = spectrum ** 2

    # Integrate over frequency (sum * dw)
    # spectrum shape is (n_freq, n_time)
    # Sum along axis 0 (frequency)
    ie_t = np.sum(energy_density, axis=0) * dw

    return ie_t
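
# Worked example of IE(t) = sum_w H(t, w)^2 * dw for one time step. With 100
# bins over [0, pi), dw = pi / 100, so a single IMF of amplitude 2 gives
# IE = 2**2 * pi / 100, about 0.1257. Because H(t, w) accumulates amplitudes
# before squaring, two IMFs of amplitude 1 landing in the same bin contribute
# (1 + 1)**2 * dw, whereas the same IMFs in different bins contribute
# (1**2 + 1**2) * dw, half as much. Hypothetical helper for one time column:
import numpy as np


def sketch_energy_one_step(amplitudes_by_bin: np.ndarray, dw: float) -> float:
    """Hypothetical helper: sum of squared per-bin amplitudes times dw."""
    return float(np.sum(np.asarray(amplitudes_by_bin, dtype=float) ** 2) * dw)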

def normalize_energy(ie_t: np.ndarray) -> np.ndarray:
    """
    Normalizes the instantaneous energy by its maximum value.

    IE_N(t) = IE(t) / max(IE(t))

    Args:
        ie_t (np.ndarray): Instantaneous energy.

    Returns:
        np.ndarray: Normalized energy.
    """
    max_val = np.max(ie_t)
    if max_val == 0:
        return np.zeros_like(ie_t)
    return ie_t / max_val

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 2: Compute Extreme Event Threshold
# -------------------------------------------------------------------------------------------------------------------------------

def compute_threshold(ie_t: np.ndarray, b_param: float) -> Tuple[float, float, float]:
    """
    Computes the statistical threshold for extreme events.

    E_th = E_mu + B * sigma

    Args:
        ie_t (np.ndarray): Instantaneous energy.
        b_param (float): The multiplier B (e.g., 4).

    Returns:
        Tuple[float, float, float]: (mean, std, threshold).
    """
    mean_val = np.mean(ie_t)
    std_val = np.std(ie_t, ddof=1) # Sample standard deviation
    threshold = mean_val + b_param * std_val

    return mean_val, std_val, threshold
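
# Worked example of E_th = E_mu + B * sigma with the sample (ddof=1) standard
# deviation. For IE = [1, 1, 1, 1, 10]: E_mu = 2.8 and sigma = sqrt(64.8 / 4),
# about 4.025, so B = 1 gives E_th of about 6.825 (one event, at index 4) while
# B = 4 gives E_th of about 18.90 (no event). More generally, with ddof=1 no
# single point can lie more than (n - 1) / sqrt(n) standard deviations above
# the mean (a consequence of Samuelson's inequality), so B = 4 can only be
# exceeded when n >= 18. Hypothetical helpers reproducing the arithmetic:
import numpy as np


def sketch_threshold_events(ie, b_param: float):
    """Hypothetical helper: (threshold, indices where ie > threshold)."""
    ie = np.asarray(ie, dtype=float)
    threshold = ie.mean() + b_param * ie.std(ddof=1)
    return threshold, np.flatnonzero(ie > threshold).tolist()


def sketch_max_z_score(n: int) -> float:
    """Hypothetical helper: upper bound on (max - mean) / std(ddof=1) for n samples."""
    return (n - 1) / np.sqrt(n)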

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Step 3: Identify Extreme Events
# -------------------------------------------------------------------------------------------------------------------------------

def find_extreme_events(
    ie_t: np.ndarray,
    threshold: float,
    date_index: pd.Index
) -> Tuple[List[int], List[pd.Timestamp]]:
    """
    Identifies indices and dates where energy exceeds the threshold.

    Args:
        ie_t (np.ndarray): Instantaneous energy.
        threshold (float): The energy threshold.
        date_index (pd.Index): The DatetimeIndex corresponding to the time axis.

    Returns:
        Tuple[List[int], List[pd.Timestamp]]: (indices, dates).
    """
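    # The energy series and the date index must describe the same samples
    if len(date_index) != len(ie_t):
        raise ValueError(
            f"date_index has {len(date_index)} entries but ie_t has {len(ie_t)}."
        )

    # Find indices
    indices = np.where(ie_t > threshold)[0].tolist()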

    # Map to dates
    dates = [date_index[i] for i in indices]

    return indices, dates

# -------------------------------------------------------------------------------------------------------------------------------
# Task 24, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def detect_extreme_events(
    hilbert_results: Any,
    analysis_config: Any,
    date_index: pd.Index
) -> ExtremeEventResults:
    """
    Orchestrates the detection of extreme events for BTC and ETH.

    Executes:
    1. Energy computation.
    2. Threshold calculation.
    3. Event identification.

    Args:
        hilbert_results (HilbertTransformResults): Results from Task 23.
        analysis_config (ValidatedAnalysisConfig): Configuration (for B param).
        date_index (pd.Index): The DatetimeIndex for mapping.

    Returns:
        ExtremeEventResults: Container with results for both assets.
    """
    print("Task 24: Detecting Extreme Events...")

    b_param = analysis_config.hht_threshold_b

    def process_asset(spectrum_res: Any) -> ExtremeEventDetectionResult:
        spectrum = spectrum_res.hilbert_spectrum
        freq_axis = spectrum_res.frequency_axis

        # 1. Compute Energy
        ie_t = compute_instantaneous_energy(spectrum, freq_axis)
        ie_n = normalize_energy(ie_t)

        # 2. Compute Threshold
        mean_val, std_val, threshold = compute_threshold(ie_t, b_param)

        # 3. Detect Events
        indices, dates = find_extreme_events(ie_t, threshold, date_index)

        return ExtremeEventDetectionResult(
            series_name=spectrum_res.series_name,
            instantaneous_energy=ie_t,
            normalized_energy=ie_n,
            mean_energy=mean_val,
            std_energy=std_val,
            threshold=threshold,
            extreme_event_dates=dates,
            extreme_event_indices=indices
        )

    # Process BTC
    print("  - Processing BTC...")
    btc_res = process_asset(hilbert_results.btc_spectrum)
    print(f"    -> Threshold: {btc_res.threshold:.4f}")
    print(f"    -> Found {len(btc_res.extreme_event_dates)} events: {[d.date() for d in btc_res.extreme_event_dates]}")

    # Process ETH
    print("  - Processing ETH...")
    eth_res = process_asset(hilbert_results.eth_spectrum)
    print(f"    -> Threshold: {eth_res.threshold:.4f}")
    print(f"    -> Found {len(eth_res.extreme_event_dates)} events: {[d.date() for d in eth_res.extreme_event_dates]}")

    return ExtremeEventResults(
        btc_events=btc_res,
        eth_events=eth_res
    )


# Task 25 — Design HHT Orchestrator

# ==============================================================================
# Task 25: Design HHT Orchestrator
# ==============================================================================

@dataclass
class HHTOrchestratorResult:
    """
    Container for the results of HHT analysis across all series.

    Attributes:
        results (Dict[str, Dict[str, Any]]): Dictionary mapping series name to HHT results.
        failed_series (Dict[str, str]): Dictionary mapping failed series names to error messages.
    """
    results: Dict[str, Dict[str, Any]]  # Per series: {"emd": ..., "spectrum": ..., "events": ...}
    failed_series: Dict[str, str]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 25, Step 1 & 2: Define Orchestrator Inputs and Internal Callables
# -------------------------------------------------------------------------------------------------------------------------------

def callable_emd(series: pd.Series, config: Any) -> Any:
    """
    Internal callable to execute EMD decomposition on a single series.

    Wraps the decomposition function from Task 22.

    Args:
        series (pd.Series): The time series.
        config (ValidatedAnalysisConfig): Analysis configuration.

    Returns:
        EMDResult: The decomposition result.
    """
    # Reuse the implementation from Task 22
    return decompose_series(series, config)

def callable_hilbert_spectrum(emd_res: Any) -> Any:
    """
    Internal callable to compute Hilbert Spectrum for a single series.

    Wraps the spectrum construction logic from Task 23.

    Args:
        emd_res (EMDResult): The decomposition result.

    Returns:
        HilbertSpectrumResult: The spectrum result.
    """
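    # Same per-series logic as compute_hilbert_spectrum (Task 23), which only
    # handles the BTC/ETH pair, applied here to a single EMD result
    imfs = emd_res.imfs
    # Use the residue length so a series with no IMFs still yields a zero spectrum of length T
    n_samples = len(emd_res.residue)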

    # 1. Compute properties for each IMF
    imf_data_list = []
    for i, imf in enumerate(imfs):
        data = compute_analytic_properties(imf, i)
        imf_data_list.append(data)

    # 2. Construct Spectrum
    spectrum, t_axis, f_axis = construct_spectrum(imf_data_list, n_samples)

    return HilbertSpectrumResult(
        series_name=emd_res.series_name,
        imf_data=imf_data_list,
        hilbert_spectrum=spectrum,
        time_axis=t_axis,
        frequency_axis=f_axis
    )

def callable_extreme_events(spectrum_res: Any, config: Any, date_index: pd.Index) -> Any:
    """
    Internal callable to detect extreme events for a single series.

    Wraps the detection logic from Task 24.

    Args:
        spectrum_res (HilbertSpectrumResult): The spectrum result.
        config (ValidatedAnalysisConfig): Analysis configuration.
        date_index (pd.Index): The DatetimeIndex.

    Returns:
        ExtremeEventDetectionResult: The detection result.
    """
    # Reuse logic from Task 24
    b_param = config.hht_threshold_b

    spectrum = spectrum_res.hilbert_spectrum
    freq_axis = spectrum_res.frequency_axis

    # 1. Compute Energy
    ie_t = compute_instantaneous_energy(spectrum, freq_axis)
    ie_n = normalize_energy(ie_t)

    # 2. Compute Threshold
    mean_val, std_val, threshold = compute_threshold(ie_t, b_param)

    # 3. Detect Events
    indices, dates = find_extreme_events(ie_t, threshold, date_index)

    return ExtremeEventDetectionResult(
        series_name=spectrum_res.series_name,
        instantaneous_energy=ie_t,
        normalized_energy=ie_n,
        mean_energy=mean_val,
        std_energy=std_val,
        threshold=threshold,
        extreme_event_dates=dates,
        extreme_event_indices=indices
    )

# -------------------------------------------------------------------------------------------------------------------------------
# Task 25, Step 3: Define Orchestrator Control Flow
# -------------------------------------------------------------------------------------------------------------------------------

def orchestrate_hht(
    finalized_data: Any,
    analysis_config: Any
) -> HHTOrchestratorResult:
    """
    Orchestrates the HHT analysis for all configured series.

    Iterates through the series mapped for HHT analysis in Task 15,
    executes EMD, Hilbert Transform, and Extreme Event Detection, and aggregates results.

    Args:
        finalized_data (FinalizedSeriesData): The dataset from Task 15.
        analysis_config (ValidatedAnalysisConfig): Configuration from Task 1.

    Returns:
        HHTOrchestratorResult: Aggregated results.
    """
    df = finalized_data.df_final

    # Ensure Date index
    if "Date" in df.columns:
        df_indexed = df.set_index("Date")
    else:
        df_indexed = df

    # Get target series from mapping
    target_series = finalized_data.series_mapping.hht_series

    results = {}
    failures = {}

    print(f"Task 25: Orchestrating HHT for {len(target_series)} series...")

    for col in target_series:
        if col not in df_indexed.columns:
            failures[col] = "Column missing from DataFrame"
            print(f"  - WARNING: Series '{col}' missing.")
            continue

        try:
            print(f"  - Processing {col}...")
            series = df_indexed[col].dropna()

            if len(series) == 0:
                failures[col] = "Series is empty after dropping NaNs"
                continue

            # 1. EMD
            emd_res = callable_emd(series, analysis_config)

            # 2. Hilbert Spectrum
            hs_res = callable_hilbert_spectrum(emd_res)

            # 3. Extreme Events
            ee_res = callable_extreme_events(hs_res, analysis_config, series.index)

            results[col] = {
                "emd": emd_res,
                "spectrum": hs_res,
                "events": ee_res
            }

            # Log brief result
            print(f"    -> Found {len(ee_res.extreme_event_dates)} extreme events.")

        except Exception as e:
            failures[col] = str(e)
            print(f"    -> FAILED: {e}")

    print(f"Task 25 Complete. Successful: {len(results)}, Failed: {len(failures)}")

    return HHTOrchestratorResult(
        results=results,
        failed_series=failures
    )


# Task 26 — Execute AAFT Robustness for HHT Extreme Events

# ==============================================================================
# Task 26: Execute AAFT Robustness for HHT Extreme Events
# ==============================================================================

@dataclass
class HHTRobustnessResult:
    """
    Container for the results of AAFT robustness testing on HHT extreme events.

    Attributes:
        series_name (str): Name of the series tested.
        observed_max_energy (float): The maximum instantaneous energy from the original series.
        surrogate_max_energies (List[float]): List of max energies from surrogates.
        p_value (float): Empirical p-value for the maximum energy peak.
        n_surrogates (int): Number of surrogates generated.
    """
    series_name: str
    observed_max_energy: float
    surrogate_max_energies: List[float]
    p_value: float
    n_surrogates: int

@dataclass
class HHTRobustnessResults:
    """
    Container for HHT robustness results for BTC and ETH.

    Attributes:
        btc_robustness (HHTRobustnessResult): Results for BTC.
        eth_robustness (HHTRobustnessResult): Results for ETH.
    """
    btc_robustness: HHTRobustnessResult
    eth_robustness: HHTRobustnessResult

# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Step 1 & 2: Generate Surrogates and Compute Energy Maxima
# -------------------------------------------------------------------------------------------------------------------------------

def compute_surrogate_maxima(
    series: pd.Series,
    config: Any,
    n_surrogates: int
) -> List[float]:
    """
    Generates AAFT surrogates and computes the maximum instantaneous energy for each.

    Args:
        series (pd.Series): The original time series.
        config (ValidatedAnalysisConfig): Analysis configuration.
        n_surrogates (int): Number of surrogates.

    Returns:
        List[float]: List of maximum energy values from surrogates.
    """
    y = series.values

    # Generate Surrogates (Reusing Task 21 function)
    # We assume generate_aaft_surrogates is available in the environment
    surrogates = generate_aaft_surrogates(y, n_surrogates, seed=42)

    max_energies = []

    # EMD Parameters
    max_imfs = config.hht_max_imfs
    stop_sd = 0.2

    for i in range(n_surrogates):
        y_surr = surrogates[i]

        # 1. EMD
        # Reusing empirical_mode_decomposition from Task 22
        imfs, _ = empirical_mode_decomposition(y_surr, max_imfs, stop_sd)

        if not imfs:
            max_energies.append(0.0)
            continue

        # 2. Hilbert Transform & Energy
        # Rebuild the binned spectrum with the same settings as construct_spectrum
        # (Task 23) so that IE(t) = sum_w H(t, w)^2 * dw matches the observed series
        n_samples = len(y_surr)
        n_freq_bins = 100
        max_freq = np.pi
        spectrum = np.zeros((n_freq_bins, n_samples))

        freq_edges = np.linspace(0, max_freq, n_freq_bins + 1)
        dw = freq_edges[1] - freq_edges[0]

        for imf in imfs:
            # Analytic signal, amplitude and frequency, inlined from
            # compute_analytic_properties (Task 23) to avoid building dataclasses
            analytic = hilbert(imf)
            amp = np.abs(analytic)
            phase = np.unwrap(np.angle(analytic))
            freq = np.gradient(phase)

            # Vectorized binning of frequencies in [0, max_freq)
            valid_mask = (freq >= 0) & (freq < max_freq)
            bin_indices = (freq[valid_mask] / max_freq * n_freq_bins).astype(int)
            # Clip guards against floating-point rounding producing index n_freq_bins
            bin_indices = np.clip(bin_indices, 0, n_freq_bins - 1)

            # Accumulate amplitudes into the (frequency, time) cells
            np.add.at(spectrum, (bin_indices, np.where(valid_mask)[0]), amp[valid_mask])

        # 3. Compute Energy (Task 24 logic)
        energy_density = spectrum ** 2
        ie_t = np.sum(energy_density, axis=0) * dw

        # 4. Max Energy
        max_energies.append(np.max(ie_t))

    return max_energies
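
# `compute_surrogate_maxima` relies on `generate_aaft_surrogates` from Task 21,
# which is not shown in this section. For reference, a minimal sketch of one
# surrogate in the standard amplitude-adjusted Fourier transform (AAFT) scheme
# of Theiler et al. (1992): (1) a Gaussian series with the rank order of x,
# (2) Fourier phase randomization, (3) rank-remapping the exact values of x.
# The helper name is hypothetical, and it is not guaranteed to match the
# Task 21 implementation (for example its seeding).
import numpy as np


def sketch_aaft_surrogate(x: np.ndarray, rng: np.random.Generator) -> np.ndarray:
    """Hypothetical helper: one AAFT surrogate of a 1-D series."""
    x = np.asarray(x, dtype=float)
    n = x.size
    ranks = np.argsort(np.argsort(x))
    # Step 1: Gaussian series with the same rank order as x
    gaussian = np.sort(rng.standard_normal(n))[ranks]
    # Step 2: randomize Fourier phases while keeping the amplitude spectrum
    spectrum = np.fft.rfft(gaussian)
    phases = rng.uniform(0.0, 2.0 * np.pi, spectrum.size)
    phases[0] = 0.0  # leave the mean (DC term) untouched
    if n % 2 == 0:
        phases[-1] = 0.0  # the Nyquist term must stay real for even n
    shuffled = np.fft.irfft(spectrum * np.exp(1j * phases), n)
    # Step 3: give the phase-randomized series the exact values of x
    return np.sort(x)[np.argsort(np.argsort(shuffled))]

# Usage: s = sketch_aaft_surrogate(y, np.random.default_rng(42)) is a reordering
# of y, so its marginal distribution is preserved exactly.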

# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Step 3: Compute Empirical p-Values
# -------------------------------------------------------------------------------------------------------------------------------

def calculate_hht_p_value(
    observed_max: float,
    surrogate_maxima: List[float]
) -> float:
    """
    Computes the empirical p-value for the observed maximum energy.

    Args:
        observed_max (float): Observed max energy.
        surrogate_maxima (List[float]): List of surrogate max energies.

    Returns:
        float: p-value.
    """
    n = len(surrogate_maxima)
    if n == 0:
        return 1.0

    count_exceed = sum(1 for m in surrogate_maxima if m >= observed_max)
    return count_exceed / n
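
# Note on the estimator above: count / n returns exactly 0 when no surrogate
# reaches the observed maximum. A widely used alternative for Monte Carlo
# tests, which is NOT what this pipeline computes, counts the observed series
# as one more draw, p = (count + 1) / (n + 1); with 1000 surrogates its
# smallest possible value is 1/1001. Hypothetical helper for comparison:
import numpy as np


def sketch_monte_carlo_p_value(observed: float, surrogate_stats) -> float:
    """Hypothetical helper: (count + 1) / (n + 1) empirical p-value."""
    surrogate_stats = np.asarray(surrogate_stats, dtype=float)
    count = int(np.sum(surrogate_stats >= observed))
    return (count + 1) / (surrogate_stats.size + 1)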

# -------------------------------------------------------------------------------------------------------------------------------
# Task 26, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def execute_hht_robustness(
    finalized_data: Any,
    hht_results: Any,
    analysis_config: Any
) -> HHTRobustnessResults:
    """
    Orchestrates the AAFT robustness testing for HHT extreme events.

    Args:
        finalized_data (FinalizedSeriesData): The dataset.
        hht_results (HHTOrchestratorResult): Results from Task 25 (containing EE results).
        analysis_config (ValidatedAnalysisConfig): Configuration.

    Returns:
        HHTRobustnessResults: Container with robustness results.
    """
    df = finalized_data.df_final
    if "Date" in df.columns:
        df_indexed = df.set_index("Date")
    else:
        df_indexed = df

    n_surrogates = 1000  # Hardcoded; not read from analysis_config

    print(f"Task 26: Executing HHT robustness tests ({n_surrogates} iterations)...")

    def process_asset(series_name: str, observed_max: float) -> HHTRobustnessResult:
        print(f"  - Testing {series_name} (Observed Max Energy: {observed_max:.4f})...")

        series = df_indexed[series_name].dropna()

        # Generate and Compute
        surrogate_maxima = compute_surrogate_maxima(series, analysis_config, n_surrogates)

        # Compute p-value
        p_val = calculate_hht_p_value(observed_max, surrogate_maxima)

        print(f"    -> p-value: {p_val:.4f}")

        return HHTRobustnessResult(
            series_name=series_name,
            observed_max_energy=observed_max,
            surrogate_max_energies=surrogate_maxima,
            p_value=p_val,
            n_surrogates=n_surrogates
        )

    # Extract observed max from HHT results
    # We need to access the nested structure from Task 25
    # results['log_Close_BTC_USD']['events'] -> ExtremeEventDetectionResult
    # BTC
    btc_name = "log_Close_BTC_USD"
    btc_events = hht_results.results[btc_name]["events"]
    btc_max = np.max(btc_events.instantaneous_energy)
    btc_res = process_asset(btc_name, btc_max)

    # ETH
    eth_name = "log_Close_ETH_USD"
    eth_events = hht_results.results[eth_name]["events"]
    eth_max = np.max(eth_events.instantaneous_energy)
    eth_res = process_asset(eth_name, eth_max)

    return HHTRobustnessResults(
        btc_robustness=btc_res,
        eth_robustness=eth_res
    )


# Task 27 — Prepare Stationary SVAR Input Series

# ==============================================================================
# Task 27: Prepare Stationary SVAR Input Series
# ==============================================================================

@dataclass
class SVARInputData:
    """
    Container for the stationary time series used in SVAR analysis.

    Attributes:
        df_full (pd.DataFrame): Full sample of stationary series (differenced log volumes).
        df_pre (pd.DataFrame): Pre-election subsample.
        df_post (pd.DataFrame): Post-election subsample.
        variables (List[str]): List of variable names in the vector Y_t.
        break_date (pd.Timestamp): The date used to split the sample.
    """
    df_full: pd.DataFrame
    df_pre: pd.DataFrame
    df_post: pd.DataFrame
    variables: List[str]
    break_date: pd.Timestamp

# -------------------------------------------------------------------------------------------------------------------------------
# Task 27, Step 1: Select Differenced Series for SVAR
# -------------------------------------------------------------------------------------------------------------------------------

def select_svar_series(
    df: pd.DataFrame,
    target_cols: List[str]
) -> pd.DataFrame:
    """
    Selects and cleans the differenced series for SVAR analysis.

    Args:
        df (pd.DataFrame): The DataFrame containing differenced series.
        target_cols (List[str]): List of column names for the SVAR (e.g., diff_log_V_EOA_EOA_USDC).

    Returns:
        pd.DataFrame: A clean DataFrame with no NaNs, containing only target columns.
    """
    # Select columns
    subset = df[target_cols].copy()

    # Drop NaNs (e.g., first row from differencing)
    clean_subset = subset.dropna()

    # The index is carried through unchanged. Callers must supply a DatetimeIndex
    # (prepare_svar_inputs sets "Date" as the index), which split_regimes requires.

    return clean_subset
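

# Minimal sketch, using synthetic volumes and hypothetical column names, of
# where the NaN that select_svar_series drops comes from. First-differencing
# log volumes leaves the first row undefined, so dropna() removes exactly one row.
import numpy as np
import pandas as pd

_dates = pd.date_range("2024-10-30", periods=5, freq="D")
_vol = pd.DataFrame(
    {"V_USDC": [100.0, 110.0, 121.0, 90.0, 95.0],
     "V_USDT": [200.0, 180.0, 190.0, 210.0, 205.0]},
    index=_dates,
)
_diff_log = np.log(_vol).diff().add_prefix("diff_log_")  # row 0 is NaN after diff()
_clean = _diff_log.dropna()                              # 4 usable rows remain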

# -------------------------------------------------------------------------------------------------------------------------------
# Task 27, Step 2 & 3: Define Regime Boundaries and Construct Vectors
# -------------------------------------------------------------------------------------------------------------------------------

def split_regimes(
    df: pd.DataFrame,
    break_date: pd.Timestamp
) -> Dict[str, pd.DataFrame]:
    """
    Splits the dataset into pre- and post-election regimes.

    Pre-election: Date < break_date
    Post-election: Date >= break_date

    Args:
        df (pd.DataFrame): The clean SVAR DataFrame (index must be DatetimeIndex).
        break_date (pd.Timestamp): The regime shift date.

    Returns:
        Dict[str, pd.DataFrame]: Dictionary with 'pre' and 'post' DataFrames.
    """
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError("DataFrame index must be DatetimeIndex for splitting.")

    # Split
    mask_pre = df.index < break_date
    mask_post = df.index >= break_date

    df_pre = df.loc[mask_pre].copy()
    df_post = df.loc[mask_post].copy()

    return {
        "pre": df_pre,
        "post": df_post
    }
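

# Quick check of the boundary convention in split_regimes: the break date itself
# belongs to the post-election sample. The dates are synthetic, and the
# 2024-11-05 break is only an illustration.
import pandas as pd

_idx = pd.date_range("2024-11-03", "2024-11-07", freq="D")
_toy = pd.DataFrame({"x": range(len(_idx))}, index=_idx)
_brk = pd.Timestamp("2024-11-05")
_pre = _toy.loc[_toy.index < _brk]    # 2024-11-03, 2024-11-04
_post = _toy.loc[_toy.index >= _brk]  # 2024-11-05 onward, break date included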

# -------------------------------------------------------------------------------------------------------------------------------
# Task 27, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def prepare_svar_inputs(
    finalized_data: Any,
    analysis_config: Any
) -> SVARInputData:
    """
    Orchestrates the preparation of stationary inputs for SVAR.

    Executes:
    1. Selection of differenced EOA-EOA volume series.
    2. Cleaning of missing values.
    3. Splitting into pre- and post-election samples.

    Args:
        finalized_data (FinalizedSeriesData): The dataset from Task 15.
        analysis_config (ValidatedAnalysisConfig): Configuration (for break date).

    Returns:
        SVARInputData: Container with full, pre, and post datasets.
    """
    df = finalized_data.df_final

    # Ensure Date index
    if "Date" in df.columns:
        df_indexed = df.set_index("Date")
    else:
        df_indexed = df

    # Identify target variables
    # EOA-to-EOA transfer volumes for USDC and USDT, as first differences of log volumes
    target_vars = [
        "diff_log_V_EOA_EOA_USDC",
        "diff_log_V_EOA_EOA_USDT"
    ]

    # Verify existence
    missing = [c for c in target_vars if c not in df_indexed.columns]
    if missing:
        raise ValueError(f"Missing SVAR target variables: {missing}")

    print(f"Task 27: Preparing SVAR inputs for {target_vars}...")

    # 1. Select and Clean
    df_clean = select_svar_series(df_indexed, target_vars)

    # 2. Split
    break_date = analysis_config.svar_break_date
    splits = split_regimes(df_clean, break_date)

    print(f"  - Full Sample: {len(df_clean)} rows")
    print(f"  - Pre-Election (< {break_date.date()}): {len(splits['pre'])} rows")
    print(f"  - Post-Election (>= {break_date.date()}): {len(splits['post'])} rows")

    return SVARInputData(
        df_full=df_clean,
        df_pre=splits["pre"],
        df_post=splits["post"],
        variables=target_vars,
        break_date=break_date
    )
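

# Sketch (hypothetical helper, not part of the pipeline) of a sample-size sanity
# check for the regime subsamples produced above. A VAR(p) with a constant and
# K variables fitted on T observations uses T - p effective rows to estimate
# Kp + 1 coefficients per equation, so T - p > Kp + 1 is the minimum needed for
# positive residual degrees of freedom.
def sketch_var_df_resid(n_obs: int, n_vars: int, lag_order: int) -> int:
    """Residual degrees of freedom per equation for a VAR(p) with a constant."""
    n_effective = n_obs - lag_order          # rows lost to the initial lags
    n_coeffs = n_vars * lag_order + 1        # lag coefficients plus intercept
    return n_effective - n_coeffs


# Example: two stablecoin series, p = 5, 60 daily observations in one regime
# -> 55 effective rows, 11 coefficients per equation, 44 residual degrees of freedom.
_df_resid_example = sketch_var_df_resid(n_obs=60, n_vars=2, lag_order=5)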


In [None]:
# Task 28 — Estimate Reduced-Form VAR with Lag Selection

# ==============================================================================
# Task 28: Estimate Reduced-Form VAR with Lag Selection
# ==============================================================================

@dataclass
class VARModelResults:
    """
    Container for the estimated reduced-form VAR model.

    Attributes:
        selected_lag (int): The optimal lag order p* selected via AIC.
        full_model_results (VARResultsWrapper): The fitted model object from statsmodels.
        aic_value (float): The AIC of the selected model.
        resid_cov (np.ndarray): The residual covariance matrix (Sigma_u).
        coefficients (pd.DataFrame): Estimated coefficients (Phi matrices and intercept).
    """
    selected_lag: int
    full_model_results: VARResultsWrapper
    aic_value: float
    resid_cov: np.ndarray
    coefficients: pd.DataFrame

# -------------------------------------------------------------------------------------------------------------------------------
# Task 28, Step 1: Specify VAR Model
# -------------------------------------------------------------------------------------------------------------------------------

def initialize_var_model(df: pd.DataFrame) -> VAR:
    """
    Initializes the VAR model structure using statsmodels.

    Args:
        df (pd.DataFrame): The stationary multivariate time series.

    Returns:
        VAR: The initialized VAR model instance.
    """
    # Ensure no NaNs (Task 27 should have handled this, but defensive check)
    if df.isnull().any().any():
        raise ValueError("Input data for VAR contains NaNs.")

    model = VAR(df)
    return model

# -------------------------------------------------------------------------------------------------------------------------------
# Task 28, Step 2: Select Lag Order via AIC
# -------------------------------------------------------------------------------------------------------------------------------

def select_optimal_lag(model: VAR, max_lags: int) -> int:
    """
    Selects the optimal lag order based on the Akaike Information Criterion (AIC).

    Args:
        model (VAR): The initialized VAR model.
        max_lags (int): Maximum number of lags to check.

    Returns:
        int: The optimal lag order p*.
    """
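    # select_order returns a LagOrderResults object. Every candidate lag
    # 0..max_lags is evaluated on the same estimation sample.
    lag_results = model.select_order(maxlags=max_lags)

    # .aic holds the AIC-minimizing lag; cast it from a possible NumPy integer
    selected_lag = int(lag_results.aic)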

    return selected_lag
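

# A numpy-only sketch of what AIC lag selection computes. It assumes the
# Lutkepohl form of the criterion:
#   AIC(p) = ln det(Sigma_u_mle(p)) + 2 * (p*K**2 + K) / T_eff,
# with every candidate p fitted on the same T_eff = T - max_lags rows so the
# criteria are comparable. The function name and simulated data are hypothetical.
import numpy as np


def sketch_var_aic_order(y: np.ndarray, max_lags: int) -> int:
    T, K = y.shape
    T_eff = T - max_lags
    Y = y[max_lags:]                                   # common response sample
    aics = []
    for p in range(max_lags + 1):
        # Regressors: constant plus lags 1..p, aligned row by row with Y
        cols = [np.ones((T_eff, 1))] + [y[max_lags - j:T - j] for j in range(1, p + 1)]
        Z = np.hstack(cols)
        B, *_ = np.linalg.lstsq(Z, Y, rcond=None)      # OLS, all equations at once
        U = Y - Z @ B
        _, logdet = np.linalg.slogdet(U.T @ U / T_eff) # ML residual covariance
        aics.append(logdet + 2.0 * (p * K**2 + K) / T_eff)
    return int(np.argmin(aics))


# Usage: simulate a stable bivariate VAR(1); AIC should reject p = 0.
_rng_aic = np.random.default_rng(0)
_A_aic = np.array([[0.5, 0.2], [-0.1, 0.4]])
_y_aic = np.zeros((2000, 2))
for _t in range(1, len(_y_aic)):
    _y_aic[_t] = _A_aic @ _y_aic[_t - 1] + _rng_aic.standard_normal(2)
_p_star = sketch_var_aic_order(_y_aic, max_lags=8)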

# -------------------------------------------------------------------------------------------------------------------------------
# Task 28, Step 3: Estimate Full-Sample VAR
# -------------------------------------------------------------------------------------------------------------------------------

def fit_var_model(model: VAR, lag_order: int) -> VARResultsWrapper:
    """
    Estimates the VAR model parameters using OLS given the lag order.

    Args:
        model (VAR): The initialized VAR model.
        lag_order (int): The lag order to use.

    Returns:
        VARResultsWrapper: The fitted model results.
    """
    results = model.fit(lag_order)
    return results
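

# A numpy sketch of the OLS estimator behind VAR.fit for a VAR(1) with a
# constant, on simulated data (coefficients and seed are arbitrary). Stacking
# the regressors as Z_t = [1, y_{t-1}'] gives y_t' = Z_t B, so each column of B
# is one equation and the lag block of B equals A_1 transposed. This matches the
# (Kp + 1) x K layout of statsmodels' results.params.
import numpy as np


def sketch_var1_ols(y: np.ndarray) -> np.ndarray:
    Y = y[1:]                                          # (T-1, K) responses
    Z = np.hstack([np.ones((len(Y), 1)), y[:-1]])      # (T-1, 1+K): constant + lag 1
    B, *_ = np.linalg.lstsq(Z, Y, rcond=None)          # (1+K, K), equation by equation
    return B


_rng = np.random.default_rng(42)
_A1 = np.array([[0.5, 0.2], [-0.1, 0.4]])
_y = np.zeros((5000, 2))
for _t in range(1, len(_y)):
    _y[_t] = _A1 @ _y[_t - 1] + _rng.standard_normal(2)
_B_hat = sketch_var1_ols(_y)                           # row 0: intercepts; rows 1-2: A_1.T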

# -------------------------------------------------------------------------------------------------------------------------------
# Task 28, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def estimate_reduced_var(
    svar_input: Any,
    analysis_config: Any
) -> VARModelResults:
    """
    Orchestrates the estimation of the reduced-form VAR model.

    Executes:
    1. Model initialization.
    2. Lag selection via AIC.
    3. Estimation on the full sample.

    Args:
        svar_input (SVARInputData): The input data from Task 27.
        analysis_config (ValidatedAnalysisConfig): Configuration (for max_lags).

    Returns:
        VARModelResults: Container with fitted model and stats.
    """
    df = svar_input.df_full
    max_lags = analysis_config.svar_max_lags

    print(f"Task 28: Estimating VAR on full sample ({len(df)} obs)...")

    # Step 1: Initialize
    model = initialize_var_model(df)

    # Step 2: Select Lag
    selected_lag = select_optimal_lag(model, max_lags)
    print(f"  - Selected Lag (AIC): {selected_lag}")

    # Step 3: Fit
    results = fit_var_model(model, selected_lag)

    # Extract key metrics
    aic = results.aic
    sigma_u = results.sigma_u.values
    coeffs = results.params

    print(f"  - Model AIC: {aic:.4f}")

    return VARModelResults(
        selected_lag=selected_lag,
        full_model_results=results,
        aic_value=aic,
        resid_cov=sigma_u,
        coefficients=coeffs
    )


In [None]:
# Task 29 — Estimate VAR on Pre- and Post-Election Subsamples

# ==============================================================================
# Task 29: Estimate VAR on Pre- and Post-Election Subsamples
# ==============================================================================

@dataclass
class RegimeVARResult:
    """
    Container for VAR estimation results on a specific regime (subsample).

    Attributes:
        regime_name (str): 'Pre-Election' or 'Post-Election'.
        n_obs (int): Number of observations in the subsample.
        coefficients (pd.DataFrame): Estimated coefficient matrix ((Kp+1) x K; rows = constant then lags, columns = equations).
        resid_cov (np.ndarray): Residual covariance matrix (Sigma_u).
        theta_vector (np.ndarray): Vectorized coefficients (aligned with covariance).
        theta_cov (np.ndarray): Covariance matrix of the coefficients.
    """
    regime_name: str
    n_obs: int
    coefficients: pd.DataFrame
    resid_cov: np.ndarray
    theta_vector: np.ndarray
    theta_cov: np.ndarray

@dataclass
class RegimeComparisonData:
    """
    Container for the comparative analysis of pre- and post-election regimes.

    Attributes:
        pre_election_results (RegimeVARResult): Results for the pre-election period.
        post_election_results (RegimeVARResult): Results for the post-election period.
        lag_order (int): The lag order used for both regimes.
    """
    pre_election_results: RegimeVARResult
    post_election_results: RegimeVARResult
    lag_order: int

# -------------------------------------------------------------------------------------------------------------------------------
# Task 29, Step 1 & 2: Estimate VAR on Subsamples
# -------------------------------------------------------------------------------------------------------------------------------

def estimate_regime_var(
    df: pd.DataFrame,
    lag_order: int,
    regime_name: str
) -> RegimeVARResult:
    """
    Estimates a VAR model on a specific subsample.

    Args:
        df (pd.DataFrame): The subsample data.
        lag_order (int): The lag order p*.
        regime_name (str): Label for the regime.

    Returns:
        RegimeVARResult: The estimation results.
    """
    # Initialize and fit
    model = VAR(df)
    results = model.fit(lag_order)

    # Extract coefficients
    # params is (Kp + 1) x K DataFrame (constants + lags)
    coeffs = results.params

    # Extract residual covariance
    sigma_u = results.sigma_u.values

    # Extract vectorized parameters and their covariance.
    # statsmodels documents cov_params() as the covariance of vec(B), where
    # B = params.T = [const, A_1, ..., A_p] has shape K x (Kp + 1). It is computed
    # as kron(inv(Z'Z), Sigma_u). vec(B) stacks the columns of B, so the ordering
    # is regressor by regressor, with the K equations varying fastest.
    # Because params is (Kp + 1) x K (rows = regressors, columns = equations),
    # that ordering is the row-major (C-order) flattening of params, i.e.
    # vec(params.T). An F-order (equation-by-equation) flattening would
    # misalign theta with theta_cov.
    theta_vector = coeffs.values.flatten(order='C')
    theta_cov = results.cov_params().values

    return RegimeVARResult(
        regime_name=regime_name,
        n_obs=len(df),
        coefficients=coeffs,
        resid_cov=sigma_u,
        theta_vector=theta_vector,
        theta_cov=theta_cov
    )

# -------------------------------------------------------------------------------------------------------------------------------
# Task 29, Step 3: Vectorize Coefficients for Comparison
# -------------------------------------------------------------------------------------------------------------------------------

# This step is integrated into estimate_regime_var via flattening.
# Consistency with cov_params() is ensured by flattening params in C-order, which
# reproduces statsmodels' vec(params.T) ordering (regressor-major, equation-minor).

# -------------------------------------------------------------------------------------------------------------------------------
# Task 29, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def estimate_regime_vars(
    svar_input: Any,
    full_model_results: Any
) -> RegimeComparisonData:
    """
    Orchestrates the estimation of VAR models for pre- and post-election regimes.

    Executes:
    1. Estimation on pre-election sample.
    2. Estimation on post-election sample.
    3. Packaging of vectorized coefficients for Wald testing.

    Args:
        svar_input (SVARInputData): The input data from Task 27.
        full_model_results (VARModelResults): The full model results from Task 28 (for lag order).

    Returns:
        RegimeComparisonData: Container with both regime results.
    """
    lag_order = full_model_results.selected_lag
    print(f"Task 29: Estimating Regime VARs with lag p={lag_order}...")

    # 1. Pre-Election
    print("  - Estimating Pre-Election VAR...")
    pre_res = estimate_regime_var(svar_input.df_pre, lag_order, "Pre-Election")

    # 2. Post-Election
    print("  - Estimating Post-Election VAR...")
    post_res = estimate_regime_var(svar_input.df_post, lag_order, "Post-Election")

    # Verify dimensions
    if pre_res.theta_vector.shape != post_res.theta_vector.shape:
        raise ValueError("Parameter vector dimensions mismatch between regimes.")

    return RegimeComparisonData(
        pre_election_results=pre_res,
        post_election_results=post_res,
        lag_order=lag_order
    )
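

# Sketch of the coefficient-ordering convention behind theta_cov, using toy
# arrays rather than a fitted model. statsmodels documents
# VARResults.cov_params() as the covariance of vec(B), with
# B = [const, A_1, ..., A_p] of shape K x (Kp + 1), and computes it as
# kron(inv(Z'Z), Sigma_u). results.params stores B transposed, shape (Kp + 1) x K.
# The check confirms two things:
#   - the row-major flattening of a (Kp + 1) x K array equals vec() of its transpose;
#   - the Kronecker diagonal then pairs each coefficient with its own variance.
# All values and the function name are made up for illustration.
import numpy as np


def sketch_vec_ordering_demo() -> dict:
    k_eq, n_reg = 2, 3                                   # K = 2 equations; const + 2 lag-1 regressors
    params_toy = np.arange(n_reg * k_eq, dtype=float).reshape(n_reg, k_eq)  # rows = regressors
    ztz_inv = np.diag([0.5, 0.2, 0.1])                   # stand-in for inv(Z'Z)
    sigma_u = np.array([[1.0, 0.3], [0.3, 4.0]])         # stand-in for Sigma_u
    cov_vec_b = np.kron(ztz_inv, sigma_u)                # Cov(vec(B)), B = params_toy.T
    j, i = 2, 1                                          # regressor j, equation i -> index j*K + i
    theta_c = params_toy.flatten(order="C")
    return {
        "vec_b": params_toy.T.flatten(order="F"),        # vec(): stack the columns of B
        "theta_c": theta_c,                              # row-major flattening of params
        "theta_f": params_toy.flatten(order="F"),        # equation-by-equation flattening
        "coef_at_idx": theta_c[j * k_eq + i],
        "coef_expected": params_toy[j, i],
        "var_at_idx": cov_vec_b[j * k_eq + i, j * k_eq + i],
        "var_expected": ztz_inv[j, j] * sigma_u[i, i],
    }


_vec_demo = sketch_vec_ordering_demo()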


In [None]:
# Task 30 — Apply Cholesky Identification to Obtain SVAR

# ==============================================================================
# Task 30: Apply Cholesky Identification to Obtain SVAR
# ==============================================================================

@dataclass
class StructuralImpactMatrix:
    """
    Container for the structural impact matrix (Cholesky factor) for a specific regime and ordering.

    Attributes:
        regime_name (str): 'Pre-Election' or 'Post-Election'.
        ordering (List[str]): The variable ordering used for identification.
        impact_matrix (pd.DataFrame): The lower triangular impact matrix P (A0_inv).
                                      Rows: Response variables. Cols: Structural shocks.
        covariance_matrix (pd.DataFrame): The permuted residual covariance matrix used for decomposition.
        is_positive_definite (bool): True if Cholesky decomposition succeeded.
    """
    regime_name: str
    ordering: List[str]
    impact_matrix: pd.DataFrame
    covariance_matrix: pd.DataFrame
    is_positive_definite: bool

@dataclass
class SVARIdentificationResults:
    """
    Container for SVAR identification results across regimes and orderings.

    Attributes:
        results (List[StructuralImpactMatrix]): List of impact matrices for all combinations.
    """
    results: List[StructuralImpactMatrix]

# -------------------------------------------------------------------------------------------------------------------------------
# Task 30, Step 1 & 2: Apply Cholesky Decomposition
# -------------------------------------------------------------------------------------------------------------------------------

def compute_cholesky_impact(
    resid_cov: np.ndarray,
    variables: List[str],
    ordering: List[str],
    regime_name: str
) -> StructuralImpactMatrix:
    """
    Computes the structural impact matrix via Cholesky decomposition for a given ordering.

    This function performs the following steps:
    1. Maps the desired identification ordering to the indices of the reduced-form covariance matrix.
    2. Permutes the covariance matrix to match the identification ordering.
    3. Performs Cholesky decomposition (Sigma = P * P') to obtain the lower triangular impact matrix P.
    4. Wraps the result in a labeled DataFrame.

    Args:
        resid_cov (np.ndarray): The residual covariance matrix (in original variable order).
        variables (List[str]): The list of variables corresponding to resid_cov indices.
        ordering (List[str]): The desired recursive ordering for identification.
        regime_name (str): Name of the regime (e.g., "Pre-Election").

    Returns:
        StructuralImpactMatrix: The identified structural model.

    Raises:
        ValueError: If ordering variables do not match model variables.
    """
    # 1. Reorder Covariance Matrix
    # Map variable names to indices in the original covariance matrix
    var_to_idx = {v: i for i, v in enumerate(variables)}

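    # Get permutation indices; the ordering must be a permutation of `variables`
    try:
        perm_indices = [var_to_idx[v] for v in ordering]
    except KeyError as e:
        raise ValueError(f"Ordering variable not found in model variables: {e}") from e
    if sorted(perm_indices) != list(range(len(variables))):
        raise ValueError(f"Ordering {ordering} must list each model variable exactly once.")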

    # Permute rows and columns: Sigma_perm = Pi @ Sigma @ Pi.T, where Pi is the
    # permutation matrix for `ordering` (not to be confused with the impact matrix P).
    # np.ix_(rows, cols) selects the permuted submatrix without forming Pi explicitly.
    cov_perm = resid_cov[np.ix_(perm_indices, perm_indices)]

    # 2. Cholesky Decomposition
    # Sigma = L L.T
    # L is lower triangular
    try:
        L = np.linalg.cholesky(cov_perm)
        is_pd = True
    except np.linalg.LinAlgError:
        # Handle non-positive definite matrices gracefully (though unlikely for well-specified VAR)
        # We return a matrix of NaNs to indicate failure without crashing the pipeline
        L = np.full_like(cov_perm, np.nan)
        is_pd = False
        print(f"WARNING: Covariance matrix for {regime_name} with ordering {ordering} is not positive definite.")

    # 3. Package Result
    # Create DataFrame for clarity
    # Rows: Response Variables (in ordering)
    # Cols: Structural Shocks (corresponding to variables in ordering)
    impact_df = pd.DataFrame(
        L,
        index=ordering,
        columns=[f"Shock_{v}" for v in ordering]
    )

    cov_df = pd.DataFrame(
        cov_perm,
        index=ordering,
        columns=ordering
    )

    return StructuralImpactMatrix(
        regime_name=regime_name,
        ordering=ordering,
        impact_matrix=impact_df,
        covariance_matrix=cov_df,
        is_positive_definite=is_pd
    )
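

# Worked example of what compute_cholesky_impact does for two variables, using
# a toy covariance rather than estimated values.
#   - Permuting with np.ix_ is the same as Pi @ Sigma @ Pi.T for a permutation matrix Pi.
#   - The Cholesky factor L of the permuted matrix satisfies L @ L.T == Sigma_perm.
#   - The first-ordered variable's shock moves both variables on impact.
#   - The second-ordered shock has no contemporaneous effect on the first (L[0, 1] == 0).
# For 2x2: L = [[sqrt(s11), 0], [s21/sqrt(s11), sqrt(s22 - s21**2/s11)]].
# The helper name and the variable labels are hypothetical.
import numpy as np


def sketch_two_var_cholesky(sigma: np.ndarray, order: list) -> np.ndarray:
    pi = np.eye(sigma.shape[0])[order]                 # permutation matrix: rows taken in `order`
    sigma_perm = sigma[np.ix_(order, order)]
    assert np.allclose(sigma_perm, pi @ sigma @ pi.T)  # both permutation routes agree
    return np.linalg.cholesky(sigma_perm)              # lower-triangular impact matrix


_sigma_toy = np.array([[1.0, 0.6], [0.6, 2.0]])        # index 0 = "USDC", 1 = "USDT" (labels hypothetical)
_L_usdc_first = sketch_two_var_cholesky(_sigma_toy, [0, 1])
_L_usdt_first = sketch_two_var_cholesky(_sigma_toy, [1, 0])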

# -------------------------------------------------------------------------------------------------------------------------------
# Task 30, Step 3: Extract Impact Matrices (Integrated in Step 2)
# -------------------------------------------------------------------------------------------------------------------------------

# The extraction logic is part of the compute_cholesky_impact function which returns the full matrix.
# Analysis of specific elements (diagonal vs off-diagonal) is done in the orchestrator or downstream reporting.

# -------------------------------------------------------------------------------------------------------------------------------
# Task 30, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def identify_structural_shocks(
    regime_data: Any,
    svar_input: Any,
    analysis_config: Any
) -> SVARIdentificationResults:
    """
    Orchestrates the SVAR identification via Cholesky decomposition.

    Iterates through:
    1. Regimes (Pre-Election, Post-Election).
    2. Identification Orderings (from config).

    Args:
        regime_data (RegimeComparisonData): Results from Task 29.
        svar_input (SVARInputData): Input data (for variable names).
        analysis_config (ValidatedAnalysisConfig): Configuration. Currently unused:
            ValidatedAnalysisConfig (Task 1) stores only scalar parameters and no
            list of orderings, so both recursive orderings (USDC first, USDT first)
            are constructed inside this function.

    Returns:
        SVARIdentificationResults: Container with all impact matrices.
    """
    # Define orderings based on variables
    vars_full = svar_input.variables

    # Heuristic mapping: find which full var contains the short name
    # This assumes distinct names
    name_map = {}
    for short in ["USDC", "USDT"]:
        matches = [v for v in vars_full if short in v]
        if len(matches) == 1:
            name_map[short] = matches[0]
        else:
            raise ValueError(f"Ambiguous or missing variable mapping for {short}: {matches}")

    # Both recursive orderings of the two stablecoins are identified
    # Ordering 1: USDC -> USDT
    ord1 = [name_map["USDC"], name_map["USDT"]]
    # Ordering 2: USDT -> USDC
    ord2 = [name_map["USDT"], name_map["USDC"]]

    orderings = [ord1, ord2]

    results = []

    print("Task 30: Identifying Structural Shocks...")

    # Process Regimes
    regimes = [
        regime_data.pre_election_results,
        regime_data.post_election_results
    ]

    for regime in regimes:
        print(f"  - Processing {regime.regime_name}...")

        for i, ordering in enumerate(orderings):
            # Compute Cholesky
            impact = compute_cholesky_impact(
                regime.resid_cov,
                vars_full,
                ordering,
                regime.regime_name
            )
            results.append(impact)

            if impact.is_positive_definite:
                # Log diagonal elements (Own shocks)
                diag = np.diag(impact.impact_matrix.values)

                print(f"    -> Ordering {i+1}: Own-Shock Impacts = {diag}")
            else:
                print(f"    -> Ordering {i+1}: Identification Failed (Non-PD)")

    return SVARIdentificationResults(results=results)


In [None]:
# Task 31 — Compute Wald Statistic for Regime Comparison

# ==============================================================================
# Task 31: Compute Wald Statistic for Regime Comparison
# ==============================================================================

@dataclass
class WaldTestResult:
    """
    Container for the results of the Wald test for structural change.

    Attributes:
        wald_statistic (float): The computed Wald statistic W.
        degrees_of_freedom (int): The number of parameters tested (k).
        p_value (float): The p-value from the Chi-squared distribution.
        is_significant (bool): True if p-value < 0.05.
        parameter_difference (np.ndarray): The vector difference (theta_post - theta_pre).
    """
    wald_statistic: float
    degrees_of_freedom: int
    p_value: float
    is_significant: bool
    parameter_difference: np.ndarray

# -------------------------------------------------------------------------------------------------------------------------------
# Task 31, Step 1 & 2: Compute Wald Statistic
# -------------------------------------------------------------------------------------------------------------------------------

def calculate_wald_stat(
    theta_pre: np.ndarray,
    cov_pre: np.ndarray,
    theta_post: np.ndarray,
    cov_post: np.ndarray
) -> Tuple[float, np.ndarray, int]:
    """
    Computes the Wald statistic for testing the equality of parameters across regimes.

    W = (theta_post - theta_pre)' (Cov_pre + Cov_post)^(-1) (theta_post - theta_pre)

    Args:
        theta_pre (np.ndarray): Vectorized parameters for pre-election regime.
        cov_pre (np.ndarray): Covariance matrix for pre-election parameters.
        theta_post (np.ndarray): Vectorized parameters for post-election regime.
        cov_post (np.ndarray): Covariance matrix for post-election parameters.

    Returns:
        Tuple[float, np.ndarray, int]: (Wald statistic, difference vector, degrees of freedom).
    """
    # 1. Compute Difference
    delta_theta = theta_post - theta_pre

    # 2. Compute Combined Covariance
    # Under the null of no change, and assuming independence of samples (pre/post),
    # Var(delta_theta) = Var(theta_post) + Var(theta_pre)
    V = cov_pre + cov_post

    # 3. Compute Wald Statistic
    # W = delta' V^-1 delta
    # Use solve for stability: x = V^-1 delta => V x = delta
    try:
        x = np.linalg.solve(V, delta_theta)
        wald_stat = np.dot(delta_theta, x)
    except np.linalg.LinAlgError:
        # Fallback to pseudo-inverse if V is singular (unlikely for full rank VAR)
        print("WARNING: Covariance matrix singular, using pseudo-inverse for Wald test.")
        V_inv = np.linalg.pinv(V)
        wald_stat = delta_theta.T @ V_inv @ delta_theta

    k = len(delta_theta)

    return wald_stat, delta_theta, k
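

# Toy check of the Wald formula W = d' (V_pre + V_post)^(-1) d, using
# hypothetical numbers that can be verified by hand. With diagonal covariances
# the statistic reduces to a sum of squared standardized differences:
#   d = (1, 2), V = diag(0.5, 1.5) + diag(0.5, 2.5) = diag(1, 4),
#   W = 1**2/1 + 2**2/4 = 2.
# The solve() route and the pseudo-inverse fallback agree whenever V is nonsingular.
import numpy as np


def sketch_wald(theta_pre, cov_pre, theta_post, cov_post) -> float:
    d = np.asarray(theta_post, float) - np.asarray(theta_pre, float)
    V = np.asarray(cov_pre, float) + np.asarray(cov_post, float)
    return float(d @ np.linalg.solve(V, d))


_theta_pre, _theta_post = np.array([0.0, 0.0]), np.array([1.0, 2.0])
_cov_pre, _cov_post = np.diag([0.5, 1.5]), np.diag([0.5, 2.5])
_W_toy = sketch_wald(_theta_pre, _cov_pre, _theta_post, _cov_post)
_d_toy = _theta_post - _theta_pre
_W_pinv = float(_d_toy @ np.linalg.pinv(_cov_pre + _cov_post) @ _d_toy)
# For a K-variable VAR(p) with a constant, the test covers k = K * (K*p + 1)
# parameters; for example, K = 2 and p = 3 give k = 2 * 7 = 14.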

# -------------------------------------------------------------------------------------------------------------------------------
# Task 31, Step 3: Compute p-Value and Interpret
# -------------------------------------------------------------------------------------------------------------------------------

def compute_chi2_p_value(wald_stat: float, df: int) -> float:
    """
    Computes the p-value from the Chi-squared distribution.

    Args:
        wald_stat (float): The Wald statistic.
        df (int): Degrees of freedom.

    Returns:
        float: The p-value (survival function).
    """
    # sf = 1 - cdf
    return chi2.sf(wald_stat, df)
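

# SciPy-free cross-check of chi2.sf, valid only for even degrees of freedom
# (k = 2m). For such k, the chi-square survival function has the closed form
#   P(X > w) = exp(-w/2) * sum_{i=0}^{m-1} (w/2)**i / i!
# For k = 2 this reduces to exp(-w/2). The Wald test above has k = K(Kp + 1)
# with K = 2, which is always even, so the form applies to it.
# The helper name is hypothetical.
import math


def sketch_chi2_sf_even(w: float, df: int) -> float:
    if df <= 0 or df % 2:
        raise ValueError("closed form requires a positive even df")
    half = w / 2.0
    return math.exp(-half) * sum(half**i / math.factorial(i) for i in range(df // 2))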

# -------------------------------------------------------------------------------------------------------------------------------
# Task 31, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def perform_wald_test(
    regime_data: Any
) -> WaldTestResult:
    """
    Orchestrates the Wald test for structural change between pre- and post-election regimes.

    Executes:
    1. Extraction of parameter vectors and covariances.
    2. Computation of Wald statistic.
    3. Computation of p-value.

    Args:
        regime_data (RegimeComparisonData): Results from Task 29.

    Returns:
        WaldTestResult: The test results.
    """
    print("Task 31: Performing Wald Test for Regime Change...")

    pre = regime_data.pre_election_results
    post = regime_data.post_election_results

    # 1. Compute Statistic
    wald_stat, delta, k = calculate_wald_stat(
        pre.theta_vector,
        pre.theta_cov,
        post.theta_vector,
        post.theta_cov
    )

    # 2. Compute p-value
    p_val = compute_chi2_p_value(wald_stat, k)

    is_sig = p_val < 0.05

    print(f"  - Wald Statistic: {wald_stat:.4f}")
    print(f"  - Degrees of Freedom: {k}")
    print(f"  - p-value: {p_val:.4e}")
    print(f"  - Significant Change: {is_sig}")

    return WaldTestResult(
        wald_statistic=wald_stat,
        degrees_of_freedom=k,
        p_value=p_val,
        is_significant=is_sig,
        parameter_difference=delta
    )


In [None]:
# Task 32 — Synthesize Results and Validate Against Expected Findings

# ==============================================================================
# Task 32: Synthesize Results and Validate Against Expected Findings
# ==============================================================================

@dataclass
class ValidationItem:
    """
    Container for a single validation check.

    Attributes:
        metric_name (str): Name of the metric/event being validated.
        observed_value (Any): The value computed by the pipeline.
        expected_value (Any): The value expected from the paper.
        status (str): 'PASS', 'FAIL', or 'APPROX'.
        comment (str): Additional context.
    """
    metric_name: str
    observed_value: Any
    expected_value: Any
    status: str
    comment: str

@dataclass
class FinalStudyReport:
    """
    Master container for the study's final validation report.

    Attributes:
        structural_breaks (List[ValidationItem]): Validation of break dates and significance.
        extreme_events (List[ValidationItem]): Validation of HHT events.
        svar_regime_shift (List[ValidationItem]): Validation of Wald test and impact matrices.
        overall_conclusion (str): Summary of the replication success.
    """
    structural_breaks: List[ValidationItem]
    extreme_events: List[ValidationItem]
    svar_regime_shift: List[ValidationItem]
    overall_conclusion: str

# -------------------------------------------------------------------------------------------------------------------------------
# Task 32, Step 1: Verify Structural Break Timeline
# -------------------------------------------------------------------------------------------------------------------------------

def verify_breaks(
    break_results: Any,
    aaft_results: Any
) -> List[ValidationItem]:
    """
    Validates the detected structural breaks against expected dates.

    Expected:
    - EOA-EOA: 2024-11-03
    - Exchange: 2024-11-05
    - ETH: 2024-11-06
    - BTC: 2024-11-09
    - SC-SC USDC: 2025-01-02
    - SC-SC USDT: 2025-01-16

    Args:
        break_results (StructuralBreakOrchestratorResult): Detection results.
        aaft_results (AAFTRobustnessResults): Robustness results.

    Returns:
        List[ValidationItem]: List of validation checks.
    """
    items = []

    # Define expectations
    expectations = {
        "log_V_EOA_EOA_USDT": "2024-11-03",
        "log_V_EOA_EOA_USDC": "2024-11-03",
        "log_Volume_USDT_USD": "2024-11-05",
        "log_Volume_USDC_USD": "2024-11-05",
        "log_Close_ETH_USD": "2024-11-06",
        "log_Close_BTC_USD": "2024-11-09",
        "log_V_SC_SC_USDC": "2025-01-02",
        "log_V_SC_SC_USDT": "2025-01-16"
    }

    for series, exp_date_str in expectations.items():
        if series not in break_results.results:
            items.append(ValidationItem(series, "Missing", exp_date_str, "FAIL", "Series not processed"))
            continue

        res = break_results.results[series]
        dates = [d.date() for d in res.break_dates]

        # Check if expected date is in detected dates (allowing +/- 2 days)
        exp_date = pd.Timestamp(exp_date_str).date()
        match_found = False
        obs_date = None

        for d in dates:
            delta = abs((d - exp_date).days)
            if delta <= 2:
                match_found = True
                obs_date = d
                break

        # Check significance
        p_val = aaft_results.results[series].p_value
        sig_status = "Significant" if p_val < 0.001 else f"Not Significant (p={p_val})"

        status = "PASS" if match_found and p_val < 0.001 else "FAIL"
        comment = f"Observed: {dates}. {sig_status}."

        items.append(ValidationItem(
            metric_name=f"Break Date: {series}",
            observed_value=str(obs_date) if obs_date else "None",
            expected_value=exp_date_str,
            status=status,
            comment=comment
        ))

    return items
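

# Illustrative sketch (not the pipeline's implementation): AAFT surrogate generation.
# The break p-values checked above come from AAFT (Amplitude-Adjusted Fourier Transform)
# surrogates. The hypothetical helper `_aaft_surrogate_sketch` below is a minimal sketch of
# the standard AAFT recipe, assuming a 1-D float series without NaNs; the pipeline's own
# `execute_aaft_robustness` may differ in detail. Each surrogate keeps the exact value
# distribution of the input and approximately its power spectrum, while scrambling
# time-localized structure such as a genuine structural break.

import numpy as np


def _aaft_surrogate_sketch(x: np.ndarray, rng: np.random.Generator) -> np.ndarray:
    """Return one AAFT surrogate of `x` (hypothetical illustrative helper)."""
    x = np.asarray(x, dtype=float)
    n = x.size
    ranks = np.argsort(np.argsort(x))

    # 1. Gaussianize: reorder sorted Gaussian draws so they follow the rank order of x
    gaussian = np.sort(rng.standard_normal(n))[ranks]

    # 2. Phase-randomize the Gaussian series; rotating each Fourier coefficient by a
    #    random phase preserves its amplitude spectrum
    spectrum = np.fft.rfft(gaussian)
    phases = rng.uniform(0.0, 2.0 * np.pi, spectrum.size)
    phases[0] = 0.0          # leave the DC (mean) term untouched
    if n % 2 == 0:
        phases[-1] = 0.0     # leave the Nyquist term real for even lengths
    randomized = np.fft.irfft(spectrum * np.exp(1j * phases), n=n)

    # 3. Rescale back: give the original values the rank order of the randomized series
    return np.sort(x)[np.argsort(np.argsort(randomized))]

# Usage (hypothetical): draw N surrogates with a seeded generator, re-run break detection on
# each, and compare the observed statistic with the surrogate distribution.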

# -------------------------------------------------------------------------------------------------------------------------------
# Task 32, Step 2: Verify HHT Extreme Events
# -------------------------------------------------------------------------------------------------------------------------------

def verify_hht(
    hht_results: Any,
    hht_robust_res: Any
) -> List[ValidationItem]:
    """
    Validates HHT extreme event detection.

    Expected (an extreme event within +/- 3 days of each date):
    - ETH: ~2024-11-07
    - BTC: ~2024-11-10
    - Significance: p < 0.001

    Args:
        hht_results (HHTOrchestratorResult): Detection results.
        hht_robust_res (HHTRobustnessResults): Robustness results.

    Returns:
        List[ValidationItem]: Validation checks.
    """
    items = []

    # (label, series name, robustness attribute, expected event date); tolerance is +/- 3 days
    targets = [
        ("BTC", "log_Close_BTC_USD", "btc_robustness", "2024-11-10"),
        ("ETH", "log_Close_ETH_USD", "eth_robustness", "2024-11-07"),
    ]

    for label, series_name, robust_attr, exp_date_str in targets:
        # Report a missing series as a failure instead of silently skipping it
        if series_name not in hht_results.results:
            items.append(ValidationItem(
                metric_name=f"HHT Event: {label}",
                observed_value="Missing",
                expected_value=f"~{exp_date_str}",
                status="FAIL",
                comment="Series not processed"
            ))
            continue

        events = hht_results.results[series_name]["events"]
        dates = [d.date() for d in events.extreme_event_dates]
        p_val = getattr(hht_robust_res, robust_attr).p_value

        # Accept any detected extreme event within +/- 3 days of the expected date
        exp_date = pd.Timestamp(exp_date_str).date()
        match = any(abs((d - exp_date).days) <= 3 for d in dates)

        status = "PASS" if match and p_val < 0.001 else "FAIL"
        items.append(ValidationItem(
            metric_name=f"HHT Event: {label}",
            observed_value=str(dates),
            expected_value=f"~{exp_date_str}",
            status=status,
            comment=f"p-value: {p_val}"
        ))

    return items
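

# Illustrative sketch: reading a surrogate-based p-value.
# Both `verify_breaks` and `verify_hht` require p < 0.001. Assuming the robustness modules
# report a one-sided, rank-based Monte Carlo p-value, p = (1 + #{T_surr >= T_obs}) / (N + 1),
# the smallest attainable value is 1 / (N + 1), so at least 1,000 surrogates are needed
# before p < 0.001 is possible at all. The hypothetical helper below sketches that estimator;
# the pipeline's robustness code may use a different convention.

import numpy as np


def _surrogate_p_value_sketch(t_obs: float, t_surrogates: np.ndarray) -> float:
    """One-sided Monte Carlo p-value: share of surrogate statistics >= the observed one."""
    t_surrogates = np.asarray(t_surrogates, dtype=float)
    exceed = int(np.sum(t_surrogates >= t_obs))
    # The +1 terms count the observed statistic itself as one draw under the null
    return (1 + exceed) / (t_surrogates.size + 1)

# Usage: with 999 surrogates that never exceed the observed statistic, p bottoms out at
# exactly 0.001, which fails the strict p < 0.001 check; 1,000 surrogates give about 0.000999.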

# -------------------------------------------------------------------------------------------------------------------------------
# Task 32, Step 3: Verify SVAR Regime Shift
# -------------------------------------------------------------------------------------------------------------------------------

def verify_svar(
    wald_res: Any,
    svar_id_res: Any
) -> List[ValidationItem]:
    """
    Validates SVAR regime shift findings.

    Expected:
    - Wald p-value < 0.0001.
    - Impact matrix diagonal elements increase post-election.

    Args:
        wald_res (WaldTestResult): Wald test results.
        svar_id_res (SVARIdentificationResults): Impact matrices.

    Returns:
        List[ValidationItem]: Validation checks.
    """
    items = []

    # 1. Wald Test
    p_val = wald_res.p_value
    status = "PASS" if p_val < 0.0001 else "FAIL"
    items.append(ValidationItem(
        metric_name="SVAR Wald Test",
        observed_value=f"{p_val:.4e}",
        expected_value="< 0.0001",
        status=status,
        comment="Tests equality of pre/post parameters."
    ))

    # 2. Impact Matrix Increase
    pre_mats = [r for r in svar_id_res.results if r.regime_name == "Pre-Election"]
    post_mats = [r for r in svar_id_res.results if r.regime_name == "Post-Election"]

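    if pre_mats and post_mats:
        # Compare the first variable ordering in each regime
        pre_imp = pre_mats[0].impact_matrix
        post_imp = post_mats[0].impact_matrix

        # Calculate % change in diagonal (own-shock impact)
        pre_diag = np.diag(pre_imp.values).astype(float)
        post_diag = np.diag(post_imp.values).astype(float)

        # Avoid division by zero: entries with a zero pre-election diagonal stay NaN
        pct_change = np.full(pre_diag.shape, np.nan)
        nonzero = pre_diag != 0
        pct_change[nonzero] = (post_diag[nonzero] - pre_diag[nonzero]) / pre_diag[nonzero] * 100

        # Expect an average increase across the valid diagonal entries
        avg_increase = np.nanmean(pct_change) if nonzero.any() else np.nan
        status = "PASS" if avg_increase > 0 else "FAIL"

        items.append(ValidationItem(
            metric_name="Volatility Spillover Increase",
            observed_value=f"{avg_increase:.2f}%",
            expected_value="> 0% (approx 28-45%)",
            status=status,
            comment=f"Diagonal changes (%): {np.round(pct_change, 2)}"
        ))
    else:
        # Report missing regime results as a failure instead of silently skipping the check
        items.append(ValidationItem(
            metric_name="Volatility Spillover Increase",
            observed_value="Missing",
            expected_value="> 0% (approx 28-45%)",
            status="FAIL",
            comment="Pre- or post-election impact matrix not found."
        ))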

    return items
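

# Illustrative sketch: a two-sample Wald test for regime equality.
# `verify_svar` only reads the p-value produced by `perform_wald_test`. One common form of
# such a test, assuming the pre- and post-election coefficient estimates b1 and b2 (e.g.
# vectorized VAR coefficients) are independent with covariance matrices V1 and V2, is
#     W = (b1 - b2)' (V1 + V2)^{-1} (b1 - b2)  ~  chi2(k) under H0: b1 = b2,
# where k is the number of compared coefficients. The hypothetical helper below sketches
# that statistic; the pipeline's `perform_wald_test` may be specified differently.

import numpy as np
from scipy import stats


def _two_regime_wald_sketch(b1, v1, b2, v2):
    """Return (W, p_value) for H0: b1 == b2, assuming independent regime estimates."""
    diff = np.asarray(b1, dtype=float) - np.asarray(b2, dtype=float)
    cov = np.asarray(v1, dtype=float) + np.asarray(v2, dtype=float)
    # Solve the linear system rather than inverting cov explicitly, for numerical stability
    w_stat = float(diff @ np.linalg.solve(cov, diff))
    p_value = float(stats.chi2.sf(w_stat, df=diff.size))
    return w_stat, p_value

# Usage: identical coefficient vectors give W = 0 and p = 1; larger regime differences
# relative to the pooled covariance push W up and p toward zero.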

# -------------------------------------------------------------------------------------------------------------------------------
# Task 32, Orchestrator Function
# -------------------------------------------------------------------------------------------------------------------------------

def synthesize_study_results(
    break_res: Any,
    aaft_res: Any,
    hht_res: Any,
    hht_robust_res: Any,
    wald_res: Any,
    svar_id_res: Any
) -> FinalStudyReport:
    """
    Orchestrates the final validation and synthesis of the study.

    Aggregates checks from all components to confirm replication success.

    Args:
        break_res: StructuralBreakOrchestratorResult
        aaft_res: AAFTRobustnessResults
        hht_res: HHTOrchestratorResult
        hht_robust_res: HHTRobustnessResults
        wald_res: WaldTestResult
        svar_id_res: SVARIdentificationResults

    Returns:
        FinalStudyReport: The comprehensive report.
    """
    print("Task 32: Synthesizing Final Results...")

    # 1. Breaks
    break_items = verify_breaks(break_res, aaft_res)

    # 2. HHT
    hht_items = verify_hht(hht_res, hht_robust_res)

    # 3. SVAR
    svar_items = verify_svar(wald_res, svar_id_res)

    # Conclusion
    all_items = break_items + hht_items + svar_items
    failures = [i for i in all_items if i.status == "FAIL"]

    if not failures:
        conclusion = "SUCCESS: All empirical findings replicated within tolerance."
    else:
        conclusion = f"PARTIAL SUCCESS: {len(failures)} of {len(all_items)} checks failed. Review logs."

    print(f"  - {conclusion}")
    for item in all_items:
        print(f"    [{item.status}] {item.metric_name}: Obs={item.observed_value}, Exp={item.expected_value}")

    return FinalStudyReport(
        structural_breaks=break_items,
        extreme_events=hht_items,
        svar_regime_shift=svar_items,
        overall_conclusion=conclusion
    )
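

# Illustrative sketch: tabulating a FinalStudyReport.
# The console printout above is convenient interactively; for saving or diffing results a
# flat table is easier. The hypothetical helper below assumes only the attribute names
# already used in this section: metric_name, observed_value, expected_value, status and
# comment on each ValidationItem, and structural_breaks, extreme_events and
# svar_regime_shift on the report.

import pandas as pd

_VALIDATION_FIELDS = ("metric_name", "observed_value", "expected_value", "status", "comment")


def _report_to_frame_sketch(report) -> pd.DataFrame:
    """Flatten a FinalStudyReport-like object into one row per validation check."""
    rows = []
    for section in ("structural_breaks", "extreme_events", "svar_regime_shift"):
        for item in getattr(report, section):
            row = {"section": section}
            row.update({field: getattr(item, field) for field in _VALIDATION_FIELDS})
            rows.append(row)
    return pd.DataFrame(rows, columns=["section", *_VALIDATION_FIELDS])

# Usage (hypothetical output path):
#     _report_to_frame_sketch(result.final_report).to_csv("validation_checks.csv", index=False)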


# Top-Level Orchestrator

# ==============================================================================
# Top-Level Orchestrator: On-Chain Behavioral Pre-Emption System (OBPS)
# ==============================================================================

@dataclass
class OBPSPipelineResult:
    """
    Master container for the entire OBPS research pipeline results.

    Attributes:
        config (ValidatedStudyConfig): Validated configuration.
        chain_validation (ChainValidationResult): Chain data validation stats.
        market_validation (MarketValidationResult): Market data validation stats.
        final_panel (FinalizedSeriesData): The processed time series panel.
        stationarity_tests (StationarityTestResults): ADF test results.
        structural_breaks (StructuralBreakOrchestratorResult): Detected breaks.
        break_robustness (AAFTRobustnessResults): AAFT robustness for breaks.
        hht_analysis (HHTOrchestratorResult): Hilbert-Huang Transform results.
        hht_robustness (HHTRobustnessResults): AAFT robustness for HHT.
        svar_model (VARModelResults): Full-sample VAR model.
        svar_regimes (RegimeComparisonData): Pre/Post election VAR models.
        svar_identification (SVARIdentificationResults): Structural impact matrices.
        wald_test (WaldTestResult): Regime shift significance test.
        final_report (FinalStudyReport): Synthesis and validation against expected findings.
    """
    config: Any # ValidatedStudyConfig
    chain_validation: Any # ChainValidationResult
    market_validation: Any # MarketValidationResult
    final_panel: Any # FinalizedSeriesData
    stationarity_tests: Any # StationarityTestResults
    structural_breaks: Any # StructuralBreakOrchestratorResult
    break_robustness: Any # AAFTRobustnessResults
    hht_analysis: Any # HHTOrchestratorResult
    hht_robustness: Any # HHTRobustnessResults
    svar_model: Any # VARModelResults
    svar_regimes: Any # RegimeComparisonData
    svar_identification: Any # SVARIdentificationResults
    wald_test: Any # WaldTestResult
    final_report: Any # FinalStudyReport

def run_obps_pipeline(
    df_chain_raw: pd.DataFrame,
    df_market_raw: pd.DataFrame,
    study_config: Dict[str, Any]
) -> OBPSPipelineResult:
    """
    Executes the end-to-end On-Chain Behavioral Pre-Emption System (OBPS) research pipeline.

    This orchestrator executes Tasks 1 through 32 in sequence, transforming raw blockchain
    and market data into the structural-break, Hilbert-Huang, and SVAR estimates used to
    assess political-risk transmission.

    Pipeline Stages:
    1. Configuration Validation
    2. On-Chain Data Processing (Validation, Filtering, Normalization, Classification, Aggregation)
    3. Market Data Processing (Validation, Cleansing, Extraction)
    4. Panel Construction (Merging, Transformation, Stationarity Testing)
    5. Structural Break Analysis (Bai-Perron, AAFT Robustness)
    6. Non-Linear Signal Processing (HHT, Extreme Event Detection, Robustness)
    7. Structural VAR Analysis (Estimation, Regime Comparison, Identification, Wald Test)
    8. Synthesis and Reporting

    Args:
        df_chain_raw (pd.DataFrame): Raw ERC-20 transfer logs.
        df_market_raw (pd.DataFrame): Raw market OHLCV data.
        study_config (Dict[str, Any]): Configuration dictionary.

    Returns:
        OBPSPipelineResult: A container holding all intermediate and final artifacts.
    """
    print("="*80)
    print("STARTING OBPS PIPELINE EXECUTION")
    print("="*80)

    # -------------------------------------------------------------------------
    # Phase 1: Configuration & Validation (Task 1)
    # -------------------------------------------------------------------------
    print("\n[Phase 1] Configuration Validation")
    validated_config = validate_study_config(study_config)

    # -------------------------------------------------------------------------
    # Phase 2: Chain Data Processing (Tasks 2, 4-9)
    # -------------------------------------------------------------------------
    print("\n[Phase 2] On-Chain Data Processing")

    # Task 2: Validate Structure
    chain_val_res = validate_df_chain_raw(df_chain_raw, validated_config.schemas)

    # Task 4: Filter Time/Status
    chain_filtered = filter_chain_data(
        chain_val_res.validated_df,
        validated_config.meta,
        study_config["preprocessing"]
    )

    # Task 5: Normalize Values
    chain_norm = normalize_chain_data(
        chain_filtered,
        validated_config.schemas,
        study_config["preprocessing"]
    )

    # Task 6: Deduplicate
    chain_dedup = deduplicate_chain_data(chain_norm)

    # Task 7: Classify Topology
    chain_class = classify_chain_topology(chain_dedup)

    # Task 8: Aggregate Daily
    chain_agg = aggregate_chain_volumes(chain_class, validated_config.meta)

    # Task 9: Validate Series
    validate_daily_chain_series(chain_agg, validated_config.meta)

    # -------------------------------------------------------------------------
    # Phase 3: Market Data Processing (Tasks 3, 10-11)
    # -------------------------------------------------------------------------
    print("\n[Phase 3] Market Data Processing")

    # Task 3: Validate Structure
    market_val_res = validate_df_market_raw(
        df_market_raw,
        validated_config.schemas,
        validated_config.meta
    )

    # Task 10: Cleanse
    market_clean = cleanse_market_data(market_val_res, validated_config.meta)

    # Task 11: Extract Series
    market_series = extract_market_series(market_clean)

    # -------------------------------------------------------------------------
    # Phase 4: Panel Construction (Tasks 12-15)
    # -------------------------------------------------------------------------
    print("\n[Phase 4] Panel Construction")

    # Task 12: Merge
    merged_panel = merge_data_sources(chain_agg, market_series)

    # Task 13: Log Transform
    log_data = construct_log_series(merged_panel)

    # Task 14: Stationarity Tests
    adf_results = perform_stationarity_tests(
        log_data,
        study_config["preprocessing"]
    )

    # Task 15: Finalize Integration Order
    finalized_data = finalize_integration_order(
        log_data,
        adf_results,
        study_config["preprocessing"]
    )

    # -------------------------------------------------------------------------
    # Phase 5: Structural Break Analysis (Tasks 16-21)
    # -------------------------------------------------------------------------
    print("\n[Phase 5] Structural Break Analysis")

    # Task 20: Orchestrate Detection (covers Tasks 16-19 logic)
    break_results = orchestrate_structural_breaks(
        finalized_data,
        validated_config.analysis
    )

    # Task 21: Robustness
    break_robustness = execute_aaft_robustness(
        finalized_data,
        break_results,
        validated_config.analysis
    )

    # -------------------------------------------------------------------------
    # Phase 6: HHT Analysis (Tasks 22-26)
    # -------------------------------------------------------------------------
    print("\n[Phase 6] Non-Linear Signal Processing (HHT)")

    # Task 25: Orchestrate HHT (covers Tasks 22-24 logic)
    hht_results = orchestrate_hht(
        finalized_data,
        validated_config.analysis
    )

    # Task 26: Robustness
    hht_robustness = execute_hht_robustness(
        finalized_data,
        hht_results,
        validated_config.analysis
    )

    # -------------------------------------------------------------------------
    # Phase 7: SVAR Analysis (Tasks 27-31)
    # -------------------------------------------------------------------------
    print("\n[Phase 7] Structural VAR Analysis")

    # Task 27: Prepare Inputs
    svar_input = prepare_svar_inputs(
        finalized_data,
        validated_config.analysis
    )

    # Task 28: Estimate Full VAR
    var_model = estimate_reduced_var(
        svar_input,
        validated_config.analysis
    )

    # Task 29: Estimate Regimes
    regime_vars = estimate_regime_vars(
        svar_input,
        var_model
    )

    # Task 30: Identify Shocks
    svar_id = identify_structural_shocks(
        regime_vars,
        svar_input,
        validated_config.analysis
    )

    # Task 31: Wald Test
    wald_res = perform_wald_test(regime_vars)

    # -------------------------------------------------------------------------
    # Phase 8: Synthesis (Task 32)
    # -------------------------------------------------------------------------
    print("\n[Phase 8] Synthesis and Reporting")

    final_report = synthesize_study_results(
        break_results,
        break_robustness,
        hht_results,
        hht_robustness,
        wald_res,
        svar_id
    )

    print("="*80)
    print("OBPS PIPELINE EXECUTION COMPLETE")
    print("="*80)

    return OBPSPipelineResult(
        config=validated_config,
        chain_validation=chain_val_res,
        market_validation=market_val_res,
        final_panel=finalized_data,
        stationarity_tests=adf_results,
        structural_breaks=break_results,
        break_robustness=break_robustness,
        hht_analysis=hht_results,
        hht_robustness=hht_robustness,
        svar_model=var_model,
        svar_regimes=regime_vars,
        svar_identification=svar_id,
        wald_test=wald_res,
        final_report=final_report
    )

