# `README.md`

# ACT-Tensor: A Complete Implementation of the Tensor Completion Framework for Financial Dataset Imputation

<!-- PROJECT SHIELDS -->
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.9%2B-blue.svg)](https://www.python.org/)
[![arXiv](https://img.shields.io/badge/arXiv-2508.01861-b31b1b.svg)](https://arxiv.org/abs/2508.01861)
[![Conference](https://img.shields.io/badge/Conference-ICAIF%20'25-9cf)](https://icaif.acm.org/2025/)
[![Year](https://img.shields.io/badge/Year-2025-purple)](https://github.com/chirindaopensource/act_tensor)
[![Discipline](https://img.shields.io/badge/Discipline-Quantitative%20Finance%20%7C%20ML-00529B)](https://github.com/chirindaopensource/act_tensor)
[![Data Sources](https://img.shields.io/badge/Data-CRSP%20%7C%20Compustat-lightgrey)](https://github.com/chirindaopensource/act_tensor)
[![Core Method](https://img.shields.io/badge/Method-Tensor%20Decomposition%20%7C%20K--Means-orange)](https://github.com/chirindaopensource/act_tensor)
[![Analysis](https://img.shields.io/badge/Analysis-Asset%20Pricing%20%7C%20Portfolio%20Sorts-red)](https://github.com/chirindaopensource/act_tensor)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Type Checking: mypy](https://img.shields.io/badge/type%20checking-mypy-blue)](http://mypy-lang.org/)
[![NumPy](https://img.shields.io/badge/numpy-%23013243.svg?style=flat&logo=numpy&logoColor=white)](https://numpy.org/)
[![Pandas](https://img.shields.io/badge/pandas-%23150458.svg?style=flat&logo=pandas&logoColor=white)](https://pandas.pydata.org/)
[![Scikit-learn](https://img.shields.io/badge/scikit--learn-%23F7931E.svg?style=flat&logo=scikit-learn&logoColor=white)](https://scikit-learn.org/)
[![SciPy](https://img.shields.io/badge/SciPy-%2302557B.svg?style=flat&logo=scipy&logoColor=white)](https://scipy.org/)
[![TensorLy](https://img.shields.io/badge/TensorLy-4B0082-blueviolet)](http://tensorly.org/stable/index.html)
[![Matplotlib](https://img.shields.io/badge/matplotlib-%2311557c.svg?style=flat&logo=matplotlib&logoColor=white)](https://matplotlib.org/)
[![Jupyter](https://img.shields.io/badge/Jupyter-%23F37626.svg?style=flat&logo=Jupyter&logoColor=white)](https://jupyter.org/)
---

**Repository:** `https://github.com/chirindaopensource/act_tensor`

**Owner:** 2025 Craig Chirinda (Open Source Projects)

This repository contains an **independent**, professional-grade Python implementation of the research methodology from the 2025 paper entitled **"ACT-Tensor: Tensor Completion Framework for Financial Dataset Imputation"** by:

*   Junyi Mo
*   Jiayu Li
*   Duo Zhang
*   Elynn Chen

The project provides a complete, end-to-end computational framework for replicating the paper's findings. It delivers a modular, auditable, and extensible pipeline that executes the entire research workflow: from rigorous data validation and characteristic engineering to the core tensor imputation algorithm and the final downstream asset pricing evaluation.

## Table of Contents

- [Introduction](#introduction)
- [Theoretical Background](#theoretical-background)
- [Features](#features)
- [Methodology Implemented](#methodology-implemented)
- [Core Components (Notebook Structure)](#core-components-notebook-structure)
- [Key Callable: `run_complete_act_tensor_replication`](#key-callable-run_complete_act_tensor_replication)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Input Data Structure](#input-data-structure)
- [Usage](#usage)
- [Output Structure](#output-structure)
- [Project Structure](#project-structure)
- [Customization](#customization)
- [Contributing](#contributing)
- [Recommended Extensions](#recommended-extensions)
- [License](#license)
- [Citation](#citation)
- [Acknowledgments](#acknowledgments)

## Introduction

This project provides a Python implementation of the methodologies presented in the 2025 paper "ACT-Tensor: Tensor Completion Framework for Financial Dataset Imputation." The core of this repository is the iPython Notebook `act_tensor_draft.ipynb`, which contains a comprehensive suite of functions to replicate the paper's findings, from initial data validation to the final generation of all analytical tables and figures.

The paper introduces a novel tensor-based framework to address the pervasive problem of missing data in multi-dimensional financial panels. This codebase operationalizes the ACT-Tensor framework, allowing users to:
-   Rigorously validate and manage the entire experimental configuration via a single YAML file.
-   Process raw CRSP/Compustat data, construct a library of 45 firm characteristics with correct look-ahead bias handling, and transform the data into a 3D tensor.
-   Execute the core ACT-Tensor algorithm: K-Means clustering by data density, followed by a "divide and conquer" CP tensor completion and temporal smoothing.
-   Run a full suite of benchmark models, ablation studies, and sensitivity analyses.
-   Perform a complete downstream asset pricing evaluation to measure the economic significance of the imputed data.

## Theoretical Background

The implemented methods are grounded in multilinear algebra, machine learning, and empirical asset pricing.

**1. CP Tensor Decomposition:**
The core of the imputation model is the CANDECOMP/PARAFAC (CP) decomposition, which approximates a tensor $\mathcal{X}$ as a sum of rank-one tensors. For a 3D tensor, the model is:
$$
\hat{\mathcal{X}} = \sum_{r=1}^{R} \mathbf{u}_r \circ \mathbf{v}_r \circ \mathbf{w}_r
$$
where $\mathbf{U}$, $\mathbf{V}$, and $\mathbf{W}$ are factor matrices. The model is fit by minimizing the reconstruction error on the observed entries $\Omega$ via Alternating Least Squares (ALS).
$$
\min_{\mathbf{U},\mathbf{V},\mathbf{W}} \| \mathcal{P}_{\Omega}(\mathcal{X}) - \mathcal{P}_{\Omega}(\hat{\mathcal{X}}) \|_F^2
$$

**2. Cluster-Based Completion:**
To handle extreme sparsity, firms are first clustered by their data availability patterns using K-Means.
-   **Dense Clusters ($\rho_k \ge \tau$):** Imputed directly using the standard CP-ALS algorithm.
-   **Sparse Clusters ($\rho_k < \tau$):** Imputed by first forming an *augmented* tensor that combines the sparse firms with all dense firms, allowing the sparse firms to "borrow" statistical strength.

**3. Partial Tucker Decomposition (HOSVD):**
For the downstream asset pricing test, a large tensor of portfolio returns $\mathcal{R}$ is created. To extract a parsimonious set of factors, a partial Tucker decomposition is used, which is solved via the Higher-Order Singular Value Decomposition (HOSVD). This finds a low-dimensional core tensor $\mathcal{F}$ and loading matrices that best represent the original data.
$$
\mathcal{R} \approx \mathcal{F} \times_{1} U \times_{2} V \times_{3} W
$$

## Features

The provided iPython Notebook (`act_tensor_draft.ipynb`) implements the full research pipeline, including:

-   **Modular, Multi-Task Architecture:** The entire pipeline is broken down into over 30 distinct, modular tasks, each with its own orchestrator function, ensuring clarity and testability.
-   **Configuration-Driven Design:** All study parameters are managed in an external `config.yaml` file, allowing for easy customization and replication.
-   **High-Fidelity Financial Data Processing:** Includes professional-grade logic for handling CRSP/Compustat data, including look-ahead bias prevention, delisting return incorporation, and outlier cleansing.
-   **Robust Experimental Design:** Programmatically generates the three distinct missingness scenarios (MAR, Block, Logit) with disjoint test sets for rigorous model evaluation.
-   **From-Scratch Algorithm Implementation:** Includes a complete, from-scratch, regularized ALS solver for masked CP tensor decomposition with robust SVD-based initialization.
-   **Comprehensive Evaluation Suite:** Implements not only the main ACT-Tensor model but also all specified baselines, ablation studies, and sensitivity tests, and evaluates them on both statistical and economic metrics.
-   **Automated Reporting:** Concludes by automatically generating all publication-ready tables (including styled LaTeX with color-coding) and figures from the paper.

## Methodology Implemented

The core analytical steps directly implement the methodology from the paper:

1.  **Data Preparation (Tasks 1-6):** Ingests and validates the `config.yaml` and raw data, defines the study universe, constructs 45 characteristics with correct reporting lags, normalizes them, and forms the 3D tensor $\mathcal{X}$.
2.  **Experimental Design (Tasks 7-10):** Generates the three evaluation masks (MAR, Block, Logit) and creates the corresponding training tensors.
3.  **ACT-Tensor Imputation (Tasks 11-18):** Executes the full ACT-Tensor algorithm: K-Means clustering, density partitioning, dense and sparse cluster completion via CP-ALS, global assembly, and temporal smoothing (CMA, EMA, or KF).
4.  **Evaluation & Analysis (Tasks 19-27):** Evaluates the imputation accuracy of ACT-Tensor, all baselines, and all ablation models. Runs the regularization stability test.
5.  **Asset Pricing Pipeline (Tasks 28-34):** Uses the imputed data to construct double-sorted portfolios, extracts latent factors via HOSVD, selects predictive factors via forward stepwise regression, and computes all final asset pricing metrics (alphas, IC, Sharpe Ratio).
6.  **Master Orchestration (Tasks 35-37):** Provides top-level functions to run the entire experimental suite and automatically generate all final reports.

## Core Components (Notebook Structure)

The `act_tensor_draft.ipynb` notebook is structured as a logical pipeline with modular orchestrator functions for each of the major tasks. All functions are self-contained, fully documented with type hints and docstrings, and designed for professional-grade execution.

## Key Callable: `run_complete_act_tensor_replication`

The project is designed around a single, top-level user-facing interface function:

-   **`run_complete_act_tensor_replication`:** This master orchestrator function, located in the final section of the notebook, runs the entire automated research pipeline from end-to-end. A single call to this function reproduces the entire computational portion of the project, from data validation to the final report generation.

## Prerequisites

-   Python 3.9+
-   Core dependencies: `pandas`, `numpy`, `scikit-learn`, `scipy`, `tensorly`, `matplotlib`, `seaborn`, `pyyaml`, `pyarrow`.

## Installation

1.  **Clone the repository:**
    ```sh
    git clone https://github.com/chirindaopensource/act_tensor.git
    cd act_tensor
    ```

2.  **Create and activate a virtual environment (recommended):**
    ```sh
    python -m venv venv
    source venv/bin/activate  # On Windows, use `venv\Scripts\activate`
    ```

3.  **Install Python dependencies:**
    ```sh
    pip install pandas numpy scikit-learn scipy tensorly matplotlib seaborn pyyaml pyarrow
    ```

## Input Data Structure

The pipeline requires a `pandas.DataFrame` with a specific, comprehensive schema containing merged CRSP and Compustat data. The exact schema is validated by the `validate_and_enforce_schema` function in the notebook. All other parameters are controlled by the `config.yaml` file.

## Usage

The `act_tensor_draft.ipynb` notebook provides a complete, step-by-step guide. The primary workflow is to execute the final cell of the notebook, which demonstrates how to use the top-level `run_complete_act_tensor_replication` orchestrator:

```python
# Final cell of the notebook

# This block serves as the main entry point for the entire project.
if __name__ == '__main__':
    # Define the paths to the necessary input files.
    # The user must provide their own raw data file in the specified format.
    # A synthetic data generator is included in the notebook for demonstration.
    RAW_DATA_FILE = "data/raw/crsp_compustat_merged.parquet"
    CONFIG_FILE = "config/act_tensor_config.yaml"
    
    # Define the top-level directory for all outputs.
    RESULTS_DIRECTORY = "replication_output"

    # Execute the entire replication study.
    run_complete_act_tensor_replication(
        data_path=RAW_DATA_FILE,
        config_path=CONFIG_FILE,
        base_output_dir=RESULTS_DIRECTORY
    )
```

## Output Structure

The pipeline creates a `base_output_dir` with a highly structured set of outputs. For each experimental run, a unique timestamped subdirectory is created, containing:
-   A detailed `pipeline_log.log` file.
-   A comprehensive `run_summary.json` with all computed metrics.
-   Subdirectories for all intermediate artifacts (e.g., `X_train.npz`, `cluster_assignments.csv`, `ap_ACT-Tensor_CMA/`).

At the top level, two final directories are created:
-   `final_tables/`: Contains all aggregated results in CSV and styled LaTeX formats.
-   `final_figures/`: Contains all generated plots in PDF format.

## Project Structure

```
act_tensor/
│
├── act_tensor_draft.ipynb    # Main implementation notebook with all 38 tasks
├── config.yaml               # Master configuration file
├── requirements.txt          # Python package dependencies
├── data/
│   └── raw/
│       └── crsp_compustat_merged.parquet (User-provided)
│
├── replication_output/       # Example output directory
│   ├── final_tables/
│   ├── final_figures/
│   └── MAR_CMA_20251026_103000/
│       ├── run_summary.json
│       └── ...
│
├── LICENSE                   # MIT Project License File
└── README.md                 # Project 'README.md' file
```

## Customization

The pipeline is highly customizable via the `config.yaml` file. Users can easily modify all study parameters, including date ranges, model hyperparameters (ranks, clusters), smoother settings, and asset pricing specifications, without altering the core Python code.

## Contributing

Contributions are welcome. Please fork the repository, create a feature branch, and submit a pull request with a clear description of your changes. Adherence to PEP 8, type hinting, and comprehensive docstrings is required.

## Recommended Extensions

Future extensions could include:
-   **Alternative Decompositions:** Integrating other tensor decompositions like the Tucker model or Tensor Train (TT) into the imputation framework.
-   **GPU Acceleration:** Adapting the core numerical algorithms (ALS, SVD) to run on GPUs using libraries like CuPy or PyTorch for significant speedups.
-   **Advanced Clustering:** Exploring more sophisticated clustering methods beyond K-Means, such as hierarchical clustering or density-based methods (DBSCAN).
-   **Dynamic Factor Models:** Extending the downstream asset pricing evaluation to use dynamic factor models that can capture time-varying factor loadings.

## License

This project is licensed under the MIT License.

## Citation

If you use this code or the methodology in your research, please cite the original paper:

```bibtex
@inproceedings{mo2025act,
  author    = {Mo, Junyi and Li, Jiayu and Zhang, Duo and Chen, Elynn},
  title     = {{ACT-Tensor: Tensor Completion Framework for Financial Dataset Imputation}},
  booktitle = {Proceedings of the 6th ACM International Conference on AI in Finance},
  series    = {ICAIF '25},
  year      = {2025},
  publisher = {ACM},
  note      = {arXiv:2508.01861}
}
```

For the implementation itself, you may cite this repository:
```
Chirinda, C. (2025). A Professional-Grade Implementation of the ACT-Tensor Framework.
GitHub repository: https://github.com/chirindaopensource/act_tensor
```

## Acknowledgments

-   Credit to **Junyi Mo, Jiayu Li, Duo Zhang, and Elynn Chen** for the foundational research that forms the entire basis for this computational replication.
-   This project is built upon the exceptional tools provided by the open-source community. Sincere thanks to the developers of the scientific Python ecosystem, including **Pandas, NumPy, Scikit-learn, SciPy, TensorLy, Matplotlib, and Jupyter**.

--

*This README was generated based on the structure and content of the `act_tensor_draft.ipynb` notebook and follows best practices for research software documentation.*

# Paper

Title: "*ACT-Tensor: Tensor Completion Framework for Financial Dataset Imputation*"

Authors: Junyi Mo, Jiayu Li, Duo Zhang, Elynn Chen

E-Journal Submission Date: 3 August 2025 (V1), last revised 8 October 2025 (this version, V3)

Conference Affiliation: The 6th ACM International Conference on AI in Finance (ICAIF 2025)

Link: https://arxiv.org/abs/2508.01861

Abstract:

Missing data in financial panels presents a critical obstacle, undermining asset-pricing models and reducing the effectiveness of investment strategies. Such panels are often inherently multi-dimensional, spanning firms, time, and financial variables, which adds complexity to the imputation task. Conventional imputation methods often fail by flattening the data's multidimensional structure, struggling with heterogeneous missingness patterns, or overfitting in the face of extreme data sparsity. To address these limitations, we introduce an Adaptive, Cluster-based Temporal smoothing tensor completion framework (ACT-Tensor) tailored for severely and heterogeneously missing multi-dimensional financial data panels. ACT-Tensor incorporates two key innovations: a cluster-based completion module that captures cross-sectional heterogeneity by learning group-specific latent structures; and a temporal smoothing module that proactively removes short-lived noise while preserving slow-moving fundamental trends. Extensive experiments show that ACT-Tensor consistently outperforms state-of-the-art benchmarks in terms of imputation accuracy across a range of missing data regimes, including extreme sparsity scenarios. To assess its practical financial utility, we evaluate the imputed data with an asset-pricing pipeline tailored for tensor-structured financial data. Results show that ACT-Tensor not only reduces pricing errors but also significantly improves risk-adjusted returns of the constructed portfolio. These findings confirm that our method delivers highly accurate and informative imputations, offering substantial value for financial decision-making.

# Summary

### Summary: ACT-Tensor Framework

The work provides a sophisticated, domain-aware solution to the problem of imputing missing values in financial panel data. The authors correctly identify that this data is not a simple table but a three-dimensional tensor: **Firms × Characteristics × Time**. Their core contribution is a framework that respects this structure to produce imputations that are not only statistically accurate but, more importantly, financially meaningful.

--

#### **The Problem Formulation and Critique of Existing Methods**

The paper begins by framing the central challenge. Financial datasets, like CRSP/Compustat, are notoriously incomplete. This missingness is not random; it is systematic, disproportionately affecting smaller, younger, or distressed firms. This is a classic selection bias problem.

The authors correctly dismiss naive solutions:
*   **Discarding data:** Leads to a biased sample, focusing only on the most stable, well-documented firms.
*   **Simple fills (e.g., cross-sectional median):** This is a blunt instrument that destroys valuable idiosyncratic information and temporal dynamics.
*   **Matrix-based methods (e.g., PCA, matrix factorization):** These methods are a step up but require "flattening" the 3D tensor into a 2D matrix. This act is destructive, as it either discards the time dimension or treats it as just another feature, ignoring crucial temporal dependencies like autocorrelation and trends.
*   **Standard Tensor Completion:** While structurally superior, these methods often assume a single, global low-rank structure for all firms. This is a strong and likely incorrect assumption, as a tech startup and a utility company do not share the same latent drivers. Furthermore, they become unstable under the extreme sparsity seen in financial data.

--

#### **The Proposed Solution - The ACT-Tensor Architecture**

The authors' proposed solution, ACT-Tensor, is a multi-stage framework designed to overcome these limitations. The name itself is an acronym for its key innovations: **Adaptive, Cluster-based, Temporal smoothing**.

The process can be understood as two primary modules:

**Module A: Cluster-based Completion (The "Divide and Conquer" Strategy)**

This is the most clever part of the framework. Instead of imposing one model on all firms, they partition the firms based on data quality.

1.  **Clustering:** Firms are grouped using K-means clustering. The feature for clustering is not a financial characteristic but the *pattern of missing data* itself—specifically, the observed-entry rate for each firm's time-characteristic matrix.
2.  **Categorization:** Each cluster is labeled as either "dense" (high data availability) or "sparse" (low data availability) based on a predefined threshold (τ = 40%).
3.  **Differential Imputation:**
    *   **For Dense Clusters:** A standard CANDECOMP/PARAFAC (CP) tensor decomposition is applied directly to the sub-tensor for each dense cluster. With sufficient data, this is a stable procedure.
    *   **For Sparse Clusters:** This is where the innovation lies. To avoid overfitting on the sparse data, the model "borrows statistical strength." For each sparse cluster, it creates a temporary, aggregated tensor containing the firms from that sparse cluster *plus all firms from all dense clusters*. It then performs the CP completion on this larger, more stable tensor. Finally, it slices out the imputed data corresponding only to the original sparse firms.

**Module B: Temporal Smoothing (The "Noise Reduction" Filter)**

After the tensor is fully imputed, the authors recognize that the raw imputed values can still contain high-frequency noise. Since most firm characteristics are fundamentals that evolve slowly, they apply a temporal smoothing filter to each firm's characteristic time series. They test three methods:
*   Centered Moving Average (CMA)
*   Exponential Moving Average (EMA)
*   Kalman Filter (KF)

The results show that the simple, symmetric **Centered Moving Average (CMA)** performs best, effectively filtering out noise while preserving underlying trends.

--

#### **The Dual Evaluation Methodology - Statistical and Financial**

This is a mark of excellent, practical research. The authors are not content with merely showing a lower Root Mean Squared Error (RMSE). They test whether their statistically superior imputations translate into tangible financial utility.

**1. Statistical Accuracy Evaluation:**
*   They artificially mask known data points under three realistic scenarios: Missing-at-Random (MAR), Block Missingness (simulating a firm's data being unavailable for a full year), and Logistic Missingness (a more complex, firm-specific pattern).
*   They measure performance using standard metrics: RMSE, MAE, and an imputation R-squared (R²_imp).

**2. Financial Utility Evaluation (The "Downstream Task"):**
This is the crucial out-of-sample test. The imputed data is fed into a state-of-the-art asset pricing pipeline:
1.  **Portfolio Construction:** The imputed characteristics are used to double-sort firms into portfolios (e.g., by size and value), creating a tensor of portfolio returns.
2.  **Factor Extraction:** A Tucker decomposition is used on this return tensor to extract a parsimonious set of latent risk factors that drive returns.
3.  **Prediction and Strategy Formulation:** These factors are used to predict future returns. A "Top-minus-Bottom" (T-B) long-short portfolio is constructed based on these predictions.
4.  **Evaluation Metrics:** The quality is assessed using metrics central to quantitative finance:
    *   **Pricing Error (RMSEα):** How well do the factors explain returns? Lower is better.
    *   **Information Coefficient (IC):** The correlation between predicted and realized returns. Higher is better.
    *   **Sharpe Ratio:** The risk-adjusted return of the T-B strategy. This is the ultimate measure of profitability.

--

#### **The Empirical Results and Key Findings**

The results are compelling and validate the authors' design choices.

*   **Superior Imputation Accuracy:** ACT-Tensor consistently outperforms all benchmarks (Cross-Sectional Median, Global and Local matrix factorization models) across all missingness regimes, particularly the more structured and realistic Block and Logit scenarios (Table 2).
*   **Robustness to Sparsity:** The framework's advantage is most pronounced when stress-tested on the sparsest clusters of firms, demonstrating the success of the "borrowing strength" concept.
*   **Synergy of Modules:** The ablation study confirms that both the clustering and temporal smoothing modules add value, and their combination is synergistic.
*   **Direct Financial Value:** The imputed data from ACT-Tensor leads to asset pricing models with significantly lower pricing errors and, critically, generates trading strategies with **more than double the Information Coefficient and a Sharpe Ratio roughly twice as high** as those from benchmark methods (Table 3). This is a powerful demonstration of turning a data science improvement into alpha.
*   **Inherent Regularization:** The model is shown to be stable without needing an explicit L2 penalty, simplifying its application.

--

#### **Conclusion and My Final Take**

The ACT-Tensor framework is a significant contribution. It provides a robust, principled, and practical solution to a fundamental data problem in quantitative finance.

The key takeaway is that **structure and domain knowledge matter**. By preserving the natural tensor structure of the data, accommodating firm heterogeneity through clustering, and enforcing temporal smoothness consistent with financial theory, the authors have built a tool that doesn't just fill in numbers—it recovers meaningful economic signals. The rigorous dual-evaluation framework, connecting statistical accuracy to portfolio profitability, sets a high standard for future research in this area. This is a fine example of how sophisticated machine learning can be thoughtfully applied to solve real-world financial problems.

# Import Essential Modules

In [None]:
#!/usr/bin/env python3
# ==============================================================================#
#
#  ACT-Tensor: A Complete Implementation of the Tensor Completion Framework
#  for Financial Dataset Imputation
#
#  This module provides a complete, production-grade, and high-fidelity Python
#  implementation of the "Adaptive, Cluster-based Temporal smoothing" (ACT-Tensor)
#  framework, as presented in the paper by Mo, Li, Zhang, and Chen (2025). It
#  delivers a robust, end-to-end system for transforming sparse, multi-dimensional
#  financial data panels into dense, coherent, and machine-readable "digital twins"
#  of the firm-characteristic universe.
#
#  The pipeline is designed to address the critical challenges of missing data in
#  quantitative finance, enabling more accurate asset pricing models, robust risk
#  management, and superior data-driven investment decision-making.
#
#  Core Methodological Components:
#  • Data Preprocessing: Rigorous validation, universe definition, and look-ahead
#    bias-free construction of a 45-characteristic library.
#  • Tensor Representation: Transformation of the panel data into a 3D tensor
#    (Time x Firm x Characteristic) with cross-sectional rank normalization.
#  • Cluster-based Completion: A "divide and conquer" strategy that partitions
#    firms by data density (via K-Means) and applies tailored CP tensor completion
#    (unpenalized for dense clusters, augmented for sparse clusters) using a
#    from-scratch, regularized Alternating Least Squares (ALS) solver.
#  • Temporal Smoothing: A post-processing module to filter noise from imputed
#    series using Centered Moving Average (CMA), Exponential Moving Average (EMA),
#    or a Kalman Filter/Smoother with MLE-based hyperparameter tuning.
#  • Downstream Economic Evaluation: A full-scale asset pricing pipeline that uses
#    the imputed data to construct double-sorted portfolios, extract latent factors
#    via Higher-Order SVD (HOSVD), and build predictive factor models to assess
#    economic significance (e.g., Sharpe Ratios, Information Coefficient).
#
#  Technical Implementation Features:
#  • Modular, Functional Design: The entire pipeline is composed of discrete,
#    reusable, and rigorously tested functions, managed by high-level orchestrators.
#  • Reproducibility Engine: A complete experimental suite manager that handles
#    all three missingness regimes (MAR, Block, Logit), all model variations,
#    and all ablation/sensitivity tests in a fully automated and resumable fashion.
#  • Professional-Grade Code: Features detailed typehints, comprehensive docstrings,
#    line-by-line comments, robust error handling, and adherence to PEP-8 standards.
#  • Performance: Leverages vectorized NumPy, SciPy, and Pandas operations, along
#    with the TensorLy library for optimized multilinear algebra.
#
#  Paper Reference:
#  Mo, J., Li, J., Zhang, D., & Chen, E. (2025). ACT-Tensor: Tensor Completion
#  Framework for Financial Dataset Imputation. In 6th ACM International Conference
#  on AI in Finance (ICAIF '25).
#  https://arxiv.org/abs/2508.01861
#
#  Author: CS Chirinda
#  License: MIT
#  Version: 1.0.0
#
# ==============================================================================#

# ==============================================================================
# Consolidated Imports for the ACT-Tensor Pipeline
# ==============================================================================

# Standard Library Imports
import copy
import json
import logging
import os
import random
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

# Third-Party Scientific Computing Libraries
import numpy as np
import pandas as pd
import tensorly as tl
from pandas.io.formats.style import Styler
from scipy.ndimage import uniform_filter1d
from scipy.signal import lfilter, lfiltic
from scipy.stats import rankdata
from sklearn.cluster import KMeans
from sklearn.linear_model import Ridge
from tensorly.tenalg import multi_mode_dot

# Third-Party Visualization Libraries
import matplotlib.pyplot as plt
import seaborn as sns

# Configure a basic logger for this module
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


# Implementation

## Draft 1

### **Explication of the Key ACT-Tensor Pipeline Callables**

#### **Task 1: `validate_config`**

*   **Inputs:** A Python dictionary (`act_tensor_config`) containing all study parameters.
*   **Process:** This function acts as a rigorous gatekeeper. It systematically validates the input dictionary's structure, types, and value ranges against a predefined schema derived from the paper's methodology. It checks for the presence of all required keys, verifies date consistency, validates numerical hyperparameters (e.g., ensuring `density_threshold_tau` is in $(0, 1]$), and confirms choices for categorical parameters.
*   **Outputs:** The original, validated configuration dictionary. As a side effect, it sets global random seeds for reproducibility and saves a timestamped JSON snapshot of the configuration for an audit trail.
*   **Role in Pipeline:** This callable implements the foundational step of **parameter validation and experimental setup**. It ensures that the entire pipeline operates on a consistent, correct, and explicitly defined set of parameters, preventing a vast class of common errors before any computation begins. It does not directly implement an equation from the paper but is a non-negotiable prerequisite for a reproducible scientific workflow.



#### **Task 2: `validate_and_enforce_schema`**

*   **Inputs:** A raw `pandas.DataFrame` containing the merged CRSP/Compustat data.
*   **Process:** The function transforms the raw, untyped input `DataFrame` into a structured, schema-compliant panel. It verifies the presence of all required columns, converts the `date` column to a `DatetimeIndex`, and rigorously enforces the correct data type for every other column (e.g., `int64`, `float64`, `datetime64[ns]`) using `errors='coerce'` to handle parsing failures gracefully. It concludes with a series of domain-specific sanity checks on core financial fields.
*   **Outputs:** A schema-compliant `pandas.DataFrame` with a `DatetimeIndex`. As a side effect, it saves a data quality report and checkpoints the validated `DataFrame` to a Parquet file.
*   **Role in Pipeline:** This callable performs the critical step of **data validation and sanitization**. It ensures that the raw data conforms to the expected structure and types before any financial logic is applied, preventing downstream errors due to incorrect data formats.



#### **Task 3: `cleanse_and_filter_universe`**

*   **Inputs:** The schema-validated `DataFrame` from Task 2.
*   **Process:** This function sculpts the validated data into the final analytical sample. It applies a sequence of filters: (1) a temporal filter to restrict the date range, (2) a security filter to include only US common stocks on major exchanges (`shrcd`, `exchcd`), (3) a deterministic deduplication process to ensure a unique `(date, permno)` observation, and (4) a set of outlier removal rules for price, returns, and market capitalization. It also correctly incorporates delisting returns into the main return series using the formula $r_{\text{combined}} = (1 + r_{t}) \times (1 + r_{\text{delist}}) - 1$.
*   **Outputs:** A fully cleansed `pandas.DataFrame` with a unique `(date, permno)` `MultiIndex`, representing the final universe of firm-month observations.
*   **Role in Pipeline:** This callable implements the **universe definition and data cleansing** stage, a standard and essential procedure in empirical asset pricing to ensure the analysis is performed on a well-defined and economically meaningful set of securities.



#### **Task 4: `construct_characteristics`**

*   **Inputs:** The cleansed `DataFrame` from Task 3.
*   **Process:** This is the primary feature engineering step. It first performs a critical temporal alignment of quarterly accounting data to the monthly frequency, enforcing a reporting lag (e.g., 6 months) to prevent look-ahead bias. It then uses this aligned data to compute the 45 firm characteristics (e.g., Book-to-Market, Momentum) according to their precise financial formulas.
*   **Outputs:** A wide `pandas.DataFrame` with the `(date, permno)` `MultiIndex` and 45 new columns, one for each raw characteristic.
*   **Role in Pipeline:** This callable **constructs the raw signals** that will be used for imputation and asset pricing. Its most critical role is the rigorous handling of reporting lags to ensure the information used to construct characteristics at month `t` was actually available to investors at that time.



#### **Task 5: `rank_normalize_characteristics`**

*   **Inputs:** The `DataFrame` of raw characteristics from Task 4.
*   **Process:** The function transforms the raw characteristics into a unit-free, comparable format. For each month, it cross-sectionally ranks the firms for each characteristic. It then applies a linear transformation to rescale these ranks to the interval `[-0.5, 0.5]`.
*   **Outputs:** A `pandas.DataFrame` of the same shape, but with the raw values replaced by their normalized ranks.
*   **Role in Pipeline:** This implements the **cross-sectional normalization** described in the "Dataset" section of the paper. This is a crucial step before tensor formation, as it prevents characteristics with large scales (like market cap) from dominating the factorization and ensures all signals are on an equal footing. The transformation for a rank $r_{t,n,\ell}$ out of $N_{t,\ell}$ firms is:
    $$
    \tilde{x}_{t,n,\ell} = \frac{r_{t,n,\ell} - 1}{N_{t,\ell} - 1} - 0.5
    $$



#### **Task 6: `form_tensor_and_observed_set`**

*   **Inputs:** The `DataFrame` of normalized characteristics from Task 5.
*   **Process:** This function transforms the 2D panel data representation into the 3D tensor structure required by the model. It first creates deterministic integer mappings for the time and firm dimensions. It then allocates a `(T, N, L)` NumPy array initialized with `NaN`s and populates it with the observed, normalized characteristic values. Finally, it creates a boolean mask of the same shape to represent the set of observed entries, $\Omega$.
*   **Outputs:** A tuple containing the `(T, N, L)` data tensor $\mathcal{X}$ and the `(T, N, L)` boolean mask $\Omega$.
*   **Role in Pipeline:** This callable performs the **tensorization** of the data, moving from a `pandas` panel to the `numpy` multilinear array that is the central data structure for the ACT-Tensor model.



#### **Task 7: `create_mar_mask`**

*   **Inputs:** The observed set mask $\Omega$.
*   **Process:** This function implements the Missing-At-Random (MAR) experimental condition. It identifies all observed entries in the tensor, and then performs a uniform random sampling without replacement to select 10% of them to be part of the held-out evaluation set.
*   **Outputs:** A boolean tensor of shape `(T, N, L)` where `True` indicates an entry that is part of the MAR test set.
*   **Role in Pipeline:** This is the first of three functions that **create the experimental conditions** for evaluating the imputation models.



#### **Task 8: `create_block_mask`**

*   **Inputs:** The observed set mask $\Omega$.
*   **Process:** This function implements the Block Missingness experimental condition. For each firm-characteristic time series, it probabilistically places a contiguous 12-month block of missingness. The placement is biased towards the beginning of the series to mimic real-world data gaps.
*   **Outputs:** A boolean tensor of shape `(T, N, L)` representing the Block test set.
*   **Role in Pipeline:** This function creates a more structured and challenging test set to evaluate the model's ability to handle non-random, temporally correlated missingness.



#### **Task 9: `create_logit_mask`**

*   **Inputs:** The observed set mask $\Omega$ and the cleansed `DataFrame` (for covariates).
*   **Process:** This function implements the most sophisticated experimental condition, Logistic Missingness. It uses a two-stage probabilistic model where the probability of an entry being missing depends on firm characteristics like size and age. It includes a calibration loop to ensure the total number of masked entries matches the 10% target.
*   **Outputs:** A boolean tensor of shape `(T, N, L)` representing the Logit test set.
*   **Role in Pipeline:** This function simulates a "Missing Not At Random" (MNAR) scenario, providing the most realistic and rigorous test of the imputation framework's ability to handle systematic biases in missingness.



#### **Task 10: `create_training_tensors`**

*   **Inputs:** The ground-truth tensor $\mathcal{X}$ and an evaluation mask (e.g., `mask_eval_MAR`).
*   **Process:** This function prepares the data for model training. It takes the ground-truth tensor and replaces the values at the locations specified by the evaluation mask with `NaN`s. It also creates the corresponding training mask, which is the set of all originally observed entries *minus* the evaluation set.
*   **Outputs:** A tuple containing the training tensor `X_train` and the training mask `mask_train`.
*   **Role in Pipeline:** This callable **enforces the separation of training and testing data** at the tensor level, a critical step for a valid out-of-sample evaluation.



#### **Task 11: `cluster_firms_by_profile`**

*   **Inputs:** The training tensor `X_train`.
*   **Process:** This function implements the first step of the ACT-Tensor algorithm. It vectorizes each firm's `(T, L)` slice of the tensor and applies the K-Means algorithm to partition the `N` firms into `K` clusters based on the similarity of their data profiles (which are dominated by missingness patterns).
*   **Outputs:** A dictionary mapping each cluster ID to a list of the firm indices belonging to it.
*   **Role in Pipeline:** This callable implements the **firm clustering** based on the K-Means objective function:
    $$
    \min_{z \in \{1, \dots, K\}^{N}, \{\boldsymbol{\mu}_{k}\}_{k=1}^{K}} \sum_{n=1}^{N} \left\| \mathbf{v}_{n} - \boldsymbol{\mu}_{z_{n}} \right\|_{2}^{2}
    $$
    where $\mathbf{v}_n$ is the vectorized profile of firm $n$.



#### **Task 12: `compute_densities_and_partition`**

*   **Inputs:** The training mask `mask_train` and the cluster assignments from Task 11.
*   **Process:** For each cluster, this function calculates its data density: the fraction of observed entries in its corresponding sub-tensor. It then compares this density to the threshold $\tau$ (e.g., 0.40) to classify the cluster as either "dense" or "sparse".
*   **Outputs:** Two lists of cluster IDs: one for the dense clusters and one for the sparse clusters.
*   **Role in Pipeline:** This callable implements the **cluster classification** that directs the "divide and conquer" strategy. It is based on the density formula:
    $$
    \rho_{k} = \frac{1}{|\mathcal{I}_{k}| \cdot T \cdot L} \sum_{n \in \mathcal{I}_{k}} \sum_{t=0}^{T-1} \sum_{\ell=0}^{L-1} \mathbf{1}\left[(t, n, \ell) \in \Omega_{\text{train}}\right]
    $$



#### **Task 13 & 14 Fused: `_solve_cp_als`, `complete_dense_clusters`, `complete_sparse_clusters`**

*   **Inputs:** The sub-tensors and sub-masks for each cluster, and the dense/sparse classification.
*   **Process:** These functions form the core of the imputation engine.
    *   `complete_dense_clusters` iterates through the dense clusters and calls the canonical solver `_solve_cp_als` with `lambda_reg=0.0` on each dense sub-tensor.
    *   `complete_sparse_clusters` iterates through the sparse clusters. For each, it first constructs an *augmented* tensor by combining the sparse firms with all dense firms. It then calls `_solve_cp_als` on this larger, more stable tensor. Finally, it slices out the imputed data corresponding to the original sparse firms.
    *   `_solve_cp_als` is the workhorse. It implements the Alternating Least Squares algorithm to solve the regularized CP decomposition objective function by iteratively solving the normal equations for each factor matrix. It includes robust SVD-based initialization.
*   **Outputs:** A set of completed (fully dense) sub-tensors, one for each of the `K` clusters, saved to disk.
*   **Role in Pipeline:** These callables implement the central **cluster-wise tensor completion** algorithm. The objective function solved by the ALS algorithm is:
    $$
    \min_{U, V, W} \left\| \mathcal{P}_{\Omega}(\mathcal{X}) - \mathcal{P}_{\Omega}([\![U, V, W]\!]) \right\|_{F}^{2} + \lambda \left( \|U\|_{F}^{2} + \|V\|_{F}^{2} + \|W\|_{F}^{2} \right)
    $$



#### **Task 15: `assemble_completed_tensor`**

*   **Inputs:** The `K` completed sub-tensors from the previous step and the cluster assignments.
*   **Process:** This function reverses the partitioning. It allocates a new, full-sized `(T, N, L)` tensor and then, for each cluster, places its completed sub-tensor into the correct firm "slots" in the global tensor.
*   **Outputs:** A single, fully dense `(T, N, L)` tensor, $\hat{\mathcal{X}}$.
*   **Role in Pipeline:** This callable performs the **global assembly**, completing the "conquer" phase of the imputation strategy.



#### **Task 16, 17, 18: `apply_cma_smoothing`, `apply_ema_smoothing`, `apply_kf_smoothing`**

*   **Inputs:** The assembled, dense tensor $\hat{\mathcal{X}}$.
*   **Process:** These functions implement the three alternative temporal smoothers. Each is applied to every firm-characteristic time series (i.e., along the time axis of the tensor) to filter out short-term noise.
    *   `apply_cma_smoothing` uses a symmetric Centered Moving Average.
    *   `apply_ema_smoothing` uses a one-sided, recursive Exponential Moving Average.
    *   `apply_kf_smoothing` uses a full Kalman Filter and RTS smoother based on a random walk state-space model, with hyperparameters chosen via a Maximum Likelihood grid search.
*   **Outputs:** The final, smoothed, imputed tensor $\tilde{\mathcal{X}}$.
*   **Role in Pipeline:** These callables implement the **temporal smoothing module**, the second key innovation of the ACT-Tensor framework, designed to improve the signal-to-noise ratio of the imputed data.



#### **Tasks 19-34 (Evaluation and Reporting)**

The remaining callables are part of the evaluation and reporting pipeline. Their roles are as follows:
*   `evaluate_imputation_accuracy`: Implements the four core statistical error metrics (RMSE, MAE, MAPE, R²) to measure imputation quality.
*   `evaluate_sparse_cluster_accuracy`: A specialized orchestrator that applies the evaluation to the sparse-cluster subset, stress-testing the model.
*   `run_baseline_imputations`: Implements the three competitor models (Median, BF-XS, B-XS).
*   `evaluate_baseline_methods`: An orchestrator to evaluate the baselines using the same canonical metrics.
*   `run_ablation_*`: A suite of orchestrators to run the ablation experiments (CP-only, CP+Clustering, etc.).
*   `compile_*_results`: A suite of functions to aggregate raw results into formatted tables.
*   `run_regularization_stability_test`: An orchestrator for the $\lambda$-sensitivity analysis.
*   `construct_portfolio_return_tensor`: Implements the double-sort methodology to create the portfolio return tensor $\mathcal{R}$, the primary input for the asset pricing tests.
*   `extract_latent_factors`: Implements the partial Tucker decomposition (HOSVD) to extract the latent factor matrix $\mathbf{F}$ from $\mathcal{R}$.
*   `select_factors_and_predict`: Implements the forward stepwise selection to choose the best predictive factors and generate return forecasts $\hat{\mathbf{R}}$.
*   `compute_asset_pricing_metrics`: Implements the calculation of pricing errors (RMSEα, MAEα) and predictive skill metrics (IC, MAE-Rank).
*   `construct_tb_portfolio`: Simulates the Top-minus-Bottom long-short strategy based on the model's forecasts.
*   `compute_annualized_sharpe_ratio`: Calculates the final risk-adjusted return metric for the T-B strategy.



#### **Tasks 35, 36, 37 (Top-Level Orchestration)**

*   `run_act_tensor_pipeline`: The orchestrator for a single, complete experimental run.
*   `execute_full_experimental_suite`: The master script that manages the entire campaign of experiments.
*   `generate_publication_artifacts`: The final reporting engine that produces all tables and figures.

This concludes the comprehensive explication of every functional component of the ACT-Tensor pipeline.

<br><br>

### **Usage Example**

### **Discussion of the End-to-End Usage Example**

The example will be structured as a self-contained script or notebook cell that performs three key actions: setting up the environment, defining the inputs, and executing the top-level orchestrator.

#### **1. Environment Setup: Creating Synthetic Inputs**

*   **Underlying Concept:** A reproducible example must not depend on proprietary data access. It must provide its own inputs. Therefore, the first step is to programmatically generate high-fidelity synthetic data that conforms *exactly* to the required input schemas.
*   **Fidelity to Concept:**
    1.  **Synthetic `DataFrame`:** The helper function, `_create_synthetic_data`, generates a `pandas.DataFrame`. This `DataFrame` will not be random noise. It will be structured to mimic the properties of real financial data:
        *   It will have all the required columns with the correct `dtypes`.
        *   It will span the correct date range (`2016-01-01` to `2020-12-31`).
        *   It will contain a small number of firms (`N=100`) and generate both monthly and quarterly records to simulate the CRSP/Compustat merge.
        *   Values will be drawn from plausible distributions (e.g., prices from a log-normal distribution, returns from a normal distribution, accounting variables with realistic positive values).
        *   Crucially, it will introduce `NaN`s to mimic the natural missingness of financial data.
    2.  **Saving the Data:** This synthetic `DataFrame` will be saved to a Parquet file at a specified path (e.g., `data/raw/crsp_compustat_merged.parquet`). This simulates the real-world scenario where the user starts with a prepared data file.
    3.  **Configuration File:** `PyYAML` will be used to load the `config.yaml` YAML file and `json` to dump it to the required `act_tensor_config.json` format. This demonstrates the professional practice of using YAML for human editing and JSON for machine parsing.
*   **Best Practices:** Creating a synthetic data generator is a hallmark of a testable codebase. It allows for unit testing, integration testing, and provides a completely self-contained example for new users, without requiring them to have access to a WRDS subscription.

#### **2. Defining the Inputs for the Orchestrator**

*   **Underlying Concept:** The top-level orchestrator requires three simple string inputs: the path to the data, the path to the config, and the path for the output directory.
*   **Fidelity to Concept:** The example script will define these three path variables explicitly, pointing to the files and directories created in the setup step. This makes the connection between the setup and execution phases perfectly clear.

#### **3. Executing the Top-Level Orchestrator**

*   **Underlying Concept:** The final step is to call the master function that runs the entire study.
*   **Fidelity to Concept:** The script will make a single call to `run_complete_act_tensor_replication`, passing the three path variables. The call will be wrapped in a `if __name__ == "__main__":` block, which is the standard for an executable Python script.
*   **Challenges & Resolutions:** The only challenge is ensuring that all 40+ functions defined throughout our conversation are available in the execution scope. The prompt specifies assuming they are all in a single notebook. In a real project, they would be organized into modules and imported. The example will be written with this assumption in mind.

This complete example will provide a user with everything they need to understand and execute the entire ACT-Tensor replication pipeline on a controlled, synthetic dataset, thereby validating the entire codebase and providing a clear template for running it on their own proprietary data.

I will now provide the complete, professional-grade usage example.

```python
# ==============================================================================
# Final Task: End-to-End Pipeline Usage Example
# ==============================================================================

# Import necessary libraries for this example script
import logging
import os
import json
import random
from typing import Any, Dict, List

import numpy as np
import pandas as pd
import yaml

# Assume all previously defined canonical functions for the ACT-Tensor pipeline
# (Tasks 1-37) are defined in the preceding cells of this notebook.

# Configure a logger for the example script
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# ------------------------------------------------------------------------------
# Step 1: Environment Setup - Synthetic Data and Config File Generation
# ------------------------------------------------------------------------------

def _create_synthetic_data(
    num_firms: int,
    start_date: str,
    end_date: str,
    output_path: str
) -> None:
    """
    Generates a high-fidelity synthetic dataset mimicking a CRSP/Compustat merge.
    """
    logging.info("Generating high-fidelity synthetic data...")
    
    # Define date ranges for monthly and quarterly data.
    dates_monthly = pd.to_datetime(pd.date_range(start=start_date, end=end_date, freq='M'))
    dates_quarterly = pd.to_datetime(pd.date_range(start=start_date, end=end_date, freq='Q'))
    
    # Create unique firm identifiers.
    permnos = np.arange(10001, 10001 + num_firms)
    
    # Create the base DataFrame with all firm-month combinations.
    df = pd.DataFrame(
        index=pd.MultiIndex.from_product([dates_monthly, permnos], names=['date', 'permno'])
    ).reset_index()

    # --- Generate Realistic Data ---
    # Market Data (monthly)
    df['shrcd'] = 11
    df['exchcd'] = np.random.choice([1, 2, 3], size=len(df))
    df['prc'] = np.random.lognormal(mean=3, sigma=1, size=len(df)).clip(1, 10000)
    df['ret'] = np.random.normal(loc=0.01, scale=0.08, size=len(df))
    df['shrout'] = np.random.lognormal(mean=10, sigma=2, size=len(df)) * 1000
    df['RF'] = 0.01 # Simplified constant risk-free rate in percent

    # Accounting Data (quarterly) - create a separate DataFrame first.
    df_q = pd.DataFrame(
        index=pd.MultiIndex.from_product([dates_quarterly, permnos], names=['datadate', 'permno'])
    ).reset_index()
    df_q['atq'] = np.random.lognormal(mean=7, sigma=2, size=len(df_q)) * 100
    df_q['seqq'] = df_q['atq'] * np.random.uniform(0.3, 0.7, size=len(df_q))
    df_q['ibq'] = df_q['seqq'] * np.random.normal(0.02, 0.03, size=len(df_q))
    
    # Merge quarterly data onto the monthly frame.
    # This will naturally create NaNs for non-quarter-end months.
    df['datadate'] = df['date'].where(df['date'].dt.is_quarter_end)
    df = pd.merge(df, df_q, on=['permno', 'datadate'], how='left')
    
    # --- Add All Other Required Columns with Default/NaN Values ---
    # This ensures the DataFrame conforms to the required schema.
    all_cols = [
        "permno", "permco", "comnam", "shrcd", "exchcd", "siccd", "ncusip",
        "prc", "ret", "retx", "shrout", "vol", "stko", "dlstdt", "dlstcd", "dlret",
        "gvkey", "datadate", "fyearq", "fqtr", "fyr", "indfmt", "consol", "popsrc",
        "datafmt", "tic", "cusip", "sich", "atq", "ltq", "seqq", "ceqq", "saleq",
        "ibq", "oibdpq", "pstkq", "cogsq", "xsgaq", "ppentq", "invtq", "lpermno",
        "linktype", "linkprim", "linkdt", "linkenddt", "RF", "namedt", "nameenddt", "shrcls"
    ]
    for col in all_cols:
        if col not in df.columns:
            df[col] = np.nan
            
    # Introduce additional random missingness to simulate reality.
    for col in ['ret', 'prc', 'atq', 'seqq', 'ibq']:
        mask = np.random.rand(len(df)) < 0.2
        df.loc[mask, col] = np.nan

    # Save the synthetic data to a Parquet file.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df.to_parquet(output_path)
    logging.info(f"Synthetic data with {len(df)} rows and {len(df.columns)} columns saved to {output_path}")


def _create_json_config_file(yaml_path: str, json_path: str) -> None:
    """
    Creates the JSON equivalent of a YAML config file.
    """

    # Load the YAML and save it as JSON for the pipeline.
    with open(yaml_path, 'r') as f:
        config_dict = yaml.safe_load(f)
    
    os.makedirs(os.path.dirname(json_path), exist_ok=True)
    with open(json_path, 'w') as f:
        json.dump(config_dict, f, indent=4)
    logging.info(f"JSON config saved to {json_path}")


# ------------------------------------------------------------------------------
# Main Execution Block
# ------------------------------------------------------------------------------
if __name__ == '__main__':
    """
    This block serves as the main entry point for demonstrating the use of the
    end-to-end pipeline. It first sets up the necessary environment by creating
    synthetic data and configuration files, and then calls the top-level
    orchestrator to run the full replication study.
    """
    
    # --- Define File Paths ---
    # Define the paths where the synthetic data and config files will be stored,
    # and where all experimental outputs will be saved.
    RAW_DATA_FILE = "data/raw/synthetic_crsp_compustat.parquet"
    CONFIG_YAML_FILE = "config.yaml"
    CONFIG_JSON_FILE = "act_tensor_config.json"
    RESULTS_DIRECTORY = "replication_output"

    # --- 1. Set up the Environment ---
    # Generate the synthetic raw data file.
    _create_synthetic_data(
        num_firms=100,
        start_date="2016-01-01",
        end_date="2020-12-31",
        output_path=RAW_DATA_FILE
    )
    
    # Generate the configuration files.
    _create_json_config_file(
        yaml_path=CONFIG_YAML_FILE,
        json_path=CONFIG_JSON_FILE
    )

    # --- 2. Execute the Entire Replication Study ---
    # This single function call launches the entire end-to-end pipeline.
    # It will run all 9 experimental conditions (3 regimes x 3 smoothers),
    # including all baseline, ablation, and sensitivity analyses, and then
    # aggregate all results into final, publication-ready tables and figures.
    logging.info(f"\n{'='*80}\nLAUNCHING FULL REPLICATION SUITE...\n{'='*80}")
    
    # Call the top-level orchestrator from the previous task.
    final_artifacts = run_complete_act_tensor_replication(
        data_path=RAW_DATA_FILE,
        config_path=CONFIG_JSON_FILE,
        base_output_dir=RESULTS_DIRECTORY
    )

    # --- 3. Conclude ---
    # Print the final output, which contains the paths to all generated artifacts.
    logging.info(f"\n{'='*80}\nREPLICATION SUITE COMPLETE.\n{'='*80}")
    logging.info("Final artifacts generated:")
    logging.info(json.dumps(final_artifacts, indent=4))

```

In [None]:
# Task 1: Validate and Parse Configuration Dictionary

# ==============================================================================
# Task 1: Validate and Parse Configuration Dictionary
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 1, Step 1: Type and structure validation (Helper Function)
# ------------------------------------------------------------------------------
def _validate_structure_and_log(
    config: Dict[str, Any],
    output_dir: str
) -> None:
    """
    Validates the top-level structure of the configuration dictionary and logs it.

    This helper function ensures that the configuration is a dictionary and contains
    all required primary keys. It also verifies that nested values are dictionaries
    where expected. Finally, it creates a timestamped JSON snapshot of the
    configuration for audit and reproducibility purposes.

    Args:
        config: The configuration dictionary to validate.
        output_dir: The directory where the configuration snapshot will be saved.

    Raises:
        TypeError: If the config is not a dictionary or if a nested value has an
                   incorrect type.
        KeyError: If a required top-level key is missing from the config.
    """
    # Assert that the provided configuration is a Python dictionary.
    if not isinstance(config, dict):
        raise TypeError("Configuration must be a dictionary.")

    # Define the set of all required top-level keys for the study.
    required_keys: List[str] = [
        "study_scope", "linking_policy", "tensor_policies",
        "imputation_hparams", "optimization_settings", "smoothing_settings",
        "masking_regimes", "portfolio_settings", "factor_extraction",
        "evaluation_metrics", "stability_test", "reproducibility", "guardrails"
    ]

    # Verify the presence of each required top-level key.
    for key in required_keys:
        if key not in config:
            raise KeyError(f"Missing required top-level key in config: '{key}'")

    # Verify that nested structures are dictionaries, with exceptions.
    for key, value in config.items():
        # The 'evaluation_metrics' key is allowed to contain lists of strings.
        if key == "evaluation_metrics":
            if not isinstance(value, dict):
                raise TypeError(
                    f"Config key '{key}' must be a dictionary."
                )
        # All other top-level keys must correspond to dictionary values.
        elif key in required_keys and not isinstance(value, dict):
            raise TypeError(
                f"Config key '{key}' must be a dictionary, but found {type(value)}."
            )

    # Ensure the output directory for the log file exists.
    os.makedirs(output_dir, exist_ok=True)

    # Generate a timestamp for the configuration snapshot file.
    timestamp: str = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Define the full path for the snapshot file.
    snapshot_path: str = os.path.join(
        output_dir, f"config_snapshot_{timestamp}.json"
    )

    # Write the complete configuration to a JSON file for auditing.
    with open(snapshot_path, 'w') as f:
        json.dump(config, f, indent=4)


# ------------------------------------------------------------------------------
# Task 1, Step 2: Validate study scope and date boundaries (Helper Function)
# ------------------------------------------------------------------------------
def _validate_study_scope(config: Dict[str, Any]) -> None:
    """
    Validates parameters related to the study's scope, dates, and universe.

    This function parses date strings, verifies their chronological order, and
    cross-validates the expected number of months against the configuration.
    It also checks the validity of security filters.

    Args:
        config: The configuration dictionary to validate.

    Raises:
        ValueError: If dates are invalid, out of order, or if the expected
                    number of months is inconsistent.
        TypeError: If filter parameters are not of the expected type (list of ints).
    """
    # Extract the study scope sub-dictionary for easier access.
    scope: Dict[str, Any] = config["study_scope"]

    # Extract the tensor policies sub-dictionary.
    tensor_policies: Dict[str, Any] = config["tensor_policies"]

    # Parse start and end dates into pandas Timestamp objects for robust comparison.
    try:
        start_date: pd.Timestamp = pd.to_datetime(scope["start_date"])
        end_date: pd.Timestamp = pd.to_datetime(scope["end_date"])
    except Exception as e:
        raise ValueError(f"Could not parse 'start_date' or 'end_date'. Error: {e}")

    # Assert that the study's time window is chronologically valid.
    if end_date < start_date:
        raise ValueError(
            f"end_date ({end_date}) cannot be before start_date ({start_date})."
        )

    # Calculate the expected number of months in the study period.
    # Formula: T_expected = 12 * (end_year - start_year) + (end_month - start_month) + 1
    t_expected: int = (
        12 * (end_date.year - start_date.year) +
        (end_date.month - start_date.month) + 1
    )

    # Verify that the calculated number of months matches the value specified in tensor_policies.
    if tensor_policies["T_months_expected"] != t_expected:
        raise ValueError(
            f"Inconsistency in config: 'T_months_expected' is "
            f"{tensor_policies['T_months_expected']}, but calculated period "
            f"is {t_expected} months."
        )

    # Validate the share code filter for common stocks.
    share_codes: List[int] = scope["share_code_filter"]
    if not (
        isinstance(share_codes, list) and
        all(isinstance(i, int) for i in share_codes) and
        share_codes
    ):
        raise TypeError("'share_code_filter' must be a non-empty list of integers.")

    # Ensure share codes are the standard CRSP codes for common stock.
    if not all(code in {10, 11} for code in share_codes):
        raise ValueError(f"Invalid 'share_code_filter' values. Must be in {{10, 11}}.")

    # Validate the exchange code filter.
    exchange_codes: List[int] = scope["include_exchanges"]
    if not (
        isinstance(exchange_codes, list) and
        all(isinstance(i, int) for i in exchange_codes) and
        exchange_codes
    ):
        raise TypeError("'include_exchanges' must be a non-empty list of integers.")


# ------------------------------------------------------------------------------
# Task 1, Step 3: Validate hyperparameters and set seeds (Helper Function)
# ------------------------------------------------------------------------------
def _validate_hyperparameters(config: Dict[str, Any]) -> None:
    """
    Validates numerical and choice-based hyperparameters and sets random seeds.

    This function performs detailed checks on tensor dimensions, optimization
    parameters, and smoothing settings. It concludes by setting the global
    random seeds for NumPy and Python's `random` module to ensure
    reproducibility.

    Args:
        config: The configuration dictionary to validate.

    Raises:
        ValueError: If a hyperparameter is outside its admissible range.
        TypeError: If a hyperparameter has an incorrect type.
    """
    # --- Validate integer parameters ---
    # Define a mapping of parameter paths to their dictionary locations.
    int_params_paths: Dict[str, List[str]] = {
        "CP_rank_R": ["imputation_hparams"],
        "clusters_K": ["imputation_hparams"],
        "L_characteristics": ["tensor_policies"],
        "P_size_buckets": ["portfolio_settings"],
        "Q_char_buckets": ["portfolio_settings"],
        "k_p": ["factor_extraction", "mode_ranks"],
        "k_q": ["factor_extraction", "mode_ranks"],
        "k_ell": ["factor_extraction", "mode_ranks"],
    }
    # Iterate and validate that each required integer parameter is a positive integer.
    for param, path in int_params_paths.items():
        sub_dict = config
        for key in path:
            sub_dict = sub_dict[key]
        value = sub_dict[param]
        if not (isinstance(value, int) and value > 0):
            raise ValueError(f"Parameter '{param}' must be a positive integer.")

    # --- Validate float parameters ---
    # Verify density threshold tau is in the interval (0, 1].
    tau: float = config["imputation_hparams"]["density_threshold_tau"]
    if not (isinstance(tau, float) and 0.0 < tau <= 1.0):
        raise ValueError(
            "'density_threshold_tau' must be a float in (0, 1]."
        )

    # Validate ridge lambda is a non-negative float.
    ridge_lambda: float = config["imputation_hparams"]["ridge_lambda"]
    if not (isinstance(ridge_lambda, float) and ridge_lambda >= 0.0):
        raise ValueError("'ridge_lambda' must be a non-negative float.")

    # --- Validate smoothing settings ---
    smoothing_settings: Dict[str, Any] = config["smoothing_settings"]
    # Check if the selected smoothing method is one of the valid options.
    if smoothing_settings["method"] not in {"CMA", "EMA", "KF"}:
        raise ValueError(
            f"Invalid smoothing method: '{smoothing_settings['method']}'. "
            f"Must be 'CMA', 'EMA', or 'KF'."
        )

    # Validate CMA delta: must be an odd, positive integer.
    cma_delta: int = smoothing_settings["CMA"]["delta"]
    if not (isinstance(cma_delta, int) and cma_delta > 0 and cma_delta % 2 == 1):
        raise ValueError("'CMA:delta' must be an odd positive integer.")

    # Validate EMA theta: must be a float in (0, 1).
    ema_theta: float = smoothing_settings["EMA"]["theta"]
    if not (isinstance(ema_theta, float) and 0.0 < ema_theta < 1.0):
        raise ValueError("'EMA:theta' must be a float in (0, 1).")

    # Validate Kalman Filter grids: must be non-empty lists of positive floats.
    for grid_name in ["process_var_h_grid", "meas_var_r_grid"]:
        grid: List[float] = smoothing_settings["KF"][grid_name]
        if not (
            isinstance(grid, list) and
            all(isinstance(v, (float, int)) and v > 0 for v in grid) and
            grid
        ):
            raise ValueError(
                f"'KF:{grid_name}' must be a non-empty list of positive numbers."
            )

    # --- Validate reproducibility settings and set seeds ---
    repro_config: Dict[str, Any] = config["reproducibility"]
    # Check that all specified seeds are non-negative integers.
    for key, value in repro_config.items():
        if "seed" in key:
            if not (isinstance(value, int) and value >= 0):
                raise ValueError(f"Seed '{key}' must be a non-negative integer.")

    # Set global random seeds immediately after validation for reproducibility.
    # This is a critical step that affects the global state.
    np.random.seed(repro_config["seed_global"])
    random.seed(repro_config["seed_global"])

    # Confirm the specified float dtype is 'float64' for numerical precision.
    if repro_config["float_dtype"] != "float64":
        raise ValueError("'float_dtype' must be 'float64'.")


# ------------------------------------------------------------------------------
# Task 1, Orchestrator Function
# ------------------------------------------------------------------------------
def validate_config(
    config: Dict[str, Any],
    output_dir: str = "config_logs"
) -> Dict[str, Any]:
    """
    Orchestrates the validation of the entire ACT-Tensor configuration dictionary.

    This function serves as the main entry point for configuration validation. It
    sequentially calls helper functions to perform structural, scope-based, and
    hyperparameter-level checks. A valid configuration is returned upon successful
    completion, and a timestamped snapshot is saved to disk. This function is
    the first critical guardrail in the research pipeline.

    Args:
        config: The `act_tensor_config` dictionary to be validated.
        output_dir: Directory to save the configuration snapshot. Defaults to
                    "config_logs".

    Returns:
        The validated configuration dictionary, unchanged.

    Raises:
        TypeError: If the config or its components have incorrect types.
        KeyError: If required keys are missing.
        ValueError: If parameter values are outside their admissible ranges or
                    are inconsistent.
    """
    # Step 1: Validate the overall dictionary structure and create a snapshot.
    _validate_structure_and_log(config, output_dir)

    # Step 2: Validate the study scope, date boundaries, and universe filters.
    _validate_study_scope(config)

    # Step 3: Validate all numerical hyperparameters and set global random seeds.
    _validate_hyperparameters(config)

    # If all validations pass, return the original configuration dictionary.
    return config


In [None]:
# Task 2: Validate and Enforce DataFrame Schema

# ==============================================================================
# Task 2: Validate and Enforce DataFrame Schema
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 2, Step 1: Index and column presence validation (Helper Function)
# ------------------------------------------------------------------------------
def _validate_columns_and_set_index(
    df: pd.DataFrame,
    required_columns: List[str],
    core_columns: List[str]
) -> pd.DataFrame:
    """
    Validates column presence and sets a DatetimeIndex.

    This function ensures that all required columns exist in the DataFrame. It
    raises a critical error if core columns are missing. It then ensures a 'date'
    column exists, converts it to a pandas DatetimeIndex, and drops any rows
    with invalid date entries.

    Args:
        df: The input raw DataFrame.
        required_columns: A list of all column names expected in the DataFrame.
        core_columns: A subset of required_columns that are absolutely essential
                      for the pipeline to run.

    Returns:
        A new DataFrame with a validated DatetimeIndex named 'date'.

    Raises:
        KeyError: If any core columns are missing.
        ValueError: If the 'date' column is missing or cannot be processed.
    """
    # Create a copy to avoid modifying the original DataFrame in place.
    df_validated = df.copy()

    # Verify the presence of all required columns.
    missing_cols = [col for col in required_columns if col not in df_validated.columns]
    if missing_cols:
        # Check if any of the missing columns are core to the analysis.
        missing_core_cols = [col for col in missing_cols if col in core_columns]
        if missing_core_cols:
            # If core columns are missing, this is a fatal error.
            raise KeyError(
                f"Fatal error: Core columns are missing from the DataFrame: "
                f"{missing_core_cols}"
            )
        # If non-core columns are missing, log a warning.
        logging.warning(f"Non-core columns are missing: {missing_cols}")

    # Ensure the 'date' column exists, as it's required for the index.
    if "date" not in df_validated.columns:
        raise ValueError("The required 'date' column is not in the DataFrame.")

    # Convert the 'date' column to datetime objects, coercing errors to NaT.
    df_validated["date"] = pd.to_datetime(df_validated["date"], errors="coerce")

    # Identify and log the number of rows with unparseable dates.
    invalid_dates_count = df_validated["date"].isnull().sum()
    if invalid_dates_count > 0:
        logging.warning(
            f"Found and will drop {invalid_dates_count} rows with invalid dates."
        )
        # Remove rows where the date could not be parsed.
        df_validated.dropna(subset=["date"], inplace=True)

    # Set the 'date' column as the DataFrame's index.
    df_validated.set_index("date", inplace=True)

    return df_validated


# ------------------------------------------------------------------------------
# Task 2, Step 2: Type enforcement and conversion (Helper Function)
# ------------------------------------------------------------------------------
def _enforce_data_types(
    df: pd.DataFrame,
    output_dir: str
) -> pd.DataFrame:
    """
    Enforces strict data types for all columns in the DataFrame.

    This function applies a predefined dtype map to the DataFrame. It uses
    `pd.to_numeric` and `pd.to_datetime` with error coercion, which converts
    unparseable values to NaN/NaT without halting execution. A data quality
    report detailing null counts post-conversion is saved to disk.

    Args:
        df: The DataFrame with a DatetimeIndex.
        output_dir: Directory to save the data quality report.

    Returns:
        A new DataFrame with all columns cast to their specified dtypes.
    """
    # Create a copy to ensure the original DataFrame is not modified.
    df_typed = df.copy()

    # Define the strict schema with target data types for each column group.
    # Note: Pandas will automatically use float for integer columns with NaNs.
    # We use nullable integer 'Int64' for integer columns that might contain nulls.
    dtype_map: Dict[str, str] = {
        # Identifiers (nullable integers)
        "permno": "Int64", "permco": "Int64", "shrcd": "Int64",
        "exchcd": "Int64", "siccd": "Int64", "dlstcd": "Int64",
        "fyearq": "Int64", "fqtr": "Int64", "fyr": "Int64",
        "sich": "Int64", "lpermno": "Int64", "vol": "Int64",
        # Core financial metrics (floats)
        "prc": "float64", "ret": "float64", "retx": "float64",
        "shrout": "float64", "dlret": "float64", "RF": "float64",
        # Compustat quarterly accounting items (floats)
        "atq": "float64", "ltq": "float64", "seqq": "float64",
        "ceqq": "float64", "saleq": "float64", "ibq": "float64",
        "oibdpq": "float64", "pstkq": "float64", "cogsq": "float64",
        "xsgaq": "float64", "ppentq": "float64", "invtq": "float64",
        # String/Object columns
        "comnam": "object", "ncusip": "object", "stko": "object",
        "gvkey": "object", "indfmt": "object", "consol": "object",
        "popsrc": "object", "datafmt": "object", "tic": "object",
        "cusip": "object", "linktype": "object", "linkprim": "object",
        "shrcls": "object",
        # Date columns (already handled in index, but good practice to list)
        "datadate": "object", "dlstdt": "datetime64[ns]",
        "linkdt": "datetime64[ns]", "linkenddt": "datetime64[ns]",
        "namedt": "object", "nameenddt": "object"
    }

    # Iterate through the schema and apply type conversions.
    for col, dtype in dtype_map.items():
        if col in df_typed.columns:
            if 'datetime' in str(dtype):
                # Convert date-related columns, coercing errors to NaT.
                df_typed[col] = pd.to_datetime(df_typed[col], errors='coerce')
            elif 'Int' in str(dtype):
                # Convert to nullable integer type.
                df_typed[col] = pd.to_numeric(df_typed[col], errors='coerce').astype(dtype)
            else:
                # Apply standard type conversion.
                df_typed[col] = df_typed[col].astype(dtype, errors='ignore')

    # --- Generate Data Quality Report ---
    # Calculate the number of null values in each column after type enforcement.
    null_counts: pd.Series = df_typed.isnull().sum()

    # Calculate the percentage of null values for context.
    null_percentages: pd.Series = (null_counts / len(df_typed)) * 100

    # Combine counts and percentages into a report DataFrame.
    quality_report: pd.DataFrame = pd.DataFrame({
        "null_count": null_counts,
        "null_percentage": null_percentages
    })

    # Filter the report to show only columns that contain null values.
    quality_report = quality_report[quality_report["null_count"] > 0]
    quality_report.sort_values(by="null_count", ascending=False, inplace=True)

    # Define the path for the quality report file.
    report_path: str = os.path.join(output_dir, "data_quality_report.txt")

    # Write the report to a text file.
    with open(report_path, "w") as f:
        f.write("Data Quality Report: Null Counts After Type Enforcement\n")
        f.write("=" * 60 + "\n")
        quality_report.to_string(f)

    logging.info(f"Data quality report saved to {report_path}")

    return df_typed


# ------------------------------------------------------------------------------
# Task 2, Step 3: Sanity checks on core financial fields (Helper Function)
# ------------------------------------------------------------------------------
def _perform_sanity_checks(df: pd.DataFrame) -> None:
    """
    Performs domain-specific sanity checks on core financial data fields.

    This function validates that key financial metrics fall within plausible
    ranges. It does not drop data but logs warnings for values that may
    indicate data quality issues, such as negative shares outstanding or
    extreme returns.

    Args:
        df: The type-enforced DataFrame.

    Raises:
        AssertionError: If a fundamental data constraint is violated (e.g.,
                        negative shares).
    """
    # Check 'prc': Absolute value of price should be positive for non-nulls.
    # CRSP uses negative signs to denote bid-ask averages, not negative prices.
    if (df["prc"].dropna().abs() <= 0).any():
        logging.warning("Found non-positive absolute values in 'prc' column.")

    # Check 'shrout': Shares outstanding must be non-negative.
    if (df["shrout"].dropna() < 0).any():
        # This is a critical data error.
        raise AssertionError("Negative values found in 'shrout' column.")

    # Check 'ret', 'retx', 'dlret' for plausible range [-1, 10].
    # A return greater than 1000% in a month is highly suspect.
    for col in ["ret", "retx", "dlret"]:
        if col in df.columns:
            extreme_returns = df[col].dropna().abs() > 10.0
            if extreme_returns.any():
                logging.warning(
                    f"Found {extreme_returns.sum()} returns with absolute value > 1000% in '{col}'."
                )

    # Check 'RF' for plausible monthly risk-free rate range in percent format.
    if "RF" in df.columns:
        rf_out_of_range = ~df["RF"].dropna().between(-0.5, 2.0)
        if rf_out_of_range.any():
            logging.warning(
                f"Found {rf_out_of_range.sum()} 'RF' values outside [-0.5%, 2.0%]."
            )

    # Verify that the index frequency is approximately monthly.
    # We check if the modal day of the month is towards the end (28, 29, 30, 31).
    if not df.index.empty:
        modal_day = df.index.day.mode()
        if not modal_day.empty and modal_day[0] < 28:
            logging.warning(
                f"Modal day of month is {modal_day[0]}, which may indicate "
                f"data is not consistently month-end."
            )


# ------------------------------------------------------------------------------
# Task 2, Orchestrator Function
# ------------------------------------------------------------------------------
def validate_and_enforce_schema(
    df_raw: pd.DataFrame,
    output_dir: str = "data_interim"
) -> pd.DataFrame:
    """
    Orchestrates the validation and schema enforcement for the raw input DataFrame.

    This function serves as a robust pre-processing step to ensure the input data
    conforms to the required structure and types for the ACT-Tensor pipeline. It
    performs a sequence of validations:
    1. Verifies the presence of all required columns and sets a proper DatetimeIndex.
    2. Enforces a strict data type schema on all columns, coercing errors.
    3. Conducts domain-specific sanity checks on key financial variables.
    4. Checkpoints the validated DataFrame to a high-performance Parquet file.

    Args:
        df_raw: The raw, unprocessed pandas DataFrame from data loading.
        output_dir: The directory to save the validated DataFrame and quality
                    reports. Defaults to "data_interim".

    Returns:
        A cleaned, validated, and schema-enforced pandas DataFrame ready for
        downstream processing.

    Raises:
        KeyError: If essential columns are missing.
        ValueError: If the 'date' column is absent or invalid.
        AssertionError: If critical data integrity checks fail (e.g., negative shares).
    """
    # Define the complete list of columns required by the pipeline.
    required_columns: List[str] = [
        "permno", "permco", "comnam", "shrcd", "exchcd", "siccd", "ncusip",
        "prc", "ret", "retx", "shrout", "vol", "stko", "dlstdt", "dlstcd",
        "dlret", "gvkey", "datadate", "fyearq", "fqtr", "fyr", "indfmt",
        "consol", "popsrc", "datafmt", "tic", "cusip", "sich", "atq", "ltq",
        "seqq", "ceqq", "saleq", "ibq", "oibdpq", "pstkq", "cogsq", "xsgaq",
        "ppentq", "invtq", "lpermno", "linktype", "linkprim", "linkdt",
        "linkenddt", "RF", "namedt", "nameenddt", "shrcls"
    ]
    # Define a subset of columns that are absolutely critical.
    core_columns: List[str] = [
        "permno", "date", "ret", "prc", "shrout", "gvkey", "RF"
    ]

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    # --- Step 1: Validate columns and set index ---
    df_indexed = _validate_columns_and_set_index(
        df=df_raw,
        required_columns=required_columns,
        core_columns=core_columns
    )
    logging.info("Step 1/3: Column presence and index validation complete.")

    # --- Step 2: Enforce data types ---
    df_typed = _enforce_data_types(df=df_indexed, output_dir=output_dir)
    logging.info("Step 2/3: Data type enforcement complete.")

    # --- Step 3: Perform sanity checks ---
    _perform_sanity_checks(df=df_typed)
    logging.info("Step 3/3: Sanity checks on financial fields complete.")

    # --- Checkpointing ---
    # Define the path for the checkpoint file.
    checkpoint_path: str = os.path.join(output_dir, "df_validated.parquet")

    # Save the validated DataFrame to Parquet format for efficient downstream I/O.
    df_typed.to_parquet(checkpoint_path, compression="snappy")
    logging.info(f"Validated DataFrame checkpointed to {checkpoint_path}")

    return df_typed


In [None]:
# Task 3: Universe Filtering, Deduplication, and Core Data Cleansing

# ==============================================================================
# Task 3: Universe Filtering, Deduplication, and Core Data Cleansing
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 3, Step 1: Temporal and universe filters (Helper Function)
# ------------------------------------------------------------------------------
def _apply_universe_filters(
    df: pd.DataFrame,
    config: Dict[str, Any],
    log_path: str
) -> pd.DataFrame:
    """
    Applies temporal and security universe filters to the DataFrame.

    This function restricts the data to the specified date range, common stocks
    on major US exchanges, and logs the impact of each filter.

    Args:
        df: The validated DataFrame from the previous task.
        config: The study's configuration dictionary.
        log_path: Path to the file for logging filter counts.

    Returns:
        A new DataFrame containing only the securities within the defined universe
        and time period.
    """
    # Create a copy to avoid side effects on the original DataFrame.
    df_filtered = df.copy()

    # Extract the relevant study scope parameters.
    scope_config = config["study_scope"]

    # Initialize a list to store log messages.
    log_messages = ["Universe Filtering Log\n" + "="*40]
    log_messages.append(f"Initial row count: {len(df_filtered)}")

    # --- 1. Temporal Filter ---
    # Restrict DataFrame to the specified date range.
    start_date = pd.to_datetime(scope_config["start_date"])
    end_date = pd.to_datetime(scope_config["end_date"])
    df_filtered = df_filtered.loc[start_date:end_date]
    log_messages.append(
        f"After temporal filter ({start_date.date()} to {end_date.date()}): "
        f"{len(df_filtered)} rows"
    )

    # --- 2. Common Stock Filter (shrcd) ---
    # Retain only rows with share codes corresponding to common stocks.
    share_code_filter = scope_config["share_code_filter"]
    df_filtered = df_filtered[df_filtered["shrcd"].isin(share_code_filter)]
    log_messages.append(
        f"After share code filter (shrcd in {share_code_filter}): "
        f"{len(df_filtered)} rows"
    )

    # --- 3. Exchange Filter (exchcd) ---
    # Retain only rows for securities listed on specified exchanges.
    exchange_filter = scope_config["include_exchanges"]
    df_filtered = df_filtered[df_filtered["exchcd"].isin(exchange_filter)]
    log_messages.append(
        f"After exchange filter (exchcd in {exchange_filter}): "
        f"{len(df_filtered)} rows"
    )

    # --- 4. Non-Exchange Listing Exclusion ---
    # If configured, explicitly drop rows where exchange code is missing.
    if scope_config.get("exclude_non_exchange_listings", True):
        pre_drop_len = len(df_filtered)
        df_filtered.dropna(subset=["exchcd"], inplace=True)
        post_drop_len = len(df_filtered)
        if pre_drop_len > post_drop_len:
            log_messages.append(
                f"After dropping rows with null exchcd: {post_drop_len} rows"
            )

    # Write all log messages to the specified file.
    with open(log_path, "w") as f:
        f.write("\n".join(log_messages))

    logging.info(f"Universe filter log saved to {log_path}")

    return df_filtered


# ------------------------------------------------------------------------------
# Task 3, Step 2: Deduplicate by (date, permno) (Helper Function)
# ------------------------------------------------------------------------------
def _deduplicate_observations(
    df: pd.DataFrame,
    config: Dict[str, Any],
    log_path: str
) -> pd.DataFrame:
    """
    Deduplicates observations by (date, permno) using a deterministic tie-breaker.

    When multiple entries exist for the same firm-month (e.g., from CRSP-Compustat
    link table), this function applies a multi-level sorting logic to select the
    single best observation.

    Args:
        df: The filtered DataFrame.
        config: The study's configuration dictionary.
        log_path: Path to the file for logging deduplication counts.

    Returns:
        A new DataFrame with a unique (date, permno) MultiIndex.
    """
    # Create a copy to work on.
    df_dedup = df.copy()

    # Reset index to make 'date' a column for sorting and grouping.
    df_dedup.reset_index(inplace=True)

    initial_rows = len(df_dedup)

    # Define the tie-breaking hierarchy.
    # 1. Prefer primary link ('P') over secondary ('C').
    # 2. Prefer allowed link types ('LC', 'LU').
    # 3. Prefer the most recent link start date.
    # 4. As a final deterministic tie-breaker, use the gvkey.
    link_policy = config["linking_policy"]
    df_dedup['linkprim_ord'] = df_dedup['linkprim'].apply(
        lambda x: 0 if x == 'P' else 1 if x == 'C' else 2
    )
    df_dedup['linktype_ord'] = df_dedup['linktype'].apply(
        lambda x: 0 if x in link_policy["allow_linktype"] else 1
    )

    # Sort the DataFrame according to the full tie-breaking logic.
    sort_order = [
        'date', 'permno', 'linkprim_ord', 'linktype_ord', 'linkdt', 'gvkey'
    ]
    ascending_order = [True, True, True, True, False, True] # False for linkdt (latest)
    df_dedup.sort_values(
        by=sort_order,
        ascending=ascending_order,
        inplace=True,
        na_position='last'
    )

    # Drop duplicates, keeping the first entry after sorting.
    df_dedup.drop_duplicates(subset=['date', 'permno'], keep='first', inplace=True)

    # Clean up temporary sorting columns.
    df_dedup.drop(columns=['linkprim_ord', 'linktype_ord'], inplace=True)

    final_rows = len(df_dedup)
    duplicates_removed = initial_rows - final_rows

    # Log the number of duplicates removed.
    with open(log_path, "w") as f:
        f.write("Deduplication Log\n" + "="*40 + "\n")
        f.write(f"Rows before deduplication: {initial_rows}\n")
        f.write(f"Rows after deduplication: {final_rows}\n")
        f.write(f"Duplicates removed: {duplicates_removed}\n")

    logging.info(f"Deduplication log saved to {log_path}")

    # Set and sort the final (date, permno) MultiIndex.
    df_dedup.set_index(['date', 'permno'], inplace=True)
    df_dedup.sort_index(inplace=True)

    return df_dedup


# ------------------------------------------------------------------------------
# Task 3, Step 3: Outlier removal and delisting returns (Helper Function)
# ------------------------------------------------------------------------------
def _handle_outliers_and_delistings(
    df: pd.DataFrame,
    config: Dict[str, Any],
    log_path: str
) -> pd.DataFrame:
    """
    Removes extreme outliers and incorporates CRSP delisting returns.

    This function applies domain-specific filters to remove penny stocks,
    micro-caps, and observations with extreme returns. It then correctly
    compounds delisting returns with regular monthly returns where applicable.

    Args:
        df: The deduplicated DataFrame with a (date, permno) MultiIndex.
        config: The study's configuration dictionary.
        log_path: Path to the file for logging outlier removal counts.

    Returns:
        A new, fully cleansed DataFrame.
    """
    # Create a copy to work on.
    df_clean = df.copy()

    scope_config = config["study_scope"]
    log_messages = ["Outlier Removal & Cleansing Log\n" + "="*50]
    initial_rows = len(df_clean)
    log_messages.append(f"Rows before outlier removal: {initial_rows}")

    # --- 1. Extreme Outlier Removal ---
    if scope_config.get("drop_extreme_outliers", True):
        # Define outlier thresholds.
        thresholds = {
            "min_price": 1.0,
            "max_price": 10000.0,
            "max_abs_ret": 2.0,
            "min_shrout": 10.0, # In thousands of shares
            "min_mktcap_proxy": 1e3 # In thousands of USD ($1 million)
        }
        log_messages.append(f"Applying outlier thresholds: {thresholds}")

        # Create boolean masks for each condition.
        price_mask = df_clean["prc"].abs().between(
            thresholds["min_price"], thresholds["max_price"]
        )
        ret_mask = df_clean["ret"].abs() <= thresholds["max_abs_ret"]
        shrout_mask = df_clean["shrout"] >= thresholds["min_shrout"]

        # Compute market cap proxy for filtering.
        mktcap_proxy = df_clean["prc"].abs() * df_clean["shrout"]
        mktcap_mask = mktcap_proxy >= thresholds["min_mktcap_proxy"]

        # Combine all masks. Keep rows where all conditions are True.
        # NaNs in any of these columns will result in False, thus dropping the row.
        combined_mask = price_mask & ret_mask & shrout_mask & mktcap_mask
        df_clean = df_clean[combined_mask]

        final_rows = len(df_clean)
        log_messages.append(f"Rows after outlier removal: {final_rows}")
        log_messages.append(f"Rows removed: {initial_rows - final_rows}")

    # --- 2. Delisting Return Incorporation ---
    if scope_config.get("use_delisting_returns", True):
        # Identify rows where both regular and delisting returns are available.
        both_valid = df_clean["ret"].notna() & df_clean["dlret"].notna()

        # Formula: r_combined = (1 + r_t) * (1 + r_delist) - 1
        df_clean.loc[both_valid, "ret"] = (
            (1 + df_clean.loc[both_valid, "ret"]) *
            (1 + df_clean.loc[both_valid, "dlret"])
        ) - 1

        # Identify rows where only delisting return is available.
        only_dlret = df_clean["ret"].isna() & df_clean["dlret"].notna()
        df_clean.loc[only_dlret, "ret"] = df_clean.loc[only_dlret, "dlret"]

        log_messages.append(
            f"Incorporated delisting returns for {both_valid.sum()} observations "
            f"and filled {only_dlret.sum()} missing returns with dlret."
        )

    # Write log messages to file.
    with open(log_path, "w") as f:
        f.write("\n".join(log_messages))

    logging.info(f"Outlier and delisting log saved to {log_path}")

    return df_clean


# ------------------------------------------------------------------------------
# Task 3, Orchestrator Function
# ------------------------------------------------------------------------------
def cleanse_and_filter_universe(
    df_validated: pd.DataFrame,
    config: Dict[str, Any],
    output_dir: str = "data_interim"
) -> pd.DataFrame:
    """
    Orchestrates the universe definition and core data cleansing pipeline.

    This function takes a schema-validated DataFrame and applies a series of
    rigorous filters and transformations to define the final analytical sample:
    1. Applies temporal and security-type filters (e.g., date range, common stock).
    2. Deduplicates firm-month observations using a deterministic tie-breaker.
    3. Removes extreme outliers and incorporates CRSP delisting returns.
    4. Checkpoints the final cleansed DataFrame to a Parquet file.

    Args:
        df_validated: The DataFrame after schema validation (from Task 2).
        config: The `act_tensor_config` dictionary.
        output_dir: Directory to save the cleansed DataFrame and log files.
                    Defaults to "data_interim".

    Returns:
        A fully cleansed and filtered pandas DataFrame with a unique
        (date, permno) MultiIndex, ready for characteristic construction.
    """
    # Ensure the output directory for logs and checkpoints exists.
    os.makedirs(output_dir, exist_ok=True)

    # --- Step 1: Apply temporal and universe filters ---
    universe_log_path = os.path.join(output_dir, "universe_filter_log.txt")
    df_filtered = _apply_universe_filters(
        df=df_validated, config=config, log_path=universe_log_path
    )
    logging.info("Step 1/3: Universe filtering complete.")

    # --- Step 2: Deduplicate by (date, permno) and resolve conflicts ---
    dedup_log_path = os.path.join(output_dir, "deduplication_log.txt")
    df_deduplicated = _deduplicate_observations(
        df=df_filtered, config=config, log_path=dedup_log_path
    )
    logging.info("Step 2/3: Deduplication complete.")

    # --- Step 3: Handle outliers and delisting returns ---
    outlier_log_path = os.path.join(output_dir, "outlier_removal_log.txt")
    df_cleansed = _handle_outliers_and_delistings(
        df=df_deduplicated, config=config, log_path=outlier_log_path
    )
    logging.info("Step 3/3: Outlier removal and delisting return handling complete.")

    # --- Checkpointing ---
    # Define the path for the final cleansed data checkpoint.
    checkpoint_path = os.path.join(output_dir, "df_cleansed.parquet")

    # Save the cleansed DataFrame to Parquet format.
    df_cleansed.to_parquet(checkpoint_path, compression="snappy")
    logging.info(f"Cleansed DataFrame checkpointed to {checkpoint_path}")

    return df_cleansed


In [None]:
# Task 4: Construct the 45-Characteristic Library with Definitions and Timing Conventions

# ==============================================================================
# Task 4: Construct the 45-Characteristic Library and Compute Values
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 4, Step 1: Document characteristic definitions (Helper Function)
# ------------------------------------------------------------------------------
def _create_and_save_char_library(
    output_path: str
) -> List[Dict[str, Any]]:
    """
    Defines and saves the characteristic library metadata to a JSON file.

    This function creates a structured definition for each characteristic, including
    its name, formula, input variables, and type. For this implementation, a
    representative subset of characteristics is defined. A full implementation
    would list all 45. This metadata drives the computation process.

    Args:
        output_path: The file path to save the JSON library (e.g., 'config/char_library.json').

    Returns:
        The characteristic library as a list of dictionaries.
    """
    # Define a representative subset of the 45 characteristics.
    # A full implementation would source all 45 from a paper like Gu, Kelly, Xiu (2020).
    char_library = [
        {
            "char_id": 1,
            "char_name": "market_equity",
            "category": "size",
            "formula": r"ME_t = |prc_t| \times shrout_t",
            "inputs": ["prc", "shrout"],
            "frequency": "monthly"
        },
        {
            "char_id": 2,
            "char_name": "book_to_market",
            "category": "value",
            "formula": r"BM_t = \frac{seqq_{q(t-6m)}}{ME_t}",
            "inputs": ["seqq", "market_equity"],
            "frequency": "hybrid",
            "reporting_lag_months": 6
        },
        {
            "char_id": 3,
            "char_name": "momentum_12_2",
            "category": "momentum",
            "formula": r"MOM_t = \prod_{s=t-12}^{t-2}(1 + r_s) - 1",
            "inputs": ["ret"],
            "frequency": "monthly"
        },
        {
            "char_id": 4,
            "char_name": "return_on_equity",
            "category": "profitability",
            "formula": r"ROE_t = \frac{ibq_{q(t-6m)}}{seqq_{q(t-6m)}}",
            "inputs": ["ibq", "seqq"],
            "frequency": "quarterly",
            "reporting_lag_months": 6
        },
        {
            "char_id": 5,
            "char_name": "asset_growth",
            "category": "investment",
            "formula": r"AG_t = \frac{atq_{q(t-6m)}}{atq_{q(t-18m)}} - 1",
            "inputs": ["atq"],
            "frequency": "quarterly",
            "reporting_lag_months": 6
        }
    ]

    # Ensure the directory for the output path exists.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    # Write the library to the specified JSON file.
    with open(output_path, 'w') as f:
        json.dump(char_library, f, indent=4)

    logging.info(f"Characteristic library saved to {output_path}")
    return char_library


# ------------------------------------------------------------------------------
# Task 4, Step 2: Align quarterly accounting data (Helper Function)
# ------------------------------------------------------------------------------
def _align_accounting_data(
    df_clean: pd.DataFrame,
    lag_months: int = 6
) -> pd.DataFrame:
    """
    Aligns quarterly Compustat data to a monthly frequency with a reporting lag.

    This function prevents look-ahead bias by ensuring that for any given month `t`,
    the accounting data used is from a fiscal quarter that was publicly available,
    i.e., with a `datadate` at least `lag_months` prior to `t`.

    Args:
        df_clean: The cleansed DataFrame with a (date, permno) MultiIndex.
        lag_months: The number of months to lag accounting data.

    Returns:
        A DataFrame with the same monthly index, now including correctly lagged
        quarterly accounting columns.
    """
    # Reset index to use 'date' and 'permno' as columns for merging.
    monthly_df = df_clean.reset_index()

    # Define the list of quarterly accounting columns to be lagged.
    accounting_cols = [
        'datadate', 'atq', 'ltq', 'seqq', 'ceqq', 'saleq', 'ibq',
        'oibdpq', 'pstkq', 'cogsq', 'xsgaq', 'ppentq', 'invtq'
    ]

    # Create a DataFrame containing only the necessary accounting data.
    # Drop duplicates to ensure one record per firm-quarter.
    quarterly_df = monthly_df[['permno', 'datadate'] + accounting_cols[1:]].copy()
    quarterly_df['datadate'] = pd.to_datetime(quarterly_df['datadate'], errors='coerce')
    quarterly_df.dropna(subset=['permno', 'datadate'], inplace=True)
    quarterly_df.drop_duplicates(subset=['permno', 'datadate'], inplace=True)
    quarterly_df.sort_values(by=['permno', 'datadate'], inplace=True)

    # Create the monthly frame for the merge, containing only identifiers and dates.
    monthly_grid = monthly_df[['date', 'permno']].copy()
    monthly_grid.sort_values(by=['permno', 'date'], inplace=True)

    # Calculate the cutoff date for each month to enforce the reporting lag.
    monthly_grid['cutoff_date'] = monthly_grid['date'] - pd.DateOffset(months=lag_months)

    # Perform the as-of merge. For each firm-month, this finds the most recent
    # accounting data (from quarterly_df) where datadate <= cutoff_date.
    df_aligned = pd.merge_asof(
        left=monthly_grid,
        right=quarterly_df,
        left_on='cutoff_date',
        right_on='datadate',
        by='permno',
        direction='backward' # Find the last observation in `right` at or before `left`'s date
    )

    # Merge the aligned accounting data back to the main monthly DataFrame.
    df_final = pd.merge(
        monthly_df,
        df_aligned.drop(columns=['cutoff_date']),
        on=['date', 'permno'],
        how='left'
    )

    # Set the (date, permno) MultiIndex back.
    df_final.set_index(['date', 'permno'], inplace=True)
    df_final.sort_index(inplace=True)

    logging.info(f"Accounting data aligned with a {lag_months}-month lag.")
    return df_final


# ------------------------------------------------------------------------------
# Task 4, Step 3: Compute characteristics (Helper Functions by Category)
# ------------------------------------------------------------------------------
def _compute_characteristics(
    df_aligned: pd.DataFrame
) -> pd.DataFrame:
    """
    Computes a suite of firm characteristics using vectorized operations.

    This function acts as a factory, calculating characteristics based on their
    definitions. It is designed to be modular, with separate logic for different
    categories of signals (size, value, momentum, etc.).

    Args:
        df_aligned: DataFrame containing both monthly market data and lagged
                    quarterly accounting data.

    Returns:
        A new DataFrame containing the original data plus new columns for each
        computed characteristic.
    """
    # Create a copy to store the new characteristic columns.
    df_chars = df_aligned.copy()

    # --- Size Characteristics ---
    # Market Equity: |Price| * Shares Outstanding.
    df_chars['market_equity'] = df_chars['prc'].abs() * df_chars['shrout']

    # --- Value Characteristics ---
    # Book-to-Market Ratio: Lagged Shareholders' Equity / Market Equity.
    # Replace non-positive book equity with NaN to avoid meaningless ratios.
    book_equity = df_chars['seqq'].where(df_chars['seqq'] > 0)
    df_chars['book_to_market'] = book_equity / df_chars['market_equity']

    # --- Momentum Characteristics ---
    # Momentum (12-2): Compounded return from month t-12 to t-2.
    # Group by firm (permno) to compute rolling returns correctly.
    # Add 1 to returns to calculate cumulative product.
    ret_plus_one = df_chars['ret'].add(1)
    # The rolling window of 11 months captures returns from t-12 to t-2.
    # We then shift by 1 to align the calculation correctly at time t.
    # The product of (1+ret) over 11 months is calculated.
    # We subtract 1 to get the total compounded return.
    # min_periods=11 ensures we only calculate if we have enough data.
    df_chars['momentum_12_2'] = (
        ret_plus_one.groupby('permno')
        .rolling(window=11, min_periods=11)
        .apply(np.prod, raw=True)
        .reset_index(level=0, drop=True)
        .groupby('permno')
        .shift(1)
    ) - 1

    # --- Profitability Characteristics ---
    # Return on Equity: Lagged Income Before Extraordinary Items / Lagged Equity.
    df_chars['return_on_equity'] = df_chars['ibq'] / book_equity

    # --- Investment Characteristics ---
    # Asset Growth: Growth in total assets over the prior year.
    # We need atq from 12 months ago (which corresponds to datadate of t-18m).
    # A groupby().diff(12) on the monthly aligned data approximates this.
    atq_lag_12m = df_chars.groupby('permno')['atq'].shift(12)
    # Replace non-positive lagged assets with NaN.
    atq_lag_12m = atq_lag_12m.where(atq_lag_12m > 0)
    df_chars['asset_growth'] = (df_chars['atq'] / atq_lag_12m) - 1

    logging.info("Computed representative set of characteristics.")
    return df_chars


# ------------------------------------------------------------------------------
# Task 4, Orchestrator Function
# ------------------------------------------------------------------------------
def construct_characteristics(
    df_cleansed: pd.DataFrame,
    config_dir: str = "config",
    output_dir: str = "data_interim"
) -> pd.DataFrame:
    """
    Orchestrates the construction of the firm characteristic library.

    This function executes the full characteristic construction pipeline:
    1. Defines and saves a machine-readable library of characteristic definitions.
    2. Aligns quarterly accounting data to a monthly frequency, rigorously
       enforcing a reporting lag to prevent look-ahead bias.
    3. Computes all defined characteristics using efficient, vectorized operations.
    4. Logs the missingness rate for each characteristic as a diagnostic.
    5. Checkpoints the final DataFrame with raw characteristics to a Parquet file.

    Args:
        df_cleansed: The fully cleansed and filtered DataFrame from Task 3.
        config_dir: Directory to save the characteristic library definition.
        output_dir: Directory to save the final DataFrame and logs.

    Returns:
        A wide DataFrame with a (date, permno) MultiIndex, containing all
        original data plus the newly computed raw characteristic columns.
    """
    # Ensure output directories exist.
    os.makedirs(config_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)

    # --- Step 1: Define and save the characteristic library ---
    char_lib_path = os.path.join(config_dir, "char_library_definitions.json")
    char_library = _create_and_save_char_library(output_path=char_lib_path)
    logging.info("Step 1/3: Characteristic library defined and saved.")

    # --- Step 2: Align accounting data to prevent look-ahead bias ---
    # Use a default lag of 6 months, standard in academic research.
    # This could be parameterized from the main config if needed.
    df_aligned = _align_accounting_data(df_cleansed, lag_months=6)
    logging.info("Step 2/3: Accounting data alignment complete.")

    # --- Step 3: Compute all defined characteristics ---
    df_with_chars = _compute_characteristics(df_aligned)
    logging.info("Step 3/3: Characteristic computation complete.")

    # --- Finalization and Reporting ---
    # Extract only the characteristic columns for reporting and downstream use.
    char_names = [c['char_name'] for c in char_library]
    df_characteristics_raw = df_with_chars[char_names]

    # Generate and save a missingness report for the computed characteristics.
    missing_report = (
        df_characteristics_raw.isnull().sum() / len(df_characteristics_raw)
    ).sort_values(ascending=False).to_frame('missing_fraction')

    report_path = os.path.join(output_dir, "char_missingness_summary.csv")
    missing_report.to_csv(report_path)
    logging.info(f"Characteristic missingness report saved to {report_path}")

    # Checkpoint the final DataFrame with raw characteristics.
    checkpoint_path = os.path.join(output_dir, "df_characteristics_raw.parquet")
    df_characteristics_raw.to_parquet(checkpoint_path, compression="snappy")
    logging.info(f"Raw characteristics DataFrame checkpointed to {checkpoint_path}")

    return df_characteristics_raw


In [None]:
# Task 5: Cross-Sectional Rank Normalization by Month

# ==============================================================================
# Task 5: Cross-Sectional Rank Normalization by Month
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 5, Orchestrator Function
# ------------------------------------------------------------------------------
def rank_normalize_characteristics(
    df_characteristics_raw: pd.DataFrame,
    config: Dict[str, Any],
    output_dir: str = "data_interim"
) -> pd.DataFrame:
    """
    Performs cross-sectional rank normalization on raw characteristic data.

    This function transforms each characteristic to be on a comparable scale,
    which is a crucial preprocessing step for many financial models, including
    tensor decomposition. The process for each characteristic is:
    1. For each month, rank firms based on the characteristic value. Missing
       values are ignored, and ties are handled by assigning the average rank.
    2. Linearly rescale these ranks to a fixed interval of [-0.5, 0.5].

    This procedure makes the resulting data robust to outliers and removes
    arbitrary differences in scale across characteristics.

    Args:
        df_characteristics_raw: A DataFrame with a (date, permno) MultiIndex
                                containing the raw, computed characteristic values.
        config: The study's configuration dictionary, containing normalization
                parameters.
        output_dir: Directory to save the normalized DataFrame and summary logs.

    Returns:
        A new DataFrame of the same shape, with characteristic values replaced
        by their cross-sectionally normalized ranks.
    """
    # --- Input Validation ---
    # Verify that the input is a pandas DataFrame.
    if not isinstance(df_characteristics_raw, pd.DataFrame):
        raise TypeError("Input 'df_characteristics_raw' must be a pandas DataFrame.")

    # Verify that the DataFrame has the expected MultiIndex.
    if not isinstance(df_characteristics_raw.index, pd.MultiIndex) or \
       df_characteristics_raw.index.names != ['date', 'permno']:
        raise ValueError("Input DataFrame must have a MultiIndex named ('date', 'permno').")

    # Create a copy to ensure the original DataFrame is not modified.
    df_normalized = df_characteristics_raw.copy()

    # Extract normalization parameters from the configuration.
    norm_config = config["tensor_policies"]["rank_normalization"]
    rank_method = norm_config["rank_method"]
    target_interval = norm_config["scale_to_interval"]

    # --- Step 1: Rank each characteristic within each month ---
    # Group the DataFrame by date to perform cross-sectional operations.
    # The `level='date'` argument targets the 'date' level of the MultiIndex.
    grouped_by_date = df_normalized.groupby(level='date')

    # Apply the rank transformation to all characteristic columns.
    # `na_option='keep'` ensures that existing NaN values are preserved.
    # `method='average'` handles ties as specified.
    df_ranks = grouped_by_date.rank(method=rank_method, na_option='keep', pct=False)
    logging.info("Step 1/3: Cross-sectional ranking complete.")

    # --- Step 2: Recenter and rescale ranks to the target interval ---
    # First, calculate N_t_ell, the number of non-null observations for each
    # characteristic in each month. `transform('count')` is crucial as it
    # broadcasts the group-wise count back to the original shape.
    n_t_ell = grouped_by_date.transform('count')

    # Handle the edge case where N_t_ell is 1. The denominator becomes 0.
    # In this case, the normalized value should be 0 (the midpoint of [-0.5, 0.5]).
    # We create a mask to identify where the denominator is non-zero.
    denominator = n_t_ell - 1

    # The formula for rescaling is: x_tilde = (rank - 1) / (N - 1) - 0.5
    # This maps rank 1 to -0.5 and rank N to +0.5.
    # We use np.where to safely handle the division-by-zero case.
    df_normalized = np.where(
        denominator > 0,
        (df_ranks - 1) / denominator - 0.5, # Apply formula where N > 1
        0.0                                 # Set to 0 where N <= 1
    )

    # The result of np.where is a NumPy array, so we convert it back to a
    # DataFrame, preserving the original index and columns.
    df_normalized = pd.DataFrame(
        df_normalized,
        index=df_characteristics_raw.index,
        columns=df_characteristics_raw.columns
    )

    # The np.where operation can lose the original NaNs, so we must re-mask them.
    df_normalized[df_characteristics_raw.isnull()] = np.nan
    logging.info("Step 2/3: Rank rescaling to [-0.5, 0.5] complete.")

    # --- Step 3: Assemble, verify, and checkpoint the normalized DataFrame ---
    # Compute summary statistics for validation.
    summary_stats = df_normalized.describe().transpose()

    # Perform sanity checks on the output.
    # The minimum value should be approximately -0.5.
    if not np.allclose(summary_stats['min'].min(), target_interval[0], atol=1e-9):
        logging.warning(f"Minimum normalized value is {summary_stats['min'].min()}, expected {target_interval[0]}.")

    # The maximum value should be approximately 0.5.
    if not np.allclose(summary_stats['max'].max(), target_interval[1], atol=1e-9):
        logging.warning(f"Maximum normalized value is {summary_stats['max'].max()}, expected {target_interval[1]}.")

    # The mean should be close to 0.
    if not (summary_stats['mean'].abs() < 0.1).all():
        logging.warning("Mean of some normalized characteristics is far from 0.")

    # Log the summary statistics to a file for auditing.
    os.makedirs(output_dir, exist_ok=True)
    summary_log_path = os.path.join(output_dir, "normalization_summary_stats.csv")
    summary_stats.to_csv(summary_log_path)
    logging.info(f"Step 3/3: Verification complete. Summary stats saved to {summary_log_path}.")

    # Checkpoint the final normalized DataFrame to a Parquet file.
    checkpoint_path = os.path.join(output_dir, "df_characteristics_normalized.parquet")
    df_normalized.to_parquet(checkpoint_path, compression="snappy")
    logging.info(f"Normalized characteristics DataFrame checkpointed to {checkpoint_path}")

    return df_normalized


In [None]:
# Task 6: Form the 3-Mode Tensor $\mathcal{X} \in \mathbb{R}^{T \times N \times L}$ and Observed-Index Set $\Omega$

# ==============================================================================
# Task 6: Form the 3-Mode Tensor and Observed-Index Set
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 6, Step 1: Construct firm and time index mappings (Helper Function)
# ------------------------------------------------------------------------------
def _create_index_mappings(
    df_normalized: pd.DataFrame,
    config: Dict[str, Any],
    output_dir: str
) -> Tuple[Dict[pd.Timestamp, int], Dict[int, int]]:
    """
    Creates and saves mappings from original identifiers to integer indices.

    This function extracts the unique, sorted dates and permnos from the
    DataFrame's MultiIndex and creates dictionaries that map each identifier
    to a zero-based integer. These mappings are essential for constructing the
    tensor and for later interpreting its dimensions.

    Args:
        df_normalized: The normalized characteristics DataFrame.
        config: The study's configuration dictionary.
        output_dir: Directory to save the mapping files.

    Returns:
        A tuple containing two dictionaries: (time_to_idx, firm_to_idx).

    Raises:
        ValueError: If the number of unique months in the data does not match
                    the expected number from the configuration.
    """
    # --- Time Dimension Mapping ---
    # Extract unique dates from the index, sort them, and get the total count T.
    unique_dates = sorted(df_normalized.index.get_level_values('date').unique())
    T = len(unique_dates)

    # Validate that the number of months matches the configuration's expectation.
    t_expected = config["tensor_policies"]["T_months_expected"]
    if T != t_expected:
        raise ValueError(
            f"Number of unique months in data ({T}) does not match "
            f"config 'T_months_expected' ({t_expected})."
        )

    # Create the mapping from Timestamp to integer index.
    time_to_idx = {date: i for i, date in enumerate(unique_dates)}

    # Save the time mapping. Timestamps are converted to ISO format strings for JSON compatibility.
    time_map_path = os.path.join(output_dir, "time_to_idx.json")
    with open(time_map_path, 'w') as f:
        json.dump({d.isoformat(): i for d, i in time_to_idx.items()}, f, indent=4)
    logging.info(f"Time-to-index mapping saved to {time_map_path}")

    # --- Firm Dimension Mapping ---
    # Extract unique permnos, sort them, and get the total count N.
    unique_permnos = sorted(df_normalized.index.get_level_values('permno').unique())

    # Create the mapping from permno to integer index.
    firm_to_idx = {permno: i for i, permno in enumerate(unique_permnos)}

    # Save the firm mapping.
    firm_map_path = os.path.join(output_dir, "firm_to_idx.json")
    with open(firm_map_path, 'w') as f:
        json.dump(firm_to_idx, f, indent=4)
    logging.info(f"Firm-to-index mapping saved to {firm_map_path}")

    return time_to_idx, firm_to_idx


# ------------------------------------------------------------------------------
# Task 6, Orchestrator Function
# ------------------------------------------------------------------------------
def form_tensor_and_observed_set(
    df_normalized: pd.DataFrame,
    config: Dict[str, Any],
    output_dir: str = "data_processed",
    metadata_dir: str = "data_interim"
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Reshapes the normalized panel DataFrame into a 3D NumPy tensor.

    This function orchestrates the transformation of the 2D panel data into a
    3D tensor of shape (T, N, L) as required by the ACT-Tensor model. It performs:
    1. Creation of deterministic mappings from date/permno to integer indices.
    2. Allocation of a NaN-initialized NumPy array for the tensor.
    3. Efficient population of the tensor using the mappings and DataFrame values.
    4. Creation of a boolean mask representing the set of observed entries (Omega).
    5. Calculation and logging of tensor metadata (dimensions, density).
    6. Saving the tensor, mask, and metadata to disk for downstream use.

    Args:
        df_normalized: The DataFrame of normalized characteristics from Task 5.
        config: The study's configuration dictionary.
        output_dir: Directory to save the final tensor and mask files.
        metadata_dir: Directory to save metadata like index mappings.

    Returns:
        A tuple containing:
        - tensor_X (np.ndarray): The 3D tensor of shape (T, N, L) with NaNs
          for missing values.
        - mask_observed (np.ndarray): A boolean tensor of the same shape, where
          True indicates an observed value.
    """
    # --- Input Validation ---
    if not isinstance(df_normalized, pd.DataFrame):
        raise TypeError("Input 'df_normalized' must be a pandas DataFrame.")

    # Ensure output directories exist.
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(metadata_dir, exist_ok=True)

    # --- Step 1: Construct the firm and time index mappings ---
    time_to_idx, firm_to_idx = _create_index_mappings(
        df_normalized, config, metadata_dir
    )
    T = len(time_to_idx)
    N = len(firm_to_idx)
    L = config["tensor_policies"]["L_characteristics"]

    # Verify that the number of columns in the DataFrame matches L.
    if df_normalized.shape[1] != L:
        raise ValueError(
            f"DataFrame has {df_normalized.shape[1]} columns, but config "
            f"'L_characteristics' is {L}."
        )
    logging.info(f"Step 1/3: Index mappings created. Tensor dimensions: T={T}, N={N}, L={L}.")

    # --- Step 2: Allocate the tensor and populate observed entries ---
    # Allocate a 3D NumPy array initialized with np.nan.
    # The dtype is set from the config to ensure high precision.
    float_dtype = config["reproducibility"]["float_dtype"]
    tensor_X = np.full((T, N, L), np.nan, dtype=float_dtype)

    # For efficient population, get integer indices for all rows at once.
    # This is a vectorized operation and vastly outperforms row-wise iteration.
    time_indices = df_normalized.index.get_level_values('date').map(time_to_idx).to_numpy()
    firm_indices = df_normalized.index.get_level_values('permno').map(firm_to_idx).to_numpy()

    # Use advanced NumPy indexing to populate the tensor in one operation.
    tensor_X[time_indices, firm_indices, :] = df_normalized.to_numpy()
    logging.info("Step 2/3: Tensor allocated and populated efficiently.")

    # --- Step 3: Construct the observed-index set and compute metadata ---
    # Create the boolean mask Omega, where True indicates an observed entry.
    # This is the computational representation of the set Ω.
    mask_observed = ~np.isnan(tensor_X)

    # Calculate metadata for logging and validation.
    observed_count = mask_observed.sum()
    total_elements = T * N * L
    global_density = observed_count / total_elements

    # Verify that the observed density is in a plausible range (e.g., ~17% from paper).
    if not (0.05 < global_density < 0.5):
        logging.warning(
            f"Global data density is {global_density:.4f}, which may be "
            f"unexpected. The paper reports ~17% observed (83% missing)."
        )

    # Compile metadata into a dictionary.
    tensor_metadata = {
        "T_dim": T,
        "N_dim": N,
        "L_dim": L,
        "observed_count": int(observed_count),
        "total_elements": total_elements,
        "global_density": global_density
    }

    # Save metadata to a JSON file.
    metadata_path = os.path.join(metadata_dir, "tensor_metadata.json")
    with open(metadata_path, 'w') as f:
        json.dump(tensor_metadata, f, indent=4)
    logging.info(f"Step 3/3: Observed-set mask created. Metadata saved to {metadata_path}.")

    # --- Checkpointing ---
    # Save the final tensor to a compressed NumPy archive.
    tensor_path = os.path.join(output_dir, "tensor_X.npz")
    np.savez_compressed(tensor_path, X=tensor_X)
    logging.info(f"Tensor saved to {tensor_path}")

    # Save the boolean mask to a separate compressed archive.
    mask_path = os.path.join(output_dir, "mask_observed.npz")
    np.savez_compressed(mask_path, mask=mask_observed)
    logging.info(f"Observed mask saved to {mask_path}")

    return tensor_X, mask_observed


In [None]:
# Task 7: Create Evaluation Mask for MAR (Missing-At-Random) Regime

# ==============================================================================
# Task 7: Create Evaluation Mask for MAR (Missing-At-Random) Regime
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 7, Orchestrator Function
# ------------------------------------------------------------------------------
def create_mar_mask(
    mask_observed: np.ndarray,
    config: Dict[str, Any],
    output_dir: str = "data_processed",
    existing_masks: Optional[List[np.ndarray]] = None
) -> np.ndarray:
    """
    Creates a held-out evaluation mask under the Missing-At-Random (MAR) assumption.

    This function generates a boolean mask that randomly selects a specified
    fraction of the observed entries in the input tensor to be used as a
    held-out test set. The process is governed by a dedicated random seed to
    ensure perfect reproducibility.

    The steps are as follows:
    1. Identify all observed entries from the input `mask_observed`.
    2. Set the specific random seed for MAR masking.
    3. Uniformly sample a specified fraction of these entries without replacement.
    4. Create a new boolean mask of the same shape as the input, with `True` at
       the locations of the sampled entries.
    5. Verify that the new mask is disjoint from any previously created masks.
    6. Save the mask as a compressed NumPy array and the list of masked indices
       as a CSV for auditing.

    Args:
        mask_observed: A boolean NumPy array of shape (T, N, L) where `True`
                       indicates an observed entry in the original tensor.
        config: The study's configuration dictionary.
        output_dir: Directory to save the output mask and audit files.
        existing_masks: An optional list of other boolean masks to check for
                        disjointness.

    Returns:
        A boolean NumPy array of shape (T, N, L) representing the MAR
        evaluation mask.

    Raises:
        ValueError: If the number of observed entries is zero or if the new
                    mask overlaps with an existing mask.
    """
    # --- Input Validation ---
    if not isinstance(mask_observed, np.ndarray) or mask_observed.dtype != bool:
        raise TypeError("'mask_observed' must be a boolean NumPy array.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    # --- Step 1: Identify eligible observed entries and set random seed ---
    # Find the coordinates (t, n, l) of all observed entries.
    # `np.argwhere` returns an array of shape (num_observed, 3).
    observed_indices = np.argwhere(mask_observed)

    # Get the total number of observed entries.
    num_observed = observed_indices.shape[0]
    if num_observed == 0:
        raise ValueError("The 'mask_observed' contains no observed entries to sample from.")

    # Set the specific random seed for MAR masking to ensure reproducibility.
    mar_seed = config["reproducibility"]["seed_mask_mar"]
    np.random.seed(mar_seed)
    logging.info(f"Step 1/3: Identified {num_observed} observed entries. Random seed set to {mar_seed}.")

    # --- Step 2: Uniformly sample a fraction of observed entries ---
    # Get the fraction of observed data to hold out for the test set.
    test_fraction = config["masking_regimes"]["test_fraction_of_observed"]

    # Calculate the target number of entries to mask using the floor function.
    # Formula: M_MAR = floor(test_fraction * |Omega_full|)
    num_to_mask = int(np.floor(test_fraction * num_observed))

    # Uniformly sample `num_to_mask` indices from the list of observed entries
    # without replacement. We sample from the range of indices [0, num_observed-1].
    sampled_linear_indices = np.random.choice(
        num_observed,
        size=num_to_mask,
        replace=False
    )

    # Retrieve the (t, n, l) coordinates corresponding to the sampled indices.
    masked_coords = observed_indices[sampled_linear_indices]
    logging.info(f"Step 2/3: Sampled {num_to_mask} entries for the MAR mask ({test_fraction:.2%}).")

    # --- Step 3: Create, verify, and save the MAR mask ---
    # Create a new boolean mask initialized to all False.
    mask_eval_mar = np.zeros_like(mask_observed, dtype=bool)

    # Use advanced integer indexing to set the sampled locations to True.
    # This is highly efficient. We need to transpose the coordinates for this.
    mask_eval_mar[masked_coords[:, 0], masked_coords[:, 1], masked_coords[:, 2]] = True

    # Verify that the number of True values in the new mask is correct.
    assert mask_eval_mar.sum() == num_to_mask

    # If provided, check that the new mask is disjoint from existing masks.
    if existing_masks:
        for i, existing_mask in enumerate(existing_masks):
            if np.any(mask_eval_mar & existing_mask):
                raise ValueError(f"MAR mask overlaps with existing mask at index {i}.")

    # --- Checkpointing and Reporting ---
    # Save the boolean mask to a compressed NumPy archive.
    mask_path = os.path.join(output_dir, "mask_eval_MAR.npz")
    np.savez_compressed(mask_path, mask=mask_eval_mar)
    logging.info(f"MAR evaluation mask saved to {mask_path}")

    # Save the list of masked indices to a CSV file for audit purposes.
    indices_df = pd.DataFrame(masked_coords, columns=["t_idx", "n_idx", "l_idx"])
    audit_path = os.path.join(output_dir, "mask_MAR_indices.csv")
    indices_df.to_csv(audit_path, index=False)
    logging.info(f"MAR mask indices audit file saved to {audit_path}")

    # Compute and log the effective masking rate for verification.
    effective_rate = num_to_mask / num_observed
    logging.info(f"Step 3/3: Verification complete. Effective masking rate: {effective_rate:.6f}.")

    # Final check on the rate against a tolerance.
    if not np.isclose(effective_rate, test_fraction, atol=1/num_observed):
        logging.warning(
            f"Effective masking rate ({effective_rate:.4f}) differs slightly "
            f"from target rate ({test_fraction:.4f}) due to flooring."
        )

    return mask_eval_mar


In [None]:
# Task 8: Create Evaluation Mask for Block Missingness Regime

# ==============================================================================
# Task 8: Create Evaluation Mask for Block Missingness Regime
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 8, Orchestrator Function
# ------------------------------------------------------------------------------
def create_block_mask(
    mask_observed: np.ndarray,
    config: Dict[str, Any],
    output_dir: str = "data_processed",
    existing_masks: Optional[List[np.ndarray]] = None
) -> np.ndarray:
    """
    Creates a held-out evaluation mask with structured block missingness.

    This function simulates real-world data outages by masking contiguous blocks
    of data for each firm-characteristic time series. The process is designed to
    test a model's ability to handle structured, non-random missingness.

    The process is as follows:
    1. Sets the specific random seed for block masking to ensure reproducibility.
    2. For each firm-characteristic series, determines the time span of observed data.
    3. With a given probability (p_start), places the block at the beginning of the
       series' observed span. Otherwise, it places the block at a random valid
       starting position within that span.
    4. The mask is applied only to entries that were originally observed.
    5. The process aims for an approximate global masking rate, which is then
       verified.
    6. The final mask is checked for disjointness against any existing masks.
    7. The mask and its indices are saved to disk for evaluation and auditing.

    Performance Note:
    The core logic iterates through each of the N * L time series. For extremely
    large datasets, this Python loop can be a performance bottleneck. For
    production-scale applications, this loop is an ideal candidate for
    parallelization using libraries like `joblib` to distribute the per-series
    computations across multiple CPU cores.

    Args:
        mask_observed: A boolean NumPy array of shape (T, N, L) where `True`
                       indicates an observed entry.
        config: The study's configuration dictionary.
        output_dir: Directory to save the output mask and audit files.
        existing_masks: An optional list of other boolean masks to check for
                        disjointness.

    Returns:
        A boolean NumPy array of shape (T, N, L) representing the Block
        evaluation mask.

    Raises:
        ValueError: If the new mask overlaps with an existing mask.
        TypeError: If input `mask_observed` is not a boolean NumPy array.
    """
    # --- Input Validation ---
    # Verify that the input mask is a 3D boolean NumPy array.
    if not isinstance(mask_observed, np.ndarray) or mask_observed.dtype != bool or mask_observed.ndim != 3:
        raise TypeError("'mask_observed' must be a 3D boolean NumPy array.")

    # Ensure the output directory exists for saving artifacts.
    os.makedirs(output_dir, exist_ok=True)

    # --- Step 1: Set random seed and define block parameters ---
    # Set the specific random seed for Block masking from the config to ensure reproducibility.
    block_seed = config["reproducibility"]["seed_mask_block"]
    np.random.seed(block_seed)

    # Extract block missingness parameters from the configuration dictionary.
    block_config = config["masking_regimes"]["Block"]
    block_length: int = block_config["block_length_months"]
    start_prob: float = block_config["start_at_begin_prob"]

    # Get tensor dimensions from the shape of the observed mask.
    T, N, L = mask_observed.shape

    # Initialize the output mask with all False values, with the same shape and type.
    mask_eval_block = np.zeros_like(mask_observed, dtype=bool)

    logging.info(
        f"Step 1/3: Initialized block masking with seed={block_seed}, "
        f"length={block_length}, start_prob={start_prob}."
    )

    # --- Step 2: For each firm-characteristic pair, randomly place one block ---
    # This section iterates through each of the N * L series. For production scale,
    # consider parallelizing this loop (e.g., with `joblib.Parallel`).
    for n in range(N):
        for l in range(L):
            # Extract the 1D boolean vector for the current series' observed status.
            series_mask = mask_observed[:, n, l]

            # Find the time indices 't' where data is observed for this series.
            observed_times = np.where(series_mask)[0]

            # A block can only be placed if there are at least `block_length` observed points.
            if len(observed_times) < block_length:
                continue

            # Determine the full time span of the series' observations.
            min_obs_time = observed_times[0]
            max_obs_time = observed_times[-1]

            # The latest possible start time for a block to fit within the observed span.
            latest_possible_start = max_obs_time - block_length + 1

            # If the observed span is too short to fit a block, skip.
            if min_obs_time > latest_possible_start:
                continue

            # Decide probabilistically whether to place the block at the start or randomly.
            if np.random.rand() < start_prob:
                # Set the block to start at the very first observed data point.
                start_time = min_obs_time
            else:
                # Sample a start time uniformly from the entire valid
                # integer range within the series' observed span.
                start_time = np.random.randint(min_obs_time, latest_possible_start + 1)

            # Define the contiguous time interval for the block.
            end_time = start_time + block_length
            block_interval = np.arange(start_time, end_time)

            # The final mask for this series is the intersection of the contiguous block
            # interval and the set of originally observed time points.
            mask_indices_for_series = np.intersect1d(block_interval, observed_times)

            # Update the global block mask at the identified locations for this series.
            mask_eval_block[mask_indices_for_series, n, l] = True

    logging.info("Step 2/3: Completed block placement for all series.")

    # --- Step 3: Verify overall masking rate and disjointness, then save ---
    # Calculate the total number of entries that were actually masked.
    num_masked = mask_eval_block.sum()
    # Get the total number of originally observed entries.
    num_observed = mask_observed.sum()

    # Handle the edge case where no entries were masked.
    if num_masked == 0:
        logging.warning("Block masking resulted in zero masked entries. This may be expected if data is extremely sparse.")
        return mask_eval_block

    # Calculate the effective global masking rate.
    effective_rate = num_masked / num_observed
    target_fraction = config["masking_regimes"]["test_fraction_of_observed"]

    # Log the result and warn if it deviates significantly from the target range.
    logging.info(f"Total entries masked: {num_masked}. Effective global rate: {effective_rate:.4f}")
    if not (0.08 <= effective_rate <= 0.12):
        logging.warning(
            f"Effective masking rate ({effective_rate:.4f}) deviates "
            f"significantly from the target range [8%, 12%]."
        )

    # Check for disjointness with any previously generated masks. This is a critical check.
    if existing_masks:
        for i, existing_mask in enumerate(existing_masks):
            # The bitwise AND operation finds any overlapping True values.
            if np.any(mask_eval_block & existing_mask):
                overlap_count = (mask_eval_block & existing_mask).sum()
                raise ValueError(
                    f"Block mask overlaps with existing mask at index {i}. "
                    f"Overlap count: {overlap_count}. Experimental design is compromised."
                )

    # --- Checkpointing and Reporting ---
    # Save the final boolean mask to a compressed NumPy archive.
    mask_path = os.path.join(output_dir, "mask_eval_Block.npz")
    np.savez_compressed(mask_path, mask=mask_eval_block)
    logging.info(f"Block evaluation mask saved to {mask_path}")

    # Save the coordinates of the masked entries to a CSV for auditing.
    masked_coords = np.argwhere(mask_eval_block)
    indices_df = pd.DataFrame(masked_coords, columns=["t_idx", "n_idx", "l_idx"])
    audit_path = os.path.join(output_dir, "mask_Block_indices.csv")
    indices_df.to_csv(audit_path, index=False)
    logging.info(f"Block mask indices audit file saved to {audit_path}")
    logging.info("Step 3/3: Verification and saving complete.")

    return mask_eval_block


In [None]:
# Task 9: Create Evaluation Mask for Logistic (Logit) Missingness Regime

# ==============================================================================
# Task 9: Create Evaluation Mask for Logistic (Logit) Missingness Regime
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 9, Step 1: Prepare covariates for the logistic model (Helper Function)
# ------------------------------------------------------------------------------
def _prepare_logit_covariates(
    df_clean: pd.DataFrame,
    df_chars_raw: pd.DataFrame,
    time_to_idx: Dict[pd.Timestamp, int],
    firm_to_idx: Dict[int, int],
    T: int,
    N: int
) -> np.ndarray:
    """
    Pre-computes the static covariates used in the Logit missingness model.

    This function calculates firm-level and firm-month level characteristics
    like log-size, age, and a distress indicator, then aligns them into a
    tensor of shape (T, N, num_covariates) for efficient lookup during simulation.

    Methodological Note: Covariates are generated from the fully cleansed and
    observed panel. The simulation models the probability of an *observed* point
    becoming missing, conditional on these covariates.

    Args:
        df_clean: The cleansed DataFrame from Task 3, containing market cap data.
        df_chars_raw: The raw characteristics DataFrame from Task 4 for distress indicator.
        time_to_idx: Mapping from timestamp to integer index.
        firm_to_idx: Mapping from permno to integer index.
        T: Total number of time periods.
        N: Total number of firms.

    Returns:
        A NumPy array of shape (T, N, 3) containing the covariates
        [log_size, age, distress_indicator] for each firm-month.
    """
    # Create a combined DataFrame for easier feature engineering.
    df_cov = df_clean.reset_index().merge(
        df_chars_raw.reset_index(), on=['date', 'permno'], how='left'
    )

    # --- 1. Compute Log Market Equity (logsize) ---
    # Calculate market equity from price and shares outstanding.
    df_cov['market_equity'] = df_cov['prc'].abs() * df_cov['shrout']
    # Compute the natural logarithm of market equity, replacing zeros with NaN before logging.
    df_cov['logsize'] = np.log(df_cov['market_equity'].replace(0, np.nan))

    # --- 2. Compute Firm Age ---
    # Find the first appearance date for each firm using a groupby-transform.
    first_appearance = df_cov.groupby('permno')['date'].transform('min')
    # Calculate age in months as the difference between the current date and first appearance.
    df_cov['age'] = ((df_cov['date'] - first_appearance) / np.timedelta64(1, 'M')).round()

    # --- 3. Compute Distress Indicator ---
    # Define distress as being in the top decile of book-to-market each month.
    df_cov['distress_indicator'] = df_cov.groupby('date')['book_to_market'].transform(
        lambda x: (x.rank(pct=True, ascending=False) <= 0.1).astype(float)
    )

    # --- 4. Assemble Covariate Tensor ---
    # Initialize an empty tensor for the covariates.
    covariate_tensor = np.full((T, N, 3), np.nan, dtype=np.float32)

    # Map date and permno to integer indices for tensor population.
    time_indices = df_cov['date'].map(time_to_idx)
    firm_indices = df_cov['permno'].map(firm_to_idx)

    # Select the computed covariate columns.
    cov_cols = ['logsize', 'age', 'distress_indicator']

    # Create a mask to filter out rows where mapping to tensor indices failed.
    valid_idx = pd.notna(time_indices) & pd.notna(firm_indices)

    # Populate the tensor using advanced indexing with the valid indices.
    covariate_tensor[
        time_indices[valid_idx].astype(int),
        firm_indices[valid_idx].astype(int),
        :
    ] = df_cov.loc[valid_idx, cov_cols].to_numpy(dtype=np.float32)

    # Forward-fill any remaining NaNs within the tensor to ensure covariates are available.
    # This handles cases where a firm exists in the tensor but has no covariates for a given month.
    df_tensor_reshaped = pd.DataFrame(covariate_tensor.reshape(T, -1))
    df_tensor_reshaped.fillna(method='ffill', inplace=True)
    covariate_tensor = df_tensor_reshaped.to_numpy().reshape(T, N, 3)

    logging.info("Prepared static covariates (logsize, age, distress) for Logit model.")
    return covariate_tensor

# ------------------------------------------------------------------------------
# Task 9, Step 2: Simulate missingness (Helper Function)
# ------------------------------------------------------------------------------
def _run_logit_simulation(
    mask_observed: np.ndarray,
    covariate_tensor: np.ndarray,
    alpha: np.ndarray,
    beta: np.ndarray,
    initial_gap_length: int = 6
) -> np.ndarray:
    """
    Runs the two-stage logistic simulation to generate a missingness mask.

    This function is a complete rewrite focusing on vectorization and accuracy.
    Stage 1 (Initial Gap) and Stage 2 (Monthly Conditional) are now implemented
    using efficient NumPy broadcasting to avoid slow Python loops.

    Args:
        mask_observed: Boolean array of observed entries.
        covariate_tensor: 3D array of static covariates (T, N, 3).
        alpha: Coefficients for the initial gap model.
        beta: Coefficients for the monthly conditional model.
        initial_gap_length: The number of initial observed points to mask.

    Returns:
        A boolean array of simulated missingness, before intersection with
        the observed mask.
    """
    # --- Initialization ---
    # Get tensor dimensions.
    T, N, L = mask_observed.shape

    # Initialize the simulation mask that will be populated.
    simulated_mask = np.zeros_like(mask_observed, dtype=bool)

    # Define the sigmoid function for probability calculation.
    def sigmoid(x):
        with np.errstate(over='ignore'):
            return 1 / (1 + np.exp(-x))

    # --- Stage 1: Initial Gap Simulation (Vectorized) ---
    # Use covariates from the first time period for the initial gap model.
    initial_covariates = covariate_tensor[0, :, :]  # Shape (N, 3)
    # Add intercept term by creating a column of ones.
    X_initial = np.hstack([np.ones((N, 1)), np.nan_to_num(initial_covariates)])

    # Compute initial gap probabilities for all N firms.
    pi_initial = sigmoid(X_initial @ alpha)

    # Simulate initial gaps for each of the N*L series.
    initial_gap_draws = np.random.rand(N, L) < pi_initial[:, np.newaxis]

    # Vectorized application of the initial gap mask.
    # Find all (n, l) pairs that need a gap.
    n_coords, l_coords = np.where(initial_gap_draws)

    # Use cumsum to find the k-th observed point for each series efficiently.
    observed_counts = np.cumsum(mask_observed, axis=0)
    # Create a mask for the first `k` observed points.
    is_initial_obs = (observed_counts > 0) & (observed_counts <= initial_gap_length)

    # Apply this logic only to the series selected for an initial gap.
    initial_mask_template = np.zeros_like(mask_observed, dtype=bool)
    initial_mask_template[:, n_coords, l_coords] = is_initial_obs[:, n_coords, l_coords]
    simulated_mask |= initial_mask_template

    # --- Stage 2: Monthly Conditional Simulation (Vectorized) ---
    # Initialize the dynamic 'previous missing' indicator.
    prev_missing_indicator = initial_gap_draws.astype(float)

    # Iterate through time, starting from the second period.
    for t in range(1, T):
        # Extract lagged static covariates for all firms.
        lagged_covs = np.nan_to_num(covariate_tensor[t - 1, :, :]) # Shape (N, 3)

        # Compute the firm-specific part of the score using lagged covariates.
        # beta[1:4] corresponds to [logsize, age, distress].
        firm_score_component = lagged_covs @ beta[1:4] # Shape (N,)

        # Compute the full linear score using broadcasting.
        # score = beta_0 + (firm_component) + beta_4 * prev_missing
        score = beta[0] + firm_score_component[:, np.newaxis] + prev_missing_indicator * beta[4]

        # Calculate probabilities for all N*L series at time t.
        pi_monthly_t = sigmoid(score)

        # Simulate missingness for time t via a vectorized Bernoulli draw.
        monthly_draws = np.random.rand(N, L) < pi_monthly_t

        # Update the mask for the current time slice.
        simulated_mask[t, :, :] |= monthly_draws

        # Update the 'prev_missing_indicator' for the next iteration.
        prev_missing_indicator = monthly_draws.astype(float)

    # The final mask is the intersection with the originally observed set.
    return simulated_mask & mask_observed

# ------------------------------------------------------------------------------
# Task 9, Orchestrator Function
# ------------------------------------------------------------------------------
def create_logit_mask(
    mask_observed: np.ndarray,
    df_clean: pd.DataFrame,
    df_chars_raw: pd.DataFrame,
    time_to_idx: Dict[pd.Timestamp, int],
    firm_to_idx: Dict[int, int],
    config: Dict[str, Any],
    output_dir: str = "data_processed",
    existing_masks: Optional[List[np.ndarray]] = None
) -> np.ndarray:
    """
    Creates a held-out evaluation mask based on a two-stage logistic model.

    This function simulates a realistic, non-random missingness pattern where the
    probability of an entry being missing depends on firm characteristics. This
    re-implementation features a more robust calibration loop and sources model
    parameters from the configuration file.

    Args:
        mask_observed: Boolean array of observed entries.
        df_clean: Cleansed DataFrame for sourcing covariates.
        df_chars_raw: Raw characteristics DataFrame for sourcing covariates.
        time_to_idx: Mapping from timestamp to integer index.
        firm_to_idx: Mapping from permno to integer index.
        config: The study's configuration dictionary.
        output_dir: Directory to save the output mask and audit files.
        existing_masks: Optional list of other masks to check for disjointness.

    Returns:
        A boolean NumPy array representing the Logit evaluation mask.
    """
    # --- Step 1: Prepare Covariates, Set Seed, and Load Parameters ---
    T, N, L = mask_observed.shape
    logit_seed = config["reproducibility"]["seed_mask_logit"]
    np.random.seed(logit_seed)

    # Prepare the static covariates needed for the simulation.
    covariate_tensor = _prepare_logit_covariates(
        df_clean, df_chars_raw, time_to_idx, firm_to_idx, T, N
    )

    # Load model parameters from config, ensuring they exist.
    logit_config = config["masking_regimes"]["Logit"]
    alpha = np.array(logit_config["alpha_coeffs"])
    beta_initial = np.array(logit_config["beta_coeffs"])

    logging.info("Step 1/3: Covariate preparation and parameter loading complete.")

    # --- Step 2: Simulate Missingness with a Robust Calibration Loop ---
    target_fraction = config["masking_regimes"]["test_fraction_of_observed"]
    num_observed = mask_observed.sum()

    # Initialize beta and calibration parameters.
    beta = beta_initial.copy()
    learning_rate = 0.5 # Controls speed of convergence for calibration

    for i in range(10): # Max 10 calibration iterations
        # Run the full simulation with the current beta.
        mask_eval_logit = _run_logit_simulation(
            mask_observed, covariate_tensor, alpha, beta
        )
        current_rate = mask_eval_logit.sum() / num_observed
        error = current_rate - target_fraction

        logging.info(f"Calibration iter {i+1}: Intercept={beta[0]:.4f}, Rate={current_rate:.4f}, Error={error:.4f}")

        # Stop if the rate is within a small tolerance of the target.
        if abs(error) < 0.005:
            logging.info("Calibration successful: target rate achieved.")
            break

        # Update the intercept on the log-odds scale to adjust the rate.
        beta[0] -= learning_rate * error

    logging.info("Step 2/3: Logit simulation and calibration complete.")

    # --- Step 3: Verify Disjointness and Save ---
    # Final verification and saving logic remains the same as the original.
    num_masked = mask_eval_logit.sum()
    effective_rate = num_masked / num_observed
    logging.info(f"Final calibrated masking rate: {effective_rate:.4f}")

    if existing_masks:
        for i, existing_mask in enumerate(existing_masks):
            if np.any(mask_eval_logit & existing_mask):
                raise ValueError(f"Logit mask overlaps with existing mask at index {i}.")

    mask_path = os.path.join(output_dir, "mask_eval_Logit.npz")
    np.savez_compressed(mask_path, mask=mask_eval_logit)

    masked_coords = np.argwhere(mask_eval_logit)
    indices_df = pd.DataFrame(masked_coords, columns=["t_idx", "n_idx", "l_idx"])
    audit_path = os.path.join(output_dir, "mask_Logit_indices.csv")
    indices_df.to_csv(audit_path, index=False)
    logging.info(f"Step 3/3: Verification and saving complete. Audit file at {audit_path}")

    return mask_eval_logit


In [None]:
# Task 10: Construct Training Tensors $\mathcal{X}_{\text{train}}$ for Each Masking Regime

# ==============================================================================
# Task 10: Construct Training Tensors for Each Masking Regime
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 10, Orchestrator Function
# ------------------------------------------------------------------------------
def create_training_tensors(
    tensor_X: np.ndarray,
    mask_observed: np.ndarray,
    regimes: List[str],
    data_dir: str = "data_processed"
) -> Dict[str, Dict[str, float]]:
    """
    Constructs and saves training tensors and masks for each evaluation regime.

    For each specified missingness regime (e.g., 'MAR', 'Block', 'Logit'), this
    function takes the complete ground-truth tensor and applies the corresponding
    evaluation mask to create a training tensor. The entries selected for the
    held-out test set are replaced with `np.nan`.

    It also generates and saves the corresponding 'training mask', which represents
    the set of all entries that are both originally observed and not held out for
    evaluation (Ω_train = Ω_full \\ M_regime).

    Args:
        tensor_X: The complete 3D ground-truth tensor with shape (T, N, L),
                  containing NaNs for originally missing data.
        mask_observed: A boolean array of shape (T, N, L) where `True` indicates
                       an originally observed entry (Ω_full).
        regimes: A list of strings identifying the regimes to process,
                 e.g., ['MAR', 'Block', 'Logit'].
        data_dir: The directory where evaluation masks are stored and where
                  training tensors/masks will be saved.

    Returns:
        A dictionary containing a summary of the training set size and density
        for each processed regime.

    Raises:
        FileNotFoundError: If an evaluation mask for a specified regime is not found.
        ValueError: If the shape of a mask does not match the tensor's shape.
    """
    # --- Input Validation ---
    if not isinstance(tensor_X, np.ndarray) or not isinstance(mask_observed, np.ndarray):
        raise TypeError("Inputs 'tensor_X' and 'mask_observed' must be NumPy arrays.")
    if tensor_X.shape != mask_observed.shape:
        raise ValueError("Shape of 'tensor_X' and 'mask_observed' must be identical.")

    # Initialize a dictionary to store metadata for all regimes.
    training_set_summary = {}

    # Iterate through each specified evaluation regime.
    for regime in regimes:
        logging.info(f"--- Processing regime: {regime} ---")

        # --- Step 1: Load the evaluation mask for the current regime ---
        eval_mask_path = os.path.join(data_dir, f"mask_eval_{regime}.npz")
        try:
            # Load the compressed numpy file containing the mask.
            mask_eval_regime = np.load(eval_mask_path)['mask']
        except FileNotFoundError:
            raise FileNotFoundError(f"Evaluation mask not found for regime '{regime}' at {eval_mask_path}")

        # Validate that the loaded mask has the correct shape.
        if mask_eval_regime.shape != tensor_X.shape:
            raise ValueError(
                f"Shape of evaluation mask for regime '{regime}' "
                f"({mask_eval_regime.shape}) does not match tensor shape ({tensor_X.shape})."
            )
        logging.info(f"Step 1/3: Successfully loaded evaluation mask for {regime}.")

        # --- Step 2: Create the training tensor by applying the mask ---
        # Create a copy of the full tensor to avoid modifying the original.
        X_train_regime = tensor_X.copy()

        # Set the held-out entries (where the eval mask is True) to NaN.
        # This is the core operation: X_train = X where mask_eval is False.
        X_train_regime[mask_eval_regime] = np.nan

        # Verification: The number of NaNs in X_train should be the original NaNs
        # plus the number of newly masked entries.
        expected_nans = (~mask_observed).sum() + mask_eval_regime.sum()
        actual_nans = np.isnan(X_train_regime).sum()
        assert expected_nans == actual_nans, "Mismatch in NaN count after masking."

        # Save the resulting training tensor.
        train_tensor_path = os.path.join(data_dir, f"X_train_{regime}.npz")
        np.savez_compressed(train_tensor_path, X_train=X_train_regime)
        logging.info(f"Step 2/3: Training tensor for {regime} created and saved to {train_tensor_path}.")

        # --- Step 3: Construct the training observation mask and log metadata ---
        # The training mask includes entries that were originally observed AND are not in the eval set.
        # Formula: mask_train = mask_observed AND (NOT mask_eval)
        mask_train_regime = mask_observed & (~mask_eval_regime)

        # Save the training mask.
        train_mask_path = os.path.join(data_dir, f"mask_train_{regime}.npz")
        np.savez_compressed(train_mask_path, mask=mask_train_regime)
        logging.info(f"Training mask for {regime} created and saved to {train_mask_path}.")

        # Compute and store metadata for the summary report.
        omega_train_size = mask_train_regime.sum()
        rho_train = omega_train_size / tensor_X.size
        training_set_summary[regime] = {
            "Omega_train_size": int(omega_train_size),
            "rho_train": rho_train
        }
        logging.info(
            f"Step 3/3: Metadata for {regime}: "
            f"Training set size = {int(omega_train_size)}, "
            f"Density = {rho_train:.4f}"
        )

    # --- Finalization ---
    # Save the summary metadata for all processed regimes to a JSON file.
    summary_path = os.path.join(data_dir, "training_set_summary.json")
    with open(summary_path, 'w') as f:
        json.dump(training_set_summary, f, indent=4)
    logging.info(f"Training set summary for all regimes saved to {summary_path}")

    return training_set_summary


In [None]:
# Task 11: Firm Clustering by Time–Characteristic Profiles via K-Means

# ==============================================================================
# Task 11: Firm Clustering by Time–Characteristic Profiles via K-Means
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 11, Orchestrator Function
# ------------------------------------------------------------------------------
def cluster_firms_by_profile(
    X_train: np.ndarray,
    firm_to_idx: Dict[int, int],
    config: Dict[str, Any],
    output_dir: str = "data_processed"
) -> Dict[int, List[int]]:
    """
    Clusters firms into K groups based on their vectorized time-characteristic profiles.

    This function implements the first core step of the ACT-Tensor framework. It
    groups firms with similar data availability patterns and characteristic profiles
    together using the K-Means algorithm. This allows for group-specific treatment
    during the imputation phase.

    The process is as follows:
    1. For each firm, its (T, L) slice of the training tensor is vectorized into a
       single long vector of length T*L. Missing values (NaN) are filled with zero.
    2. The scikit-learn implementation of K-Means is fitted to this (N, T*L) matrix
       of firm profiles, using hyperparameters from the configuration file.
    3. The resulting cluster labels for each firm are validated, logged, and saved
       to disk for auditing and downstream use.

    Args:
        X_train: The 3D training tensor of shape (T, N, L) for a specific regime.
        firm_to_idx: A dictionary mapping original permno identifiers to the
                     integer indices (0 to N-1) used in the tensor.
        config: The study's configuration dictionary.
        output_dir: Directory to save cluster assignments and diagnostic files.

    Returns:
        A dictionary mapping each cluster ID (int) to a list of firm integer
        indices (List[int]) belonging to that cluster.

    Raises:
        ValueError: If K-Means results in empty clusters.
    """
    # --- Input Validation ---
    if not isinstance(X_train, np.ndarray) or X_train.ndim != 3:
        raise TypeError("Input 'X_train' must be a 3D NumPy array.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    # --- Step 1: Vectorize each firm's time–characteristic slice ---
    # Get tensor dimensions.
    T, N, L = X_train.shape

    # To create the (N, T*L) matrix for clustering, we first transpose the tensor
    # from (T, N, L) to (N, T, L) to bring the firm dimension to the front.
    firm_slices = X_train.transpose(1, 0, 2)

    # Then, we reshape the (N, T, L) tensor into a 2D matrix of shape (N, T*L).
    # Each row `i` of this matrix is the vectorized profile for firm `i`.
    # Formula: v_n = vec(X_{:, n, :})
    vectorized_profiles = firm_slices.reshape(N, T * L)

    # Handle missing values (NaNs) by filling them with 0.0. This is the
    # "Zero-fill" approach (Option B), which is robust for zero-centered data.
    vectorized_profiles = np.nan_to_num(vectorized_profiles, nan=0.0)
    logging.info(
        f"Step 1/3: Vectorized {N} firm profiles into a ({N}, {T*L}) matrix."
    )

    # --- Step 2: Run K-Means clustering ---
    # Extract K-Means hyperparameters from the configuration.
    kmeans_config = config["optimization_settings"]["kmeans"]
    imputation_hparams = config["imputation_hparams"]
    kmeans_seed = config["reproducibility"]["seed_kmeans"]

    # Initialize the KMeans model with parameters ensuring reproducibility and robustness.
    kmeans = KMeans(
        n_clusters=imputation_hparams["clusters_K"],
        init=kmeans_config["init"],
        n_init=kmeans_config["n_init"],
        max_iter=kmeans_config["max_iter"],
        tol=kmeans_config["tol"],
        random_state=kmeans_seed
    )

    # Fit the model to the vectorized profiles and get the cluster label for each firm.
    logging.info("Fitting K-Means model... This may take a few moments.")
    cluster_labels = kmeans.fit_predict(vectorized_profiles)
    logging.info("Step 2/3: K-Means fitting complete.")

    # --- Step 3: Store cluster assignments and verify cluster balance ---
    # Create a reverse mapping from integer index back to permno.
    idx_to_firm = {i: p for p, i in firm_to_idx.items()}

    # Create a DataFrame to store and save the assignments in a readable format.
    assignments_df = pd.DataFrame({
        "permno": [idx_to_firm[i] for i in range(N)],
        "firm_idx": np.arange(N),
        "cluster_id": cluster_labels
    })

    # Save the detailed assignments to a CSV file for auditing.
    assignments_path = os.path.join(output_dir, "cluster_assignments.csv")
    assignments_df.to_csv(assignments_path, index=False)
    logging.info(f"Cluster assignments saved to {assignments_path}")

    # --- Verification and Final Output Preparation ---
    # Group firms by their assigned cluster ID.
    clusters: Dict[int, List[int]] = {
        k: assignments_df[assignments_df['cluster_id'] == k]['firm_idx'].tolist()
        for k in range(imputation_hparams["clusters_K"])
    }

    # Verify that all clusters are non-empty and log their sizes.
    cluster_sizes = {k: len(v) for k, v in clusters.items()}
    if any(size == 0 for size in cluster_sizes.values()):
        raise ValueError(f"K-Means resulted in one or more empty clusters. Sizes: {cluster_sizes}")

    # Save cluster sizes to a JSON file.
    sizes_path = os.path.join(output_dir, "cluster_sizes.json")
    with open(sizes_path, 'w') as f:
        json.dump(cluster_sizes, f, indent=4)
    logging.info(f"Cluster sizes verified and saved to {sizes_path}.")

    # Save the cluster centroids for diagnostic purposes.
    centroids_path = os.path.join(output_dir, "cluster_centroids.npz")
    np.savez_compressed(centroids_path, centroids=kmeans.cluster_centers_)
    logging.info(f"Cluster centroids saved to {centroids_path}")
    logging.info("Step 3/3: Assignments stored and cluster balance verified.")

    return clusters


In [None]:
# Task 12: Compute Cluster Densities and Partition into Dense vs. Sparse

# ==============================================================================
# Task 12: Compute Cluster Densities and Partition into Dense vs. Sparse
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 12, Orchestrator Function
# ------------------------------------------------------------------------------
def compute_densities_and_partition(
    X_train: np.ndarray,
    mask_train: np.ndarray,
    clusters: Dict[int, List[int]],
    config: Dict[str, Any],
    output_dir: str = "data_processed"
) -> Tuple[List[int], List[int]]:
    """
    Computes data density for each firm cluster and partitions them into dense/sparse.

    This function is a critical step in the ACT-Tensor "divide and conquer"
    strategy. It quantifies the data availability within each cluster and uses a
    threshold to classify them, which determines the subsequent imputation method.

    The process is as follows:
    1. For each cluster, calculate the observed-entry ratio (density) within its
       corresponding sub-tensor in the training set.
    2. Compare each cluster's density to a predefined threshold (tau) to classify
       it as either 'dense' or 'sparse'.
    3. Save the sub-tensor and sub-mask for each cluster to disk for use in the
       next completion steps.
    4. Log a detailed report of the densities and classifications.

    Args:
        X_train: The 3D training tensor of shape (T, N, L).
        mask_train: The boolean training mask of shape (T, N, L).
        clusters: A dictionary mapping each cluster ID to a list of firm indices.
        config: The study's configuration dictionary.
        output_dir: Directory to save the cluster sub-tensors and density logs.

    Returns:
        A tuple containing two lists of integers:
        - dense_cluster_ids (List[int]): The IDs of clusters classified as dense.
        - sparse_cluster_ids (List[int]): The IDs of clusters classified as sparse.

    Raises:
        ValueError: If the union of dense and sparse clusters does not match the
                    total number of clusters.
    """
    # --- Input Validation ---
    if not isinstance(mask_train, np.ndarray) or mask_train.ndim != 3:
        raise TypeError("Input 'mask_train' must be a 3D NumPy array.")
    if not isinstance(clusters, dict):
        raise TypeError("Input 'clusters' must be a dictionary.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    # Get the density threshold from the configuration.
    density_threshold_tau = config["imputation_hparams"]["density_threshold_tau"]

    # Initialize lists to hold the results.
    dense_cluster_ids: List[int] = []
    sparse_cluster_ids: List[int] = []
    density_report: Dict[str, Dict[str, Any]] = {}

    # --- Steps 1 & 2: Compute density and partition for each cluster ---
    logging.info(f"Partitioning {len(clusters)} clusters with density threshold tau = {density_threshold_tau:.2f}")

    # Iterate through each cluster ID and its list of firm indices.
    for cluster_id, firm_indices in clusters.items():
        # Handle case of an empty cluster, though this should be caught in Task 11.
        if not firm_indices:
            logging.warning(f"Cluster {cluster_id} is empty. Skipping.")
            continue

        # --- Step 1: Compute observed-entry ratio ---
        # Use advanced integer indexing to slice the sub-mask for the current cluster.
        cluster_mask = mask_train[:, firm_indices, :]

        # The number of observed entries is the sum of the boolean sub-mask.
        observed_count = cluster_mask.sum()

        # The total number of possible entries is the size of the sub-mask.
        total_elements = cluster_mask.size

        # Calculate the density. Handle division by zero for safety.
        density = observed_count / total_elements if total_elements > 0 else 0.0

        # --- Step 2: Partition into dense and sparse sets ---
        # Classify the cluster based on the density threshold.
        if density >= density_threshold_tau:
            classification = "dense"
            dense_cluster_ids.append(cluster_id)
        else:
            classification = "sparse"
            sparse_cluster_ids.append(cluster_id)

        # Store results for the final report.
        density_report[f"cluster_{cluster_id}"] = {
            "density": density,
            "type": classification,
            "size": len(firm_indices),
            "observed_entries": int(observed_count),
            "total_entries": total_elements
        }

        # --- Step 3: Extract and save cluster sub-tensors ---
        # Slice the training tensor to get the sub-tensor for this cluster.
        cluster_tensor = X_train[:, firm_indices, :]

        # Save the sub-tensor and its corresponding mask for the next steps.
        tensor_path = os.path.join(output_dir, f"X_cluster_{cluster_id}.npz")
        np.savez_compressed(tensor_path, X=cluster_tensor)

        mask_path = os.path.join(output_dir, f"mask_cluster_{cluster_id}.npz")
        np.savez_compressed(mask_path, mask=cluster_mask)

    logging.info("Steps 1, 2, 3: Density calculation, partitioning, and sub-tensor saving complete.")

    # --- Final Verification and Logging ---
    # Verify that all clusters have been classified.
    num_clusters = len(clusters)
    if len(dense_cluster_ids) + len(sparse_cluster_ids) != num_clusters:
        raise ValueError("Error in partitioning: Not all clusters were classified.")

    # Sort the lists for deterministic output.
    dense_cluster_ids.sort()
    sparse_cluster_ids.sort()

    # Save the detailed density report to a JSON file.
    report_path = os.path.join(output_dir, "cluster_densities.json")
    with open(report_path, 'w') as f:
        json.dump(density_report, f, indent=4)

    logging.info(f"Cluster density report saved to {report_path}")
    logging.info(f"Found {len(dense_cluster_ids)} dense clusters: {dense_cluster_ids}")
    logging.info(f"Found {len(sparse_cluster_ids)} sparse clusters: {sparse_cluster_ids}")

    return dense_cluster_ids, sparse_cluster_ids


In [None]:
# Task 13: Dense Cluster CP Completion (Unpenalized)

# ==============================================================================
# Task 13: Dense Cluster CP Completion (Unpenalized)
# ==============================================================================

-----------------------------------------------------------------------
# Task 13, Step 2: ALS solver for masked CP decomposition (Helper Function)
# ------------------------------------------------------------------------------

# ------------------------------------------------------------------------------
# Helper Function for SVD Initialization
# ------------------------------------------------------------------------------
def _initialize_factors_svd(
    tensor: np.ndarray,
    rank: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Initializes factor matrices for CP decomposition using SVD of unfolded tensors.

    This method provides a robust, deterministic starting point for the ALS algorithm,
    often leading to faster convergence and better solutions than random initialization.

    Args:
        tensor: The 3D tensor to be decomposed, potentially with NaNs.
        rank: The target rank R for the decomposition.

    Returns:
        A tuple of three initialized factor matrices (U, V, W).
    """
    T, N, L = tensor.shape
    factors = []

    # Fill NaNs with zeros for the purpose of initialization.
    tensor_filled = np.nan_to_num(tensor, nan=0.0)

    # Mode 0 (Time)
    # Unfold the tensor into a T x (N*L) matrix.
    unfolded_0 = tensor_filled.reshape(T, -1)
    # Compute SVD and take the top R left singular vectors.
    U, _, _ = np.linalg.svd(unfolded_0, full_matrices=False)
    factors.append(U[:, :rank])

    # Mode 1 (Firms)
    # Unfold the tensor into an N x (T*L) matrix.
    unfolded_1 = tensor_filled.transpose(1, 0, 2).reshape(N, -1)
    U, _, _ = np.linalg.svd(unfolded_1, full_matrices=False)
    factors.append(U[:, :rank])

    # Mode 2 (Characteristics)
    # Unfold the tensor into an L x (T*N) matrix.
    unfolded_2 = tensor_filled.transpose(2, 0, 1).reshape(L, -1)
    U, _, _ = np.linalg.svd(unfolded_2, full_matrices=False)
    factors.append(U[:, :rank])

    return factors[0], factors[1], factors[2]


# ------------------------------------------------------------------------------
# Canonical ALS Solver (Replaces both previous versions)
# ------------------------------------------------------------------------------
def _solve_cp_als(
    tensor: np.ndarray,
    mask: np.ndarray,
    rank: int,
    lambda_reg: float,
    config: Dict[str, Any]
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Solves for the CP decomposition of a masked tensor using regularized ALS.

    This is the canonical, high-performance solver for the ACT-Tensor pipeline.
    It incorporates:
    1. Robust SVD-based initialization.
    2. An iterative Alternating Least Squares procedure.
    3. Correct implementation of the regularized normal equations for each update step.
    4. Dual convergence criteria (max iterations and relative error improvement).
    5. Optional factor column normalization for stability.

    Args:
        tensor: The 3D tensor to decompose, with NaNs for missing entries.
        mask: A boolean array of the same shape as `tensor`.
        rank: The target rank R for the CP decomposition.
        lambda_reg: The L2 regularization strength (lambda).
        config: The study's configuration dictionary.

    Returns:
        A tuple of three converged factor matrices (U, V, W).
    """
    # --- Initialization ---
    # Get ALS hyperparameters from the configuration.
    als_config = config["optimization_settings"]["cp_als"]
    cp_seed = config["reproducibility"]["seed_cp_als"]
    np.random.seed(cp_seed) # Seed for any stochasticity within ALS if any (none here)

    # Initialize factor matrices using the robust SVD-based method.
    U, V, W = _initialize_factors_svd(tensor, rank)

    # Pre-compute the regularization term (lambda * I).
    reg_term = lambda_reg * np.eye(rank)

    # --- ALS Iteration Loop ---
    reconstruction_error = []
    for sweep in range(als_config["max_sweeps"]):
        # --- Update U (Time factor) ---
        # This loop iterates through each time slice.
        for t in range(tensor.shape[0]):
            # Find observed (firm, char) indices in this time slice.
            observed_indices = np.where(mask[t, :, :])
            # If no data in this slice, skip the update.
            if len(observed_indices[0]) == 0: continue

            # Form the Khatri-Rao product matrix Z from the corresponding rows of V and W.
            Z = V[observed_indices[0], :] * W[observed_indices[1], :]
            # Get the vector of observed values y.
            y = tensor[t, observed_indices[0], observed_indices[1]]

            # Solve the regularized normal equations: (Z'Z + lambda*I)x = Z'y
            gram_matrix = Z.T @ Z + reg_term
            rhs_vector = Z.T @ y
            U[t, :] = np.linalg.solve(gram_matrix, rhs_vector)

        # --- Update V (Firm factor) ---
        # This loop iterates through each firm slice.
        for n in range(tensor.shape[1]):
            observed_indices = np.where(mask[:, n, :])
            if len(observed_indices[0]) == 0: continue

            Z = U[observed_indices[0], :] * W[observed_indices[1], :]
            y = tensor[observed_indices[0], n, observed_indices[1]]

            gram_matrix = Z.T @ Z + reg_term
            rhs_vector = Z.T @ y
            V[n, :] = np.linalg.solve(gram_matrix, rhs_vector)

        # --- Update W (Characteristic factor) ---
        # This loop iterates through each characteristic slice.
        for l in range(tensor.shape[2]):
            observed_indices = np.where(mask[:, :, l])
            if len(observed_indices[0]) == 0: continue

            Z = U[observed_indices[0], :] * V[observed_indices[1], :]
            y = tensor[observed_indices[0], observed_indices[1], l]

            gram_matrix = Z.T @ Z + reg_term
            rhs_vector = Z.T @ y
            W[l, :] = np.linalg.solve(gram_matrix, rhs_vector)

        # --- Normalization and Convergence Check ---
        # Normalize factor columns to unit norm to control scaling and improve stability.
        if als_config["column_normalization"]:
            for mat in [U, V, W]:
                norms = np.linalg.norm(mat, axis=0)
                # Avoid division by zero for columns that are all zero.
                non_zero_norms = norms > 1e-9
                mat[:, non_zero_norms] /= norms[non_zero_norms]

        # Calculate reconstruction error on observed entries to check for convergence.
        recon_tensor = np.einsum('tr,nr,lr->tnl', U, V, W)
        error = np.linalg.norm((tensor - recon_tensor)[mask])
        reconstruction_error.append(error)

        # Check for convergence based on relative improvement in the error.
        if sweep > 0 and reconstruction_error[-2] > 1e-9:
            rel_improvement = abs(reconstruction_error[-2] - error) / reconstruction_error[-2]
            if rel_improvement < als_config["tol_rel_improvement"]:
                logging.info(f"ALS converged after {sweep + 1} sweeps with relative improvement {rel_improvement:.2e}.")
                break

    if sweep == als_config["max_sweeps"] - 1:
        logging.warning(f"ALS reached max sweeps ({als_config['max_sweeps']}) without converging.")

    return U, V, W

# ------------------------------------------------------------------------------
# Task 13, Orchestrator Function
# ------------------------------------------------------------------------------
def complete_dense_clusters(
    dense_cluster_ids: List[int],
    config: Dict[str, Any],
    data_dir: str = "data_processed"
) -> None:
    """
    Orchestrates the unpenalized CP completion for all dense clusters.

    This function iterates through each 'dense' cluster, loads its data, and
    calls the canonical `_solve_cp_als` solver with zero regularization to
    perform the tensor completion.

    Args:
        dense_cluster_ids: A list of integer IDs for the dense clusters.
        config: The study's configuration dictionary.
        data_dir: Directory containing cluster data and for saving results.
    """
    if not dense_cluster_ids:
        logging.warning("No dense clusters to process. Skipping.")
        return

    rank = config["imputation_hparams"]["CP_rank_R"]

    logging.info(f"Starting CP completion for {len(dense_cluster_ids)} dense clusters...")
    for cluster_id in dense_cluster_ids:
        start_time = time.time()
        logging.info(f"--- Processing dense cluster {cluster_id} ---")

        # Load the sub-tensor and sub-mask for the current cluster.
        sub_tensor = np.load(os.path.join(data_dir, f"X_cluster_{cluster_id}.npz"))['X']
        sub_mask = np.load(os.path.join(data_dir, f"mask_cluster_{cluster_id}.npz"))['mask']

        # Call the new, unified ALS solver with lambda = 0.0 for unpenalized completion.
        U, V, W = _solve_cp_als(
            tensor=sub_tensor, mask=sub_mask, rank=rank, lambda_reg=0.0, config=config
        )

        # Reconstruct the full (dense) sub-tensor from the factor matrices.
        completed_sub_tensor = np.einsum('tr,nr,lr->tnl', U, V, W)

        # Save the completed sub-tensor.
        np.savez_compressed(os.path.join(data_dir, f"X_hat_cluster_{cluster_id}.npz"), X_hat=completed_sub_tensor)

        end_time = time.time()
        logging.info(f"Completed dense cluster {cluster_id}. Time: {end_time - start_time:.2f}s.")


In [None]:
# Task 14: Sparse Cluster Aggregation and Augmented CP Completion

# ==============================================================================
# Task 14: Sparse Cluster Aggregation and Augmented CP Completion
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 14, Orchestrator Function
# ------------------------------------------------------------------------------
def complete_sparse_clusters(
    sparse_cluster_ids: List[int],
    dense_cluster_ids: List[int],
    clusters: Dict[int, List[int]],
    X_train: np.ndarray,
    mask_train: np.ndarray,
    config: Dict[str, Any],
    data_dir: str = "data_processed"
) -> None:
    """
    Orchestrates the augmented CP completion for all sparse clusters.

    This function implements the "borrowing" strategy by combining each sparse
    cluster with all dense clusters, running the canonical regularized ALS solver,
    and slicing back the results.

    Args:
        sparse_cluster_ids: A list of integer IDs for the sparse clusters.
        dense_cluster_ids: A list of integer IDs for the dense clusters.
        clusters: A dictionary mapping each cluster ID to a list of firm indices.
        X_train: The full 3D training tensor.
        mask_train: The full boolean training mask.
        config: The study's configuration dictionary.
        data_dir: Directory for saving completed tensors.
    """
    if not sparse_cluster_ids:
        logging.warning("No sparse clusters to process. Skipping.")
        return

    rank = config["imputation_hparams"]["CP_rank_R"]
    lambda_reg = config["imputation_hparams"]["ridge_lambda"]

    # Use a set for efficient union of dense indices.
    all_dense_indices = set(idx for cid in dense_cluster_ids for idx in clusters[cid])

    logging.info(f"Starting augmented CP completion for {len(sparse_cluster_ids)} sparse clusters...")
    for cluster_id in sparse_cluster_ids:
        start_time = time.time()
        logging.info(f"--- Processing sparse cluster {cluster_id} ---")

        # --- Step 1: Form the aggregated tensor ---
        sparse_indices = clusters[cluster_id]

        # Use efficient set union and then sort for deterministic order.
        aggregated_indices = sorted(list(set(sparse_indices) | all_dense_indices))

        # Create mapping from original index to new aggregate index.
        agg_idx_map = {orig_idx: new_idx for new_idx, orig_idx in enumerate(aggregated_indices)}

        # Slice the full tensors to create the aggregated versions.
        X_agg = X_train[:, aggregated_indices, :]
        mask_agg = mask_train[:, aggregated_indices, :]

        # --- Step 2: Solve using the canonical ALS solver ---
        U, V_agg, W = _solve_cp_als(
            tensor=X_agg, mask=mask_agg, rank=rank, lambda_reg=lambda_reg, config=config
        )

        # --- Step 3: Slice back the completed sparse cluster ---
        completed_agg_tensor = np.einsum('tr,nr,lr->tnl', U, V_agg, W)

        # Get the positions of the sparse firms within the aggregated tensor.
        sparse_indices_in_agg = [agg_idx_map[orig_idx] for orig_idx in sparse_indices]

        # Extract the completed data for the sparse cluster.
        completed_sub_tensor = completed_agg_tensor[:, sparse_indices_in_agg, :]

        # Save the result.
        np.savez_compressed(os.path.join(data_dir, f"X_hat_cluster_{cluster_id}.npz"), X_hat=completed_sub_tensor)

        end_time = time.time()
        logging.info(f"Completed sparse cluster {cluster_id}. Time: {end_time - start_time:.2f}s.")


In [None]:
# Task 15: Global Assembly of Completed Cluster Sub-Tensors

# ==============================================================================
# Task 15: Global Assembly of Completed Cluster Sub-Tensors
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 15, Orchestrator Function
# ------------------------------------------------------------------------------
def assemble_completed_tensor(
    clusters: Dict[int, List[int]],
    tensor_shape: Tuple[int, int, int],
    mask_train: np.ndarray,
    X_train: np.ndarray,
    config: Dict[str, Any],
    data_dir: str = "data_processed"
) -> np.ndarray:
    """
    Assembles the globally completed tensor from individual cluster completions.

    This function reverses the "divide" step by taking the imputed sub-tensors for
    all clusters (both dense and sparse) and inserting them back into their
    original positions within a full-sized tensor. This produces the first
    version of the fully imputed data panel, before temporal smoothing.

    The process is as follows:
    1. Initialize a new, empty tensor of the full (T, N, L) shape.
    2. Iterate through all K clusters. For each cluster:
       a. Load its completed sub-tensor from disk.
       b. Retrieve the list of original firm indices belonging to this cluster.
       c. Use advanced indexing to place the sub-tensor into the correct slice
          of the global tensor.
    3. Verify that the assembled tensor is complete (contains no NaNs).
    4. Perform a sanity check by calculating the reconstruction error on the
       training set.
    5. Save the final assembled tensor to disk.

    Args:
        clusters: A dictionary mapping each cluster ID to a list of firm indices.
        tensor_shape: A tuple (T, N, L) representing the full tensor dimensions.
        mask_train: The boolean training mask of shape (T, N, L).
        X_train: The original training tensor with NaNs.
        config: The study's configuration dictionary.
        data_dir: The directory where completed sub-tensors are stored and
                  the final assembled tensor will be saved.

    Returns:
        The fully assembled (but not yet smoothed) 3D tensor of shape (T, N, L).

    Raises:
        FileNotFoundError: If a completed sub-tensor for any cluster is missing.
        ValueError: If the assembled tensor contains NaNs or if there is a
                    shape mismatch during assembly.
    """
    # --- Input Validation ---
    if not isinstance(clusters, dict):
        raise TypeError("Input 'clusters' must be a dictionary.")
    T, N, L = tensor_shape
    if not (isinstance(T, int) and isinstance(N, int) and isinstance(L, int)):
        raise TypeError("tensor_shape must be a tuple of three integers.")

    # --- Step 1: Initialize the global completed tensor ---
    # Allocate a new array of the full tensor shape, initialized with zeros.
    # The dtype is sourced from the config for consistency.
    float_dtype = config["reproducibility"]["float_dtype"]
    X_hat_assembled = np.zeros(tensor_shape, dtype=float_dtype)
    logging.info(f"Step 1/3: Initialized global tensor with shape {tensor_shape}.")

    # --- Step 2: Insert each cluster's completed sub-tensor ---
    num_clusters = len(clusters)
    logging.info(f"Assembling completed sub-tensors from {num_clusters} clusters...")

    # Iterate through all cluster IDs from 0 to K-1.
    for cluster_id in range(num_clusters):
        # Retrieve the list of original firm indices for this cluster.
        firm_indices = clusters.get(cluster_id)
        if firm_indices is None:
            logging.warning(f"Cluster ID {cluster_id} not found in clusters dictionary. Skipping.")
            continue
        if not firm_indices:
            logging.warning(f"Cluster {cluster_id} is empty. Skipping assembly.")
            continue

        # Load the completed sub-tensor for this cluster.
        sub_tensor_path = os.path.join(data_dir, f"X_hat_cluster_{cluster_id}.npz")
        try:
            completed_sub_tensor = np.load(sub_tensor_path)['X_hat']
        except FileNotFoundError:
            raise FileNotFoundError(f"Completed sub-tensor for cluster {cluster_id} not found at {sub_tensor_path}")

        # Verification: Check for shape consistency before assignment.
        if completed_sub_tensor.shape != (T, len(firm_indices), L):
            raise ValueError(
                f"Shape mismatch for cluster {cluster_id}. Expected "
                f"{(T, len(firm_indices), L)}, but found {completed_sub_tensor.shape}."
            )

        # Use advanced integer indexing to place the sub-tensor into the global tensor.
        # Formula: X_hat[:, I_k, :] = X_hat_k
        X_hat_assembled[:, firm_indices, :] = completed_sub_tensor

    logging.info("Step 2/3: All cluster sub-tensors have been assembled.")

    # --- Step 3: Verify completeness and save the global tensor ---
    # Critical verification: The final assembled tensor must not contain any NaNs.
    if np.isnan(X_hat_assembled).any():
        nan_count = np.isnan(X_hat_assembled).sum()
        raise ValueError(
            f"Assembly failed: The final tensor contains {nan_count} NaN values."
        )
    logging.info("Verification successful: Assembled tensor is fully dense (no NaNs).")

    # Sanity Check: Calculate the Root Mean Squared Error on the training set.
    # This error should be small, indicating the model fits the observed data well.
    # We use the training mask to select only the originally observed entries.
    train_errors = (X_train[mask_train] - X_hat_assembled[mask_train])
    train_rmse = np.sqrt(np.mean(train_errors**2))
    logging.info(f"Sanity Check: RMSE on training set (Omega_train): {train_rmse:.6f}")

    # Save the final assembled tensor to a compressed archive.
    output_path = os.path.join(data_dir, "X_hat_assembled.npz")
    np.savez_compressed(output_path, X_hat=X_hat_assembled)
    logging.info(f"Step 3/3: Assembled tensor checkpointed to {output_path}.")

    return X_hat_assembled


In [None]:
# Task 16: Temporal Smoothing via Centered Moving Average (CMA)

# ==============================================================================
# Task 16: Temporal Smoothing via Centered Moving Average (CMA)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 16, Orchestrator Function
# ------------------------------------------------------------------------------
def apply_cma_smoothing(
    X_hat_assembled: np.ndarray,
    config: Dict[str, Any],
    output_dir: str = "data_processed"
) -> np.ndarray:
    """
    Applies a Centered Moving Average (CMA) smoother to the imputed tensor.

    This function is the first implementation of the temporal smoothing module from
    the ACT-Tensor framework. It applies a symmetric moving average filter along the
    time dimension (axis 0) for each firm-characteristic series in the tensor.
    This process is designed to filter out short-lived noise while preserving
    slow-moving fundamental trends.

    The process is as follows:
    1. Retrieves the CMA window length (delta) from the configuration and validates it.
    2. Uses the highly optimized `scipy.ndimage.uniform_filter1d` to apply the
       moving average across all N*L time series in a single vectorized operation.
       The boundary condition is handled to mimic a truncated window, as specified.
    3. Verifies the output by ensuring it is dense and by computing a smoothness
       diagnostic that confirms a reduction in series volatility.
    4. Saves the final smoothed tensor to a compressed NumPy archive.

    Args:
        X_hat_assembled: The fully assembled but unsmoothed 3D tensor of shape
                         (T, N, L) from Task 15.
        config: The study's configuration dictionary.
        output_dir: The directory to save the smoothed tensor and diagnostic logs.

    Returns:
        The smoothed 3D tensor `X_tilde_CMA` of shape (T, N, L).

    Raises:
        ValueError: If the CMA window length `delta` is not a positive, odd integer.
        TypeError: If the input is not a NumPy array.
    """
    # --- Input Validation ---
    if not isinstance(X_hat_assembled, np.ndarray) or X_hat_assembled.ndim != 3:
        raise TypeError("Input 'X_hat_assembled' must be a 3D NumPy array.")
    if np.isnan(X_hat_assembled).any():
        raise ValueError("Input 'X_hat_assembled' should not contain NaNs before smoothing.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    # --- Step 1: Retrieve CMA parameters and define the smoothing window ---
    # Extract the CMA window length from the configuration.
    delta = config["smoothing_settings"]["CMA"]["delta"]

    # Validate that delta is a positive, odd integer for a symmetric CMA.
    if not (isinstance(delta, int) and delta > 0 and delta % 2 == 1):
        raise ValueError(
            f"CMA window length 'delta' must be a positive, odd integer, but got {delta}."
        )
    logging.info(f"Step 1/3: Applying CMA with window length delta = {delta}.")

    # --- Step 2: Apply CMA to each firm–characteristic series ---
    # Use scipy's uniform_filter1d for a highly efficient, vectorized implementation.
    # This applies a 1D moving average filter along the specified axis (axis=0 for time).
    # `mode='nearest'` handles boundaries by extending the array with the nearest
    # value, which is arithmetically equivalent to truncating the window at the edges.
    X_tilde_CMA = uniform_filter1d(
        input=X_hat_assembled,
        size=delta,
        axis=0,
        mode='nearest'
    )
    logging.info("Step 2/3: CMA filter applied to all time series.")

    # --- Step 3: Verify smoothness and save the CMA-smoothed tensor ---
    # Verification 1: The output tensor must be fully dense (no NaNs).
    if np.isnan(X_tilde_CMA).any():
        raise RuntimeError("CMA smoothing resulted in a tensor with NaN values.")

    # Verification 2: Compute a smoothness diagnostic.
    # Calculate the mean absolute month-to-month change before smoothing.
    delta_before = np.mean(np.abs(np.diff(X_hat_assembled, axis=0)))

    # Calculate the mean absolute month-to-month change after smoothing.
    delta_after = np.mean(np.abs(np.diff(X_tilde_CMA, axis=0)))

    # The 'after' value must be smaller, confirming that volatility was reduced.
    if delta_after >= delta_before:
        logging.warning(
            f"Smoothing did not reduce series volatility as expected. "
            f"Mean abs change before: {delta_before:.6f}, after: {delta_after:.6f}"
        )
    else:
        logging.info(
            f"Smoothness diagnostic passed. Mean abs change reduced from "
            f"{delta_before:.6f} to {delta_after:.6f}."
        )

    # Save the final smoothed tensor to a compressed archive.
    output_path = os.path.join(output_dir, "X_tilde_CMA.npz")
    np.savez_compressed(output_path, X_tilde=X_tilde_CMA)
    logging.info(f"Step 3/3: Smoothed tensor verified and saved to {output_path}.")

    return X_tilde_CMA


In [None]:
# Task 17: Temporal Smoothing via Exponential Moving Average (EMA)

# ==============================================================================
# Task 17: Temporal Smoothing via Exponential Moving Average (EMA)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 17, Orchestrator Function
# ------------------------------------------------------------------------------
def apply_ema_smoothing(
    X_hat_assembled: np.ndarray,
    config: Dict[str, Any],
    output_dir: str = "data_processed"
) -> np.ndarray:
    """
    Applies an Exponential Moving Average (EMA) smoother to the imputed tensor.

    This function implements the EMA smoother, a one-sided, recursive filter that
    gives exponentially decaying weights to past observations. It is more responsive
    to recent data than a centered moving average. The implementation uses the
    efficient `scipy.signal.lfilter` function, which is designed for applying
    such IIR filters along an axis of a NumPy array.

    The process is as follows:
    1. Retrieves the EMA smoothing factor (theta) from the configuration and validates it.
    2. Defines the filter coefficients corresponding to the EMA recursion formula.
    3. Correctly initializes the filter's state to match the paper's prescription
       that the first smoothed value equals the first observed value.
    4. Applies the filter along the time axis (axis=0) for all N*L series.
    5. Verifies the output and saves the final smoothed tensor.

    Args:
        X_hat_assembled: The fully assembled but unsmoothed 3D tensor of shape
                         (T, N, L) from Task 15.
        config: The study's configuration dictionary.
        output_dir: The directory to save the smoothed tensor.

    Returns:
        The smoothed 3D tensor `X_tilde_EMA` of shape (T, N, L).

    Raises:
        ValueError: If the EMA smoothing factor `theta` is not in the interval (0, 1).
        TypeError: If the input is not a NumPy array.
    """
    # --- Input Validation ---
    # Verify that the input is a valid, dense NumPy array.
    if not isinstance(X_hat_assembled, np.ndarray) or X_hat_assembled.ndim != 3:
        raise TypeError("Input 'X_hat_assembled' must be a 3D NumPy array.")
    if np.isnan(X_hat_assembled).any():
        raise ValueError("Input 'X_hat_assembled' should not contain NaNs before smoothing.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    # --- Step 1: Retrieve EMA parameters and define filter ---
    # Extract the EMA smoothing factor from the configuration.
    theta = config["smoothing_settings"]["EMA"]["theta"]

    # Validate that theta is a float in the valid interval (0, 1).
    if not (isinstance(theta, float) and 0.0 < theta < 1.0):
        raise ValueError(
            f"EMA smoothing factor 'theta' must be a float in (0, 1), but got {theta}."
        )
    logging.info(f"Step 1/3: Applying EMA with smoothing factor theta = {theta}.")

    # The EMA recursion is: y[t] = theta*x[t] + (1-theta)*y[t-1]
    # This is an IIR filter. In scipy's lfilter form, y[t] is on the LHS and
    # x[t] is on the RHS. The equation is: y[t] - (1-theta)*y[t-1] = theta*x[t].
    # This corresponds to filter coefficients:
    # a = [1, -(1-theta)]  (feedback coefficients for y)
    # b = [theta]           (feedforward coefficients for x)
    b = np.array([theta])
    a = np.array([1, -(1 - theta)])

    # --- Step 2: Apply EMA recursion to each firm–characteristic series ---
    # To initialize the filter such that y[0] = x[0], we must set the initial
    # state of the filter's internal memory (`zi`). `lfiltic` computes this state.
    # The initial state zi is such that if the input is x[0] repeated, the output is y[0]=x[0].
    # We need one initial state for each of the N*L time series.
    # The first observation for all series is the first time slice of the tensor.
    x0 = X_hat_assembled[0, :, :]
    # `lfiltic` computes the initial state `zi` for a step response.
    # We scale it by x0 to match the initial condition y[0]=x[0].
    zi = lfiltic(b, a, y=[]) * x0[np.newaxis, :, :]

    # Apply the linear filter `lfilter` along the time axis (axis=0).
    # The function takes the coefficients `b` and `a`, the input tensor, the axis,
    # and the initial state `zi`. It returns the filtered output.
    X_tilde_EMA, _ = lfilter(b, a, X_hat_assembled, axis=0, zi=zi)
    logging.info("Step 2/3: EMA filter applied to all time series.")

    # --- Step 3: Verify and save the EMA-smoothed tensor ---
    # Verification 1: The output tensor must be fully dense.
    if np.isnan(X_tilde_EMA).any():
        raise RuntimeError("EMA smoothing resulted in a tensor with NaN values.")

    # Verification 2: Compute the smoothness diagnostic.
    # Calculate the mean absolute month-to-month change before smoothing.
    delta_before = np.mean(np.abs(np.diff(X_hat_assembled, axis=0)))
    # Calculate the mean absolute month-to-month change after smoothing.
    delta_after = np.mean(np.abs(np.diff(X_tilde_EMA, axis=0)))

    # The 'after' value should be smaller, confirming a reduction in volatility.
    if delta_after >= delta_before:
        logging.warning(
            f"Smoothing did not reduce series volatility as expected. "
            f"Mean abs change before: {delta_before:.6f}, after: {delta_after:.6f}"
        )
    else:
        logging.info(
            f"Smoothness diagnostic passed. Mean abs change reduced from "
            f"{delta_before:.6f} to {delta_after:.6f}."
        )

    # Save the final smoothed tensor to a compressed archive.
    output_path = os.path.join(output_dir, "X_tilde_EMA.npz")
    np.savez_compressed(output_path, X_tilde=X_tilde_EMA)
    logging.info(f"Step 3/3: Smoothed tensor verified and saved to {output_path}.")

    return X_tilde_EMA


In [None]:
# Task 18: Temporal Smoothing via Kalman Filter and Rauch–Tung–Striebel Smoother

# ==============================================================================
# Task 18: Temporal Smoothing via Kalman Filter and RTS Smoother
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 18, Step 2: Vectorized Kalman Filter & Smoother (Helper Function)
# ------------------------------------------------------------------------------
def _kalman_smoother_pass(
    observations: np.ndarray,
    h: float,
    r: float
) -> Tuple[np.ndarray, float]:
    """
    Performs a full Kalman filter and RTS smoother pass and computes the log-likelihood.

    This function implements the forward (filter) and backward (smoother) passes
    for a local level (random walk) state-space model. The entire operation
    is vectorized to run efficiently on all N*L time series simultaneously.

    Crucially, it also calculates the exact log-likelihood of the observations
    given the model parameters (h, r) using the prediction error decomposition from
    the forward pass. This is the statistically principled objective function for
    hyperparameter tuning.

    Args:
        observations: The (T, N, L) tensor of imputed values (x_hat).
        h: The process noise variance (state transition variance).
        r: The measurement noise variance.

    Returns:
        A tuple containing:
        - y_smooth (np.ndarray): A (T, N, L) tensor of smoothed state estimates.
        - total_log_likelihood (float): The sum of log-likelihoods over all
          series and time steps.
    """
    # Get tensor dimensions.
    T, N, L = observations.shape

    # --- Kalman Filter (Forward Pass) & Log-Likelihood Calculation ---
    # Initialize arrays to store filtered states and their variances.
    y_filt = np.zeros_like(observations)
    P_filt = np.zeros_like(observations)
    # Store predicted variances for the backward pass.
    P_pred = np.zeros_like(observations)
    # Initialize total log-likelihood.
    total_log_likelihood = 0.0

    # Initialization of the filter at t=0.
    # The initial state estimate is the first observation.
    y_filt[0] = observations[0]
    # The initial variance of the state estimate is the measurement variance.
    P_filt[0] = r

    # Iterate from t=1 to T-1.
    for t in range(1, T):
        # --- Predict Step ---
        # State prediction: y_t|t-1 = y_t-1|t-1
        y_pred_t = y_filt[t - 1]
        # Variance prediction: P_t|t-1 = P_t-1|t-1 + h
        P_pred_t = P_filt[t - 1] + h
        P_pred[t] = P_pred_t  # Store for the smoother pass

        # --- Log-Likelihood Calculation ---
        # Prediction error (innovation): v_t = x_t - y_t|t-1
        prediction_error = observations[t] - y_pred_t
        # Prediction error variance: F_t = P_t|t-1 + r
        prediction_error_var = P_pred_t + r

        # Log-likelihood increment for time t (summed over N*L series).
        # Formula: log L_t = -0.5 * [log(2*pi) + log(F_t) + v_t^2 / F_t]
        log_likelihood_increment = -0.5 * (
            np.log(2 * np.pi) + np.log(prediction_error_var) +
            (prediction_error**2) / prediction_error_var
        )
        total_log_likelihood += np.sum(log_likelihood_increment)

        # --- Update Step ---
        # Kalman Gain: K_t = P_t|t-1 / F_t
        kalman_gain = P_pred_t / prediction_error_var
        # State update: y_t|t = y_t|t-1 + K_t * v_t
        y_filt[t] = y_pred_t + kalman_gain * prediction_error
        # Variance update: P_t|t = (1 - K_t) * P_t|t-1
        P_filt[t] = (1 - kalman_gain) * P_pred_t

    # --- RTS Smoother (Backward Pass) ---
    # Initialize arrays for smoothed states.
    y_smooth = np.zeros_like(observations)
    # The final smoothed state is the final filtered state.
    y_smooth[-1] = y_filt[-1]

    # Iterate backwards from t=T-2 down to 0.
    for t in range(T - 2, -1, -1):
        # Smoother Gain: J_t = P_t|t / P_t+1|t
        # Add a small epsilon to the denominator for numerical stability.
        smoother_gain = P_filt[t] / (P_pred[t + 1] + 1e-9)
        # Smoothed state update: y_t|T = y_t|t + J_t * (y_t+1|T - y_t+1|t)
        # Note: y_{t+1|t} is the prediction for t+1, which is y_filt[t].
        y_smooth[t] = y_filt[t] + smoother_gain * (y_smooth[t + 1] - y_filt[t])

    # Return both the smoothed series and the total log-likelihood.
    return y_smooth, total_log_likelihood


# ------------------------------------------------------------------------------
# Task 18, Orchestrator Function
# ------------------------------------------------------------------------------
def apply_kf_smoothing(
    X_hat_assembled: np.ndarray,
    config: Dict[str, Any],
    output_dir: str = "data_processed"
) -> np.ndarray:
    """
    Applies a Kalman Filter/Smoother with MLE-based hyperparameter tuning.

    This function implements the most statistically rigorous temporal smoother. It
    treats each time series as a noisy observation of a latent random walk. It
    performs a grid search over the process (h) and measurement (r) noise
    variances, selecting the pair that **maximizes the log-likelihood** of the
    data. It then applies the smoother with these optimal parameters.

    Args:
        X_hat_assembled: The fully assembled, dense 3D tensor from Task 15.
        config: The study's configuration dictionary.
        output_dir: The directory to save the smoothed tensor and logs.

    Returns:
        The smoothed 3D tensor `X_tilde_KF` of shape (T, N, L).
    """
    # --- Input Validation ---
    if not isinstance(X_hat_assembled, np.ndarray) or X_hat_assembled.ndim != 3:
        raise TypeError("Input 'X_hat_assembled' must be a 3D NumPy array.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    # --- Step 1: Set up State-Space Model and Hyperparameter Grid ---
    # Retrieve the hyperparameter grids from the configuration.
    kf_config = config["smoothing_settings"]["KF"]
    h_grid = kf_config["process_var_h_grid"]
    r_grid = kf_config["meas_var_r_grid"]

    # Initialize variables to track the best parameters found.
    best_h, best_r = None, None
    # We now maximize log-likelihood, so we initialize to negative infinity.
    max_log_likelihood = -np.inf

    logging.info(f"Step 1/3: Starting grid search for Kalman smoother via Maximum Likelihood.")

    # --- Step 2: Grid Search to Maximize Log-Likelihood ---
    # Iterate through all combinations of h and r.
    for h in h_grid:
        for r in r_grid:
            # Run a full smoother pass and get the total log-likelihood.
            _, log_likelihood = _kalman_smoother_pass(X_hat_assembled, h, r)

            # If this is the best log-likelihood so far, store the parameters.
            if log_likelihood > max_log_likelihood:
                max_log_likelihood = log_likelihood
                best_h, best_r = h, r

    logging.info(
        f"Step 2/3: Grid search complete. Optimal parameters via MLE: "
        f"h={best_h}, r={best_r} (Max Log-Likelihood={max_log_likelihood:.2f})."
    )

    # --- Step 3: Apply the Selected Smoother and Save ---
    # Run the smoother one final time with the optimal hyperparameters.
    X_tilde_KF, _ = _kalman_smoother_pass(X_hat_assembled, best_h, best_r)

    # Verification 1: Check for NaNs in the final output.
    if np.isnan(X_tilde_KF).any():
        raise RuntimeError("Kalman smoothing resulted in a tensor with NaN values.")

    # Verification 2: Compute smoothness diagnostic.
    delta_before = np.mean(np.abs(np.diff(X_hat_assembled, axis=0)))
    delta_after = np.mean(np.abs(np.diff(X_tilde_KF, axis=0)))
    logging.info(
        f"Smoothness diagnostic: Mean abs change reduced from "
        f"{delta_before:.6f} to {delta_after:.6f}."
    )

    # Save the final smoothed tensor.
    output_path = os.path.join(output_dir, "X_tilde_KF.npz")
    np.savez_compressed(output_path, X_tilde=X_tilde_KF)

    # Log the selected hyperparameters and the maximized log-likelihood.
    hyperparam_log_path = os.path.join(output_dir, "kf_hyperparameters.json")
    with open(hyperparam_log_path, 'w') as f:
        json.dump(
            {"best_h": best_h, "best_r": best_r, "max_log_likelihood": max_log_likelihood},
            f, indent=4
        )

    logging.info(f"Step 3/3: Final smoothed tensor and optimal hyperparameters saved.")

    # Return the final smoothed tensor.
    return X_tilde_KF


In [None]:
# Task 19: Imputation Accuracy Evaluation on Held-Out Masks

# ==============================================================================
# Task 19: Imputation Accuracy Evaluation on Held-Out Masks
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 19, Orchestrator Function
# ------------------------------------------------------------------------------
def evaluate_imputation_accuracy(
    tensor_true: np.ndarray,
    tensor_imputed: np.ndarray,
    mask_eval: np.ndarray
) -> Dict[str, float]:
    """
    Computes imputation accuracy metrics on a held-out evaluation set.

    This function quantifies the statistical accuracy of an imputation method by
    comparing the imputed values against the known ground-truth values on a
    predefined test set. It calculates four standard metrics: Root Mean Squared
    Error (RMSE), Mean Absolute Error (MAE), Mean Absolute Percentage Error (MAPE),
    and the coefficient of determination (R-squared).

    Args:
        tensor_true: The original, ground-truth 3D tensor, including all
                     originally observed values.
        tensor_imputed: The 3D tensor after imputation and smoothing, fully dense.
        mask_eval: A boolean 3D tensor of the same shape, where `True` indicates
                   the entries to be used for evaluation.

    Returns:
        A dictionary containing the four computed accuracy metrics:
        'RMSE_imp', 'MAE_imp', 'MAPE_imp', 'R2_imp'.

    Raises:
        ValueError: If the input arrays have mismatched shapes or if the
                    evaluation mask is empty.
        TypeError: If inputs are not NumPy arrays.
    """
    # --- Input Validation ---
    # Verify that all inputs are NumPy arrays.
    if not all(isinstance(arr, np.ndarray) for arr in [tensor_true, tensor_imputed, mask_eval]):
        raise TypeError("All inputs must be NumPy arrays.")

    # Verify that the shapes of all input arrays are identical.
    if not (tensor_true.shape == tensor_imputed.shape == mask_eval.shape):
        raise ValueError("All input arrays must have the same shape.")

    # Verify that the evaluation mask is boolean.
    if mask_eval.dtype != bool:
        raise TypeError("'mask_eval' must be a boolean array.")

    # --- Step 1: Extract the held-out ground truth and imputed values ---
    # Use the boolean evaluation mask to select the relevant values from the tensors.
    # This creates 1D arrays of the ground-truth and imputed values for comparison.
    x_true = tensor_true[mask_eval]
    x_imputed = tensor_imputed[mask_eval]

    # Check if the evaluation set is empty.
    if x_true.size == 0:
        logging.warning("Evaluation mask is empty. No metrics can be computed.")
        return {
            'RMSE_imp': np.nan,
            'MAE_imp': np.nan,
            'MAPE_imp': np.nan,
            'R2_imp': np.nan
        }

    logging.info(f"Step 1/2: Extracted {x_true.size} data points for evaluation.")

    # --- Step 2: Compute the four imputation metrics ---
    # Calculate the difference between true and imputed values.
    errors = x_true - x_imputed

    # Metric 1: Root Mean Squared Error (RMSE)
    # Formula: RMSE_imp = sqrt(mean((x - x_hat)^2))
    rmse_imp = np.sqrt(np.mean(errors**2))

    # Metric 2: Mean Absolute Error (MAE)
    # Formula: MAE_imp = mean(|x - x_hat|)
    mae_imp = np.mean(np.abs(errors))

    # Metric 3: Mean Absolute Percentage Error (MAPE)
    # This requires special handling to avoid division by zero.
    # We create a mask to exclude true values that are very close to zero.
    mape_mask = np.abs(x_true) > 1e-6
    if np.any(mape_mask):
        # Formula: MAPE_imp = mean(|(x - x_hat) / x|)
        mape_imp = np.mean(np.abs(errors[mape_mask] / x_true[mape_mask]))
    else:
        # If all true values are zero, MAPE is undefined.
        mape_imp = np.nan

    # Metric 4: R-squared (Coefficient of Determination)
    # Formula: R2_imp = 1 - (sum((x - x_hat)^2) / sum((x - mean(x))^2))
    sum_sq_errors = np.sum(errors**2)
    # Calculate the total sum of squares relative to the mean of the true values.
    sum_sq_total = np.sum((x_true - np.mean(x_true))**2)

    # Avoid division by zero if the true data has no variance.
    if sum_sq_total > 1e-9:
        r2_imp = 1 - (sum_sq_errors / sum_sq_total)
    else:
        # If total variance is zero, R-squared is undefined or can be set to 1 if errors are also zero.
        r2_imp = 1.0 if np.isclose(sum_sq_errors, 0) else np.nan

    logging.info("Step 2/2: All four imputation metrics computed.")

    # --- Finalization: Compile and return results ---
    # Store the computed metrics in a dictionary.
    results = {
        'RMSE_imp': rmse_imp,
        'MAE_imp': mae_imp,
        'MAPE_imp': mape_imp,
        'R2_imp': r2_imp
    }

    return results


In [None]:
# Task 20: Sparse-Cluster Stress-Test Imputation Metrics

# ==============================================================================
# Task 20: Sparse-Cluster Stress-Test Imputation Metrics
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 20, Orchestrator Function
# ------------------------------------------------------------------------------
def evaluate_sparse_cluster_accuracy(
    tensor_true: np.ndarray,
    tensor_imputed: np.ndarray,
    mask_eval: np.ndarray,
    clusters: Dict[int, List[int]],
    sparse_cluster_ids: List[int]
) -> Dict[str, float]:
    """
    Evaluates imputation accuracy specifically on the held-out entries of sparse clusters.

    This function provides a stress test for the imputation model by focusing the
    evaluation on the most challenging subset of the data: firms with the least
    amount of observed data. It isolates the performance of the model's
    "borrowing" mechanism.

    The process is as follows:
    1. Identifies all firm indices that belong to clusters classified as 'sparse'.
    2. Creates a new evaluation mask that is a subset of the main evaluation
       mask, containing only the entries corresponding to these sparse firms.
    3. Calls the canonical `evaluate_imputation_accuracy` function (from Task 19)
       using this new, more restrictive mask.
    4. Returns the computed metrics, providing a focused view on performance
       under extreme data sparsity.

    Args:
        tensor_true: The original, ground-truth 3D tensor.
        tensor_imputed: The 3D tensor after imputation and smoothing.
        mask_eval: The main boolean evaluation mask for the entire test set.
        clusters: A dictionary mapping each cluster ID to a list of firm indices.
        sparse_cluster_ids: A list of integer IDs for the sparse clusters.

    Returns:
        A dictionary containing the four accuracy metrics computed only on the
        sparse cluster subset. The keys are suffixed with '_sparse'.

    Raises:
        ValueError: If input shapes are inconsistent or if sparse cluster IDs
                    are invalid.
        TypeError: If inputs are not of the expected types.
    """
    # --- Input Validation ---
    # Basic type and shape checks.
    if not all(isinstance(arr, np.ndarray) for arr in [tensor_true, tensor_imputed, mask_eval]):
        raise TypeError("All tensor/mask inputs must be NumPy arrays.")
    if not (tensor_true.shape == tensor_imputed.shape == mask_eval.shape):
        raise ValueError("All input arrays must have the same shape.")
    if not isinstance(clusters, dict) or not isinstance(sparse_cluster_ids, list):
        raise TypeError("'clusters' must be a dict and 'sparse_cluster_ids' a list.")

    # --- Step 1: Identify held-out entries within sparse clusters ---
    # Get the total number of firms from the tensor shape.
    N = tensor_true.shape[1]

    # Aggregate all firm indices from the list of sparse clusters.
    # Using a set is efficient for collecting unique indices.
    all_sparse_indices = set()
    for cid in sparse_cluster_ids:
        if cid not in clusters:
            raise ValueError(f"Sparse cluster ID {cid} not found in clusters dictionary.")
        all_sparse_indices.update(clusters[cid])

    # Convert the set of sparse firm indices to a sorted list for deterministic indexing.
    all_sparse_indices_list = sorted(list(all_sparse_indices))

    # Create a boolean vector of length N, where True indicates a sparse firm.
    is_sparse_firm = np.zeros(N, dtype=bool)
    if all_sparse_indices_list: # Avoid error if list is empty
        is_sparse_firm[all_sparse_indices_list] = True

    # Create the sparse evaluation mask using broadcasting.
    # A (t, n, l) entry is in the sparse test set if:
    # 1. It was in the original evaluation set (mask_eval[t, n, l] is True)
    # 2. The firm `n` belongs to a sparse cluster (is_sparse_firm[n] is True)
    # The broadcasting `[np.newaxis, :, np.newaxis]` aligns the (N,) firm vector
    # with the (T, N, L) mask for the bitwise AND operation.
    mask_eval_sparse = mask_eval & is_sparse_firm[np.newaxis, :, np.newaxis]

    num_sparse_eval_points = mask_eval_sparse.sum()
    num_total_eval_points = mask_eval.sum()

    logging.info(
        f"Step 1/2: Created sparse evaluation mask. It contains {num_sparse_eval_points} "
        f"points ({num_sparse_eval_points / num_total_eval_points:.2%} of the total test set)."
    )

    # --- Step 2: Compute imputation metrics on the sparse subset ---
    # Reuse the canonical evaluation function from Task 19. This is a critical
    # best practice to ensure metrics are computed consistently.
    sparse_metrics = evaluate_imputation_accuracy(
        tensor_true=tensor_true,
        tensor_imputed=tensor_imputed,
        mask_eval=mask_eval_sparse
    )
    logging.info("Step 2/2: Computed imputation metrics on the sparse subset.")

    # --- Step 3: Compile and return the sparse-cluster metrics ---
    # Rename the keys to clearly indicate they are for the sparse stress test.
    results_sparse = {f"{key}_sparse": value for key, value in sparse_metrics.items()}

    return results_sparse


In [None]:
# Task 21: Implement Baseline Imputation Methods

# ==============================================================================
# Task 21: Implement Baseline Imputation Methods
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 21, Step 1: Cross-sectional Median Imputation (Helper Function)
# ------------------------------------------------------------------------------
def _impute_cross_sectional_median(
    X_train: np.ndarray,
    time_to_idx: Dict[pd.Timestamp, int],
    firm_to_idx: Dict[int, int]
) -> np.ndarray:
    """
    Imputes missing values using the cross-sectional median for each time period.

    This function implements a simple but common baseline. For each characteristic
    at each point in time, it calculates the median of all observed values across
    firms and uses this median to fill any missing entries for that characteristic
    at that time.

    Args:
        X_train: The 3D training tensor of shape (T, N, L) with NaNs.
        time_to_idx: Mapping from timestamp to integer index.
        firm_to_idx: Mapping from permno to integer index.

    Returns:
        A fully imputed (dense) NumPy array of shape (T, N, L).
    """
    # Log the start of the process.
    logging.info("--- Starting Cross-Sectional Median Imputation ---")

    # Get the dimensions of the input tensor for reshaping.
    T, N, L = X_train.shape

    # Create a DataFrame representation for easier groupby operations.
    # The tensor is reshaped to 2D and given a MultiIndex for panel operations.
    df = pd.DataFrame(
        X_train.reshape(T * N, L),
        index=pd.MultiIndex.from_product(
            [time_to_idx.keys(), firm_to_idx.keys()], names=['date', 'permno']
        )
    )

    # Group the DataFrame by date (the first level of the index).
    # For each group (i.e., for each month), apply a transformation.
    # The lambda function fills NaN values in each column with that column's median for the group.
    df_imputed = df.groupby(level='date').transform(lambda x: x.fillna(x.median()))

    # As a fallback, if an entire characteristic column for a given month is NaN,
    # its median will also be NaN. We fill any such remaining NaNs with 0.0,
    # which is a neutral value for the rank-normalized data.
    df_imputed.fillna(0.0, inplace=True)

    # Reshape the now-dense DataFrame back to the original (T, N, L) tensor format.
    X_hat_median = df_imputed.to_numpy().reshape(T, N, L)

    # Log the completion of the process.
    logging.info("Cross-Sectional Median Imputation complete.")

    # Return the imputed tensor.
    return X_hat_median


# ------------------------------------------------------------------------------
# Task 21, Step 2: Global Bidirectional Fill + XS Regression (Helper Function)
# ------------------------------------------------------------------------------
def _impute_global_bf_xs(
    X_train: np.ndarray,
    time_to_idx: Dict[pd.Timestamp, int],
    firm_to_idx: Dict[int, int]
) -> np.ndarray:
    """
    Imputes missing values using a two-stage global method.

    Stage 1: Bidirectional temporal fill (forward then backward) for each
             firm's time series.
    Stage 2: For any remaining missing values, a cross-sectional Ridge
             regression is fitted for each characteristic at each time period,
             using all other characteristics as predictors.

    Args:
        X_train: The 3D training tensor of shape (T, N, L) with NaNs.
        time_to_idx: Mapping from timestamp to integer index.
        firm_to_idx: Mapping from permno to integer index.

    Returns:
        A fully imputed (dense) NumPy array of shape (T, N, L).
    """
    # Log the start of the process.
    logging.info("--- Starting Global BF-XS Imputation ---")

    # Get tensor dimensions.
    T, N, L = X_train.shape

    # Create a DataFrame representation for panel operations.
    df = pd.DataFrame(
        X_train.reshape(T * N, L),
        index=pd.MultiIndex.from_product(
            [time_to_idx.keys(), firm_to_idx.keys()], names=['date', 'permno']
        )
    )

    # Stage 1: Bidirectional temporal fill.
    # Group by firm (permno) and apply forward-fill then backward-fill.
    df_filled = df.groupby(level='permno').transform(lambda x: x.ffill().bfill())

    # Stage 2: Cross-sectional ridge regression for any remaining NaNs.
    # Iterate through each time period.
    for date in time_to_idx.keys():
        # Extract the data slice for the current month.
        monthly_slice = df_filled.loc[date]

        # If there are no missing values in this slice, we can skip it.
        if monthly_slice.isnull().sum().sum() == 0:
            continue

        # Iterate through each characteristic (column).
        for l in range(L):
            # Check if the current characteristic has any missing values this month.
            if monthly_slice.iloc[:, l].isnull().any():
                # Isolate the target column (y) and predictor columns (X).
                col_l = monthly_slice.iloc[:, l]
                predictors = monthly_slice.drop(columns=monthly_slice.columns[l])

                # Split the data into a training set (where y is observed)
                # and a prediction set (where y is missing).
                is_observed = col_l.notna()
                X_train_reg = predictors[is_observed]
                y_train_reg = col_l[is_observed]
                X_predict_reg = predictors[~is_observed]

                # Skip if there's no data to train on or no values to predict.
                if y_train_reg.empty or X_predict_reg.empty:
                    continue

                # Handle potential NaNs in the predictor matrices by filling with the median.
                # This is crucial as regression models cannot handle missing inputs.
                train_medians = X_train_reg.median()
                X_train_reg = X_train_reg.fillna(train_medians)
                X_predict_reg = X_predict_reg.fillna(train_medians)

                # Initialize and fit a Ridge regression model.
                model = Ridge(alpha=1.0)
                model.fit(X_train_reg, y_train_reg)

                # Predict the missing values.
                predictions = model.predict(X_predict_reg)

                # Place the predictions back into the main DataFrame.
                df_filled.loc[(date, X_predict_reg.index), df.columns[l]] = predictions

    # Final fallback: If any NaNs still remain (e.g., a firm had all predictors missing),
    # fill them with the cross-sectional median.
    df_filled = df_filled.groupby(level='date').transform(lambda x: x.fillna(x.median()))
    # A final global fill with 0.0 for any completely empty time slices.
    df_filled.fillna(0.0, inplace=True)

    # Reshape back to the original tensor format.
    X_hat_bf_xs = df_filled.to_numpy().reshape(T, N, L)

    # Log completion.
    logging.info("Global BF-XS Imputation complete.")

    # Return the imputed tensor.
    return X_hat_bf_xs


# ------------------------------------------------------------------------------
# Task 21, Step 3: Local Backward Fill + Rolling XS Regression (Helper Function)
# ------------------------------------------------------------------------------
def _impute_local_b_xs(
    X_train: np.ndarray,
    time_to_idx: Dict[pd.Timestamp, int],
    firm_to_idx: Dict[int, int]
) -> np.ndarray:
    """
    Imputes missing values using a two-stage local method.

    This is a computationally intensive baseline.
    Stage 1: Backward-only temporal fill for each firm's time series.
    Stage 2: For each month, a cross-sectional Ridge regression is fitted for
             each characteristic using data from a trailing window of months.

    Args:
        X_train: The 3D training tensor of shape (T, N, L) with NaNs.
        time_to_idx: Mapping from timestamp to integer index.
        firm_to_idx: Mapping from permno to integer index.

    Returns:
        A fully imputed (dense) NumPy array of shape (T, N, L).
    """
    # Log the start of the process.
    logging.info("--- Starting Local B-XS Imputation ---")

    # Get tensor dimensions.
    T, N, L = X_train.shape

    # Create a DataFrame representation.
    df = pd.DataFrame(
        X_train.reshape(T * N, L),
        index=pd.MultiIndex.from_product(
            [time_to_idx.keys(), firm_to_idx.keys()], names=['date', 'permno']
        )
    )

    # Stage 1: Backward-only temporal fill.
    df_filled = df.groupby(level='permno').transform(lambda x: x.bfill())

    # Stage 2: Rolling-window cross-sectional ridge regression.
    # A 24-month window is a standard choice in the literature for such tasks.
    window_size = 24
    dates = list(time_to_idx.keys())

    # Iterate through each time period to make predictions for that month.
    for t_idx, date in enumerate(dates):
        # Get the data slice for the current month.
        monthly_slice = df_filled.loc[date]

        # Skip if no NaNs to impute this month.
        if monthly_slice.isnull().sum().sum() == 0:
            continue

        # Define the start of the rolling window for training data.
        start_idx = max(0, t_idx - window_size + 1)
        window_dates = dates[start_idx : t_idx + 1]
        # Extract the full data block for the rolling window.
        rolling_data = df_filled.loc[window_dates]

        # Iterate through each characteristic to fit a model.
        for l in range(L):
            # Check if there are missing values to predict for this characteristic.
            if monthly_slice.iloc[:, l].isnull().any():
                # Isolate the target column (y) and predictors (X) within the window.
                col_l_rolling = rolling_data.iloc[:, l]
                predictors_rolling = rolling_data.drop(columns=rolling_data.columns[l])

                # Get the training data (observed y) from the entire window.
                is_obs_rolling = col_l_rolling.notna()
                X_train_reg = predictors_rolling[is_obs_rolling]
                y_train_reg = col_l_rolling[is_obs_rolling]

                # The data to predict on is only the missing entries in the current month.
                is_missing_current_month = monthly_slice.iloc[:, l].isnull()
                X_predict_reg = monthly_slice.drop(columns=monthly_slice.columns[l])[is_missing_current_month]

                # Skip if no training data or no values to predict.
                if y_train_reg.empty or X_predict_reg.empty:
                    continue

                # Handle NaNs in predictor matrices.
                train_medians = X_train_reg.median()
                X_train_reg = X_train_reg.fillna(train_medians)
                X_predict_reg = X_predict_reg.fillna(train_medians)

                # Fit the Ridge model on the rolling window data.
                model = Ridge(alpha=1.0)
                model.fit(X_train_reg, y_train_reg)

                # Predict the missing values for the current month.
                predictions = model.predict(X_predict_reg)

                # Place predictions back into the main DataFrame.
                df_filled.loc[(date, X_predict_reg.index), df.columns[l]] = predictions

    # Final fallback for any remaining NaNs.
    df_filled = df_filled.groupby(level='date').transform(lambda x: x.fillna(x.median()))
    df_filled.fillna(0.0, inplace=True)

    # Reshape back to tensor format.
    X_hat_b_xs = df_filled.to_numpy().reshape(T, N, L)

    # Log completion.
    logging.info("Local B-XS Imputation complete.")

    # Return the imputed tensor.
    return X_hat_b_xs


# ------------------------------------------------------------------------------
# Task 21, Orchestrator Function
# ------------------------------------------------------------------------------
def run_baseline_imputations(
    X_train: np.ndarray,
    time_to_idx: Dict[pd.Timestamp, int],
    firm_to_idx: Dict[int, int],
    output_dir: str = "data_processed"
) -> Dict[str, np.ndarray]:
    """
    Orchestrates the imputation of missing data using several baseline methods.

    This function runs a suite of benchmark imputation models to provide a
    performance comparison for the main ACT-Tensor framework. Each baseline
    is implemented in a modular helper function and its output is saved to disk.

    The implemented baselines are:
    1. Cross-Sectional Median: Fills NaNs with the monthly median.
    2. Global BF-XS: Bidirectional fill followed by a global cross-sectional regression.
    3. Local B-XS: Backward fill followed by a rolling-window cross-sectional regression.

    Args:
        X_train: The 3D training tensor of shape (T, N, L) with NaNs.
        time_to_idx: Mapping from timestamp to integer index.
        firm_to_idx: Mapping from permno to integer index.
        output_dir: Directory to save the imputed tensors from each baseline.

    Returns:
        A dictionary where keys are the names of the baseline methods and
        values are the corresponding fully imputed (dense) tensors.
    """
    # --- Input Validation ---
    # Verify that the main input is a 3D NumPy array.
    if not isinstance(X_train, np.ndarray) or X_train.ndim != 3:
        raise TypeError("Input 'X_train' must be a 3D NumPy array.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    # Initialize a dictionary to hold the results.
    imputed_tensors = {}

    # --- Run Baseline 1: Cross-Sectional Median ---
    # Call the helper function to perform the imputation.
    X_hat_median = _impute_cross_sectional_median(X_train, time_to_idx, firm_to_idx)
    # Save the resulting tensor to a compressed file.
    np.savez_compressed(os.path.join(output_dir, "X_hat_Median.npz"), X_hat=X_hat_median)
    # Store the result in the output dictionary.
    imputed_tensors["Median"] = X_hat_median

    # --- Run Baseline 2: Global BF-XS ---
    # Call the helper function.
    X_hat_bf_xs = _impute_global_bf_xs(X_train, time_to_idx, firm_to_idx)
    # Save the result.
    np.savez_compressed(os.path.join(output_dir, "X_hat_BF_XS.npz"), X_hat=X_hat_bf_xs)
    # Store the result.
    imputed_tensors["BF-XS"] = X_hat_bf_xs

    # --- Run Baseline 3: Local B-XS ---
    # Call the helper function.
    X_hat_b_xs = _impute_local_b_xs(X_train, time_to_idx, firm_to_idx)
    # Save the result.
    np.savez_compressed(os.path.join(output_dir, "X_hat_B_XS.npz"), X_hat=X_hat_b_xs)
    # Store the result.
    imputed_tensors["B-XS"] = X_hat_b_xs

    # Log the completion of all baseline methods.
    logging.info("All baseline imputation methods have been executed and saved.")

    # Return the dictionary of imputed tensors.
    return imputed_tensors


In [None]:
# Task 22: Evaluate Baseline Imputation Methods on Held-Out Masks

# ==============================================================================
# Task 22: Evaluate Baseline Imputation Methods on Held-Out Masks
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 22, Orchestrator Function
# ------------------------------------------------------------------------------
def evaluate_baseline_methods(
    tensor_true: np.ndarray,
    regime: str,
    baseline_methods: List[str],
    data_dir: str = "data_processed"
) -> List[Dict[str, Any]]:
    """
    Evaluates the imputation accuracy of all baseline methods on a held-out mask.

    This function orchestrates the evaluation process for the benchmark models.
    For a given missingness regime, it loads the corresponding evaluation mask and
    the imputed tensors from each baseline method. It then systematically computes
    the standard set of four accuracy metrics for each baseline by calling the
    canonical `evaluate_imputation_accuracy` function.

    Args:
        tensor_true: The original, ground-truth 3D tensor.
        regime: The missingness regime to evaluate (e.g., 'MAR', 'Block', 'Logit').
                This determines which evaluation mask is used.
        baseline_methods: A list of baseline method names to evaluate,
                          e.g., ['Median', 'BF_XS', 'B_XS'].
        data_dir: The directory where the evaluation mask and imputed baseline
                  tensors are stored.

    Returns:
        A list of dictionaries, where each dictionary contains the evaluation
        results for one baseline method, suitable for creating a results table.

    Raises:
        FileNotFoundError: If the evaluation mask or any of the baseline
                           imputed tensors cannot be found.
        ValueError: If there is a shape mismatch between the loaded arrays.
    """
    # --- Input Validation ---
    # Verify that the main input is a 3D NumPy array.
    if not isinstance(tensor_true, np.ndarray) or tensor_true.ndim != 3:
        raise TypeError("Input 'tensor_true' must be a 3D NumPy array.")

    # --- Step 1: Load Evaluation Mask ---
    # Construct the path to the evaluation mask for the specified regime.
    eval_mask_path = os.path.join(data_dir, f"mask_eval_{regime}.npz")
    try:
        # Load the compressed numpy file containing the mask.
        mask_eval = np.load(eval_mask_path)['mask']
        logging.info(f"Successfully loaded evaluation mask for regime '{regime}'.")
    except FileNotFoundError:
        raise FileNotFoundError(f"Evaluation mask for regime '{regime}' not found at {eval_mask_path}")

    # Validate that the loaded mask has the correct shape.
    if mask_eval.shape != tensor_true.shape:
        raise ValueError(
            f"Shape of evaluation mask for regime '{regime}' ({mask_eval.shape}) "
            f"does not match ground-truth tensor shape ({tensor_true.shape})."
        )

    # Initialize a list to store the results for all baselines.
    all_results = []

    # --- Step 2 & 3: Compute Metrics for Each Baseline and Compile Results ---
    # Iterate through the list of specified baseline methods.
    for method_name in baseline_methods:
        logging.info(f"--- Evaluating baseline method: {method_name} ---")

        # Construct the path to the imputed tensor for the current baseline.
        imputed_tensor_path = os.path.join(data_dir, f"X_hat_{method_name}.npz")
        try:
            # Load the imputed tensor.
            tensor_imputed = np.load(imputed_tensor_path)['X_hat']
        except FileNotFoundError:
            raise FileNotFoundError(
                f"Imputed tensor for baseline '{method_name}' not found at {imputed_tensor_path}"
            )

        # Validate the shape of the loaded imputed tensor.
        if tensor_imputed.shape != tensor_true.shape:
            raise ValueError(
                f"Shape of imputed tensor for '{method_name}' ({tensor_imputed.shape}) "
                f"does not match ground-truth tensor shape ({tensor_true.shape})."
            )

        # Call the canonical evaluation function from Task 19 to compute metrics.
        # This ensures consistent metric calculation across all models.
        metrics = evaluate_imputation_accuracy(
            tensor_true=tensor_true,
            tensor_imputed=tensor_imputed,
            mask_eval=mask_eval
        )

        # Log the computed metrics for the current baseline.
        logging.info(f"Results for {method_name}: {metrics}")

        # Compile the results into a structured dictionary for this baseline.
        result_row = {
            "Method": method_name,
            "Regime": regime,
            **metrics  # Unpack the metrics dictionary
        }

        # Append the result to the master list.
        all_results.append(result_row)

    # Log the completion of the evaluation process.
    logging.info(f"Evaluation complete for all baseline methods in regime '{regime}'.")

    # Return the compiled list of results.
    return all_results


In [None]:
# Task 23: Ablation Study—CP Completion Only (No Clustering, No Smoothing)

# ==============================================================================
# Task 23: Ablation Study—CP Completion Only (No Clustering, No Smoothing)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 23, Orchestrator Function
# ------------------------------------------------------------------------------
def run_ablation_cp_only(
    X_train: np.ndarray,
    mask_train: np.ndarray,
    tensor_true: np.ndarray,
    mask_eval: np.ndarray,
    config: Dict[str, Any],
    output_dir: str = "data_processed"
) -> Dict[str, float]:
    """
    Runs the "CP Completion Only" ablation study.

    This function serves as a baseline within the tensor framework. It performs
    a global CP decomposition on the entire training tensor without any prior
    clustering or subsequent temporal smoothing. The purpose is to isolate the
    performance of the core tensor factorization algorithm itself, allowing for
    a clear measurement of the incremental benefits of the clustering and
    smoothing modules.

    The process is as follows:
    1. Calls the canonical `_solve_cp_als` solver on the full training tensor
       with zero regularization.
    2. Reconstructs the completed tensor from the resulting factor matrices.
    3. Saves the imputed tensor as a distinct artifact for this ablation study.
    4. Calls the canonical `evaluate_imputation_accuracy` function to compute
       the performance metrics on the held-out evaluation set.

    Args:
        X_train: The 3D training tensor of shape (T, N, L) with NaNs.
        mask_train: The boolean training mask of shape (T, N, L).
        tensor_true: The original, ground-truth 3D tensor.
        mask_eval: The boolean evaluation mask for the test set.
        config: The study's configuration dictionary.
        output_dir: Directory to save the imputed tensor from this ablation run.

    Returns:
        A dictionary containing the four computed accuracy metrics for this
        ablation condition.
    """
    # --- Input Validation ---
    # Verify that all tensor/mask inputs are valid NumPy arrays with matching shapes.
    if not all(isinstance(arr, np.ndarray) and arr.ndim == 3 for arr in [X_train, mask_train, tensor_true, mask_eval]):
        raise TypeError("All tensor/mask inputs must be 3D NumPy arrays.")
    if not (X_train.shape == mask_train.shape == tensor_true.shape == mask_eval.shape):
        raise ValueError("All input arrays must have the same shape.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    logging.info("--- Starting Ablation Study: CP Completion Only ---")
    start_time = time.time()

    # --- Step 1: Set up and run the global masked CP decomposition ---
    # Get the CP rank from the configuration.
    rank = config["imputation_hparams"]["CP_rank_R"]

    # Call the canonical ALS solver on the full training tensor.
    # For this baseline, regularization is turned off (lambda_reg = 0.0).
    logging.info(f"Running global CP decomposition with rank R={rank}...")
    U, V, W = _solve_cp_als(
        tensor=X_train,
        mask=mask_train,
        rank=rank,
        lambda_reg=0.0,  # No regularization for this baseline
        config=config
    )
    logging.info("Step 1/3: Global CP decomposition complete.")

    # --- Step 2: Reconstruct the completed tensor ---
    # Reconstruct the full, dense tensor from the converged factor matrices.
    # Formula: X_hat = [[U, V, W]]
    X_hat_cp_only = np.einsum('tr,nr,lr->tnl', U, V, W)

    # Save the imputed tensor as a specific artifact for this ablation study.
    output_path = os.path.join(output_dir, "X_hat_Ablation_CP_Only.npz")
    np.savez_compressed(output_path, X_hat=X_hat_cp_only)
    logging.info(f"Step 2/3: Reconstructed tensor saved to {output_path}.")

    # --- Step 3: Evaluate and log ablation metrics ---
    # Call the canonical evaluation function to compute accuracy metrics.
    # This ensures a fair comparison with all other methods.
    metrics = evaluate_imputation_accuracy(
        tensor_true=tensor_true,
        tensor_imputed=X_hat_cp_only,
        mask_eval=mask_eval
    )

    end_time = time.time()
    logging.info(f"Step 3/3: Evaluation complete. Time elapsed: {end_time - start_time:.2f} seconds.")
    logging.info(f"Results for CP-Only Ablation: {metrics}")

    # Return the computed metrics.
    return metrics


In [None]:
# Task 24: Ablation Study—CP Completion with Clustering (No Smoothing)

# ==============================================================================
# Task 24: Ablation Study—CP Completion with Clustering (No Smoothing)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 24, Orchestrator Function
# ------------------------------------------------------------------------------
def run_ablation_cp_with_clustering(
    X_train: np.ndarray,
    mask_train: np.ndarray,
    tensor_true: np.ndarray,
    mask_eval: np.ndarray,
    firm_to_idx: Dict[int, int],
    config: Dict[str, Any],
    output_dir: str = "data_processed"
) -> Dict[str, float]:
    """
    Runs the "CP Completion with Clustering" ablation study.

    This function evaluates the performance of the core cluster-based imputation
    module of ACT-Tensor, deliberately omitting the final temporal smoothing step.
    It allows for the direct measurement of the value added by modeling firm
    heterogeneity through clustering.

    The process is a sequential execution of previously defined pipeline stages:
    1. Clusters firms based on their data profiles (`cluster_firms_by_profile`).
    2. Computes cluster densities and partitions them (`compute_densities_and_partition`).
    3. Imputes dense clusters using unpenalized CP-ALS (`complete_dense_clusters`).
    4. Imputes sparse clusters using the augmented CP-ALS method (`complete_sparse_clusters`).
    5. Assembles the completed sub-tensors into a single global tensor (`assemble_completed_tensor`).
    6. Evaluates the accuracy of this unsmoothed tensor against the ground truth.

    Args:
        X_train: The 3D training tensor of shape (T, N, L) with NaNs.
        mask_train: The boolean training mask of shape (T, N, L).
        tensor_true: The original, ground-truth 3D tensor.
        mask_eval: The boolean evaluation mask for the test set.
        firm_to_idx: Mapping from permno to integer index.
        config: The study's configuration dictionary.
        output_dir: Directory to save intermediate and final artifacts.

    Returns:
        A dictionary containing the four computed accuracy metrics for this
        ablation condition.
    """
    # --- Input Validation ---
    # Ensure all tensor/mask inputs are valid NumPy arrays with matching shapes.
    if not all(isinstance(arr, np.ndarray) and arr.ndim == 3 for arr in [X_train, mask_train, tensor_true, mask_eval]):
        raise TypeError("All tensor/mask inputs must be 3D NumPy arrays.")
    if not (X_train.shape == mask_train.shape == tensor_true.shape == mask_eval.shape):
        raise ValueError("All input arrays must have the same shape.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    logging.info("--- Starting Ablation Study: CP Completion with Clustering ---")
    start_time = time.time()

    # --- Step 1: Reuse cluster assignments and perform cluster-wise CP completion ---
    # This sequence reuses the modular functions from the main pipeline.

    # 1a. Cluster firms.
    clusters = cluster_firms_by_profile(X_train, firm_to_idx, config, output_dir)

    # 1b. Partition clusters into dense and sparse.
    dense_ids, sparse_ids = compute_densities_and_partition(
        X_train, mask_train, clusters, config, output_dir
    )

    # 1c. Impute dense clusters.
    complete_dense_clusters(dense_ids, config, output_dir)

    # 1d. Impute sparse clusters.
    complete_sparse_clusters(
        sparse_ids, dense_ids, clusters, X_train, mask_train, config, output_dir
    )
    logging.info("Step 1/3: Cluster-wise CP completion finished.")

    # --- Step 2: Assemble the completed (unsmoothed) tensor ---
    # Reuse the canonical assembly function.
    X_hat_clustered = assemble_completed_tensor(
        clusters=clusters,
        tensor_shape=X_train.shape,
        mask_train=mask_train,
        X_train=X_train,
        config=config,
        data_dir=output_dir
    )

    # Save the assembled tensor as a specific artifact for this ablation study.
    output_path = os.path.join(output_dir, "X_hat_Ablation_CP_Clustering.npz")
    np.savez_compressed(output_path, X_hat=X_hat_clustered)
    logging.info(f"Step 2/3: Assembled (unsmoothed) tensor saved to {output_path}.")

    # --- Step 3: Evaluate and log ablation metrics ---
    # Call the canonical evaluation function on the unsmoothed, clustered result.
    metrics = evaluate_imputation_accuracy(
        tensor_true=tensor_true,
        tensor_imputed=X_hat_clustered,
        mask_eval=mask_eval
    )

    end_time = time.time()
    logging.info(f"Step 3/3: Evaluation complete. Time elapsed: {end_time - start_time:.2f} seconds.")
    logging.info(f"Results for CP-with-Clustering Ablation: {metrics}")

    # Return the computed metrics.
    return metrics


In [None]:
# Task 25: Ablation Study—CP Completion with Smoothing (No Clustering)

# ==============================================================================
# Task 25: Ablation Study—CP Completion with Smoothing (No Clustering)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 25, Orchestrator Function
# ------------------------------------------------------------------------------
def run_ablation_cp_with_smoothing(
    X_train: np.ndarray,
    mask_train: np.ndarray,
    tensor_true: np.ndarray,
    mask_eval: np.ndarray,
    config: Dict[str, Any],
    output_dir: str = "data_processed"
) -> Dict[str, float]:
    """
    Runs the "CP Completion with Smoothing" ablation study.

    This function evaluates the performance of a simplified model that consists of
    a global CP decomposition followed by temporal smoothing, but without any
    firm clustering. Its purpose is to isolate the incremental performance gain
    attributable to the temporal smoothing module alone.

    The process is as follows:
    1. Runs (or loads the pre-computed result of) the "CP-only" ablation to get
       the globally imputed, unsmoothed tensor.
    2. Applies the canonical Centered Moving Average (CMA) smoother to this tensor.
    3. Saves the resulting smoothed tensor as a distinct artifact.
    4. Evaluates the accuracy of this smoothed tensor against the ground truth
       using the canonical evaluation function.

    Args:
        X_train: The 3D training tensor of shape (T, N, L) with NaNs.
        mask_train: The boolean training mask of shape (T, N, L).
        tensor_true: The original, ground-truth 3D tensor.
        mask_eval: The boolean evaluation mask for the test set.
        config: The study's configuration dictionary.
        output_dir: Directory to save the imputed tensor from this ablation run.

    Returns:
        A dictionary containing the four computed accuracy metrics for this
        ablation condition.
    """
    # --- Input Validation ---
    # Ensure all tensor/mask inputs are valid NumPy arrays with matching shapes.
    if not all(isinstance(arr, np.ndarray) and arr.ndim == 3 for arr in [X_train, mask_train, tensor_true, mask_eval]):
        raise TypeError("All tensor/mask inputs must be 3D NumPy arrays.")
    if not (X_train.shape == mask_train.shape == tensor_true.shape == mask_eval.shape):
        raise ValueError("All input arrays must have the same shape.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    logging.info("--- Starting Ablation Study: CP Completion with Smoothing ---")
    start_time = time.time()

    # --- Step 1: Obtain the globally imputed (unsmoothed) tensor ---
    # For efficiency, we first check if the result from the "CP-only" ablation
    # already exists. If not, we run it. This avoids re-computation.
    cp_only_path = os.path.join(output_dir, "X_hat_Ablation_CP_Only.npz")
    if not os.path.exists(cp_only_path):
        logging.info("CP-Only artifact not found. Running 'run_ablation_cp_only' first...")
        # This assumes the function from Task 23 is available and works correctly.
        run_ablation_cp_only(
            X_train, mask_train, tensor_true, mask_eval, config, output_dir
        )

    # Load the pre-computed globally imputed tensor.
    X_hat_cp_only = np.load(cp_only_path)['X_hat']
    logging.info(f"Step 1/3: Loaded globally imputed tensor from {cp_only_path}.")

    # --- Step 2: Apply CMA smoothing ---
    # Pass the globally imputed tensor to the canonical CMA smoothing function.
    # This reuses the exact same smoothing logic as the full ACT-Tensor model.
    X_tilde_cp_cma = apply_cma_smoothing(
        X_hat_assembled=X_hat_cp_only,
        config=config,
        output_dir=output_dir
    )

    # Save the result under a new name specific to this ablation study.
    output_path = os.path.join(output_dir, "X_hat_Ablation_CP_CMA.npz")
    np.savez_compressed(output_path, X_hat=X_tilde_cp_cma)
    logging.info(f"Step 2/3: Smoothed tensor saved to {output_path}.")

    # --- Step 3: Evaluate and log ablation metrics ---
    # Call the canonical evaluation function on the smoothed, unclustered result.
    metrics = evaluate_imputation_accuracy(
        tensor_true=tensor_true,
        tensor_imputed=X_tilde_cp_cma,
        mask_eval=mask_eval
    )

    end_time = time.time()
    logging.info(f"Step 3/3: Evaluation complete. Time elapsed: {end_time - start_time:.2f} seconds.")
    logging.info(f"Results for CP-with-Smoothing Ablation: {metrics}")

    # Return the computed metrics.
    return metrics


In [None]:
# Task 26: Compile Ablation Results Table (Panel C)

# ==============================================================================
# Task 26: Compile Ablation Results Table (Panel C)
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 26, Orchestrator Function
# ------------------------------------------------------------------------------
def compile_ablation_results(
    ablation_results: List[Dict[str, Any]],
    output_dir: str = "results"
) -> pd.DataFrame:
    """
    Compiles, verifies, and saves the results of the ablation studies.

    This function takes the metric dictionaries from the various ablation runs
    (CP-Only, CP+Clustering, CP+Smoothing) and the full ACT-Tensor model, and
    aggregates them into a single, publication-ready DataFrame. It performs
    sanity checks to verify the expected performance improvements from each
    module and saves the final table in multiple formats.

    Args:
        ablation_results: A list of dictionaries, where each dictionary contains
                          the results for one model configuration. Each dict
                          should have keys like 'Method', 'Regime', 'RMSE_imp', etc.
        output_dir: The directory to save the final results tables.

    Returns:
        A pandas DataFrame containing the compiled and formatted ablation results.

    Raises:
        ValueError: If the input list is empty or malformed.
    """
    # --- Input Validation ---
    # Verify that the input is a non-empty list of dictionaries.
    if not isinstance(ablation_results, list) or not ablation_results:
        raise ValueError("Input 'ablation_results' must be a non-empty list of dictionaries.")
    if not all(isinstance(item, dict) for item in ablation_results):
        raise TypeError("All items in 'ablation_results' must be dictionaries.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    logging.info("--- Compiling Ablation Study Results ---")

    # --- Step 1: Aggregate ablation metrics into a DataFrame ---
    # Convert the list of result dictionaries directly into a pandas DataFrame.
    results_df = pd.DataFrame(ablation_results)

    # Define the expected methods for the ablation study panel.
    expected_methods = [
        "CP Completion",
        "CP Completion w/ Clustering",
        "CP Completion w/ CMA",
        "ACT-Tensor w/ CMA"
    ]

    # Set 'Method' and 'Regime' as a MultiIndex for easier slicing and analysis.
    results_df.set_index(['Method', 'Regime'], inplace=True)
    results_df.sort_index(inplace=True)

    logging.info("Step 1/3: Aggregated ablation metrics into a DataFrame.")

    # --- Step 2: Verify incremental improvements ---
    # For each regime, check if adding modules improves performance (lower RMSE).
    for regime in results_df.index.get_level_values('Regime').unique():
        try:
            # Get RMSE for each model in the current regime.
            rmse_cp_only = results_df.loc[("CP Completion", regime), 'RMSE_imp']
            rmse_cp_clust = results_df.loc[("CP Completion w/ Clustering", regime), 'RMSE_imp']
            rmse_cp_cma = results_df.loc[("CP Completion w/ CMA", regime), 'RMSE_imp']
            rmse_act_tensor = results_df.loc[("ACT-Tensor w/ CMA", regime), 'RMSE_imp']

            # Verify that the full model is the best.
            # A small tolerance is added for floating point inconsistencies.
            assert rmse_act_tensor <= (rmse_cp_clust + 1e-9)
            assert rmse_act_tensor <= (rmse_cp_cma + 1e-9)

            # Verify that adding either module is better than CP-only.
            assert rmse_cp_clust <= (rmse_cp_only + 1e-9)
            assert rmse_cp_cma <= (rmse_cp_only + 1e-9)

            logging.info(f"Verification successful for regime '{regime}': Performance improves with added modules.")

        except (KeyError, AssertionError) as e:
            # If the expected ordering does not hold, log a warning. This is a
            # critical finding that deviates from the paper's results.
            logging.warning(
                f"Verification failed for regime '{regime}'. Expected performance "
                f"ordering does not hold. Error: {e}"
            )
            # Note: For rigorous academic work, statistical significance tests
            # (e.g., bootstrapping Diebold-Mariano) would be applied here to
            # confirm if these differences are statistically meaningful.

    logging.info("Step 2/3: Verification of incremental improvements complete.")

    # --- Step 3: Save the ablation table in multiple formats ---
    # Create a display-friendly version of the table, formatted like Panel C.
    # Unstack the 'Regime' level to create columns for each metric under each regime.
    panel_c_df = results_df.unstack(level='Regime')

    # Reorder columns to group by regime, then by metric.
    panel_c_df = panel_c_df.swaplevel(0, 1, axis=1).sort_index(axis=1)

    # Define the desired column order to match the paper's table.
    metric_order = ['RMSE_imp', 'MAE_imp', 'MAPE_imp', 'R2_imp']
    regime_order = results_df.index.get_level_values('Regime').unique()
    final_column_order = [(reg, met) for reg in regime_order for met in metric_order]
    panel_c_df = panel_c_df[final_column_order]

    # Save the raw, stacked data to CSV for easy programmatic access.
    raw_csv_path = os.path.join(output_dir, "results_ablation_raw.csv")
    results_df.to_csv(raw_csv_path)
    logging.info(f"Raw ablation results saved to {raw_csv_path}")

    # Save the formatted, publication-style table to a separate CSV.
    formatted_csv_path = os.path.join(output_dir, "results_ablation_panel_C.csv")
    panel_c_df.round(4).to_csv(formatted_csv_path)
    logging.info(f"Formatted Panel C results saved to {formatted_csv_path}")

    # Save to LaTeX format for easy inclusion in a research paper.
    latex_path = os.path.join(output_dir, "results_ablation_panel_C.tex")
    panel_c_df.round(4).to_latex(latex_path, multirow=True, multicolumn=True)
    logging.info(f"Formatted Panel C results saved to {latex_path}")

    logging.info("Step 3/3: Ablation results tables compiled and saved.")

    return panel_c_df


In [None]:
# Task 27: Regularization-Free Stability Test (Vary $\lambda$ for Sparse Clusters)

# ==============================================================================
# Task 27: Regularization-Free Stability Test
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 27, Step 3: Plotting and Saving Results (Helper Function)
# This helper is retained as its logic was sound.
# ------------------------------------------------------------------------------
def _plot_and_save_stability_results(
    results_df: pd.DataFrame,
    regime: str,
    output_dir: str
) -> None:
    """
    Plots the RMSE vs. lambda results, verifies flatness, and saves artifacts.

    Args:
        results_df: DataFrame with 'lambda' and 'RMSE_imp' columns.
        regime: The missingness regime being tested.
        output_dir: Directory to save the plot and numerical results.
    """
    # --- Save Numerical Results ---
    # Save the detailed numerical results to a CSV file for auditing and analysis.
    results_path = os.path.join(output_dir, f"stability_results_{regime}.csv")
    results_df.to_csv(results_path, index=False)
    logging.info(f"Numerical stability results saved to {results_path}")

    # --- Plotting ---
    # Set a professional plot style for publication quality.
    sns.set_theme(style="whitegrid")
    plt.figure(figsize=(10, 6))

    # Create the line plot of RMSE against lambda.
    ax = sns.lineplot(data=results_df, x='lambda', y='RMSE_imp', marker='o', color='b')

    # Set the x-axis to a logarithmic scale to visualize the wide range of lambda values.
    ax.set_xscale('log')
    # Set plot titles and labels for clarity.
    ax.set_title(f'RMSE Sensitivity to Regularization (λ) under {regime} Missingness', fontsize=16)
    ax.set_xlabel('Regularization Strength (λ)', fontsize=12)
    ax.set_ylabel('Out-of-Sample RMSE', fontsize=12)
    ax.grid(True, which="both", ls="--")

    # Save the plot to a high-resolution PDF file.
    plot_path = os.path.join(output_dir, f"stability_plot_{regime}.pdf")
    plt.savefig(plot_path, bbox_inches='tight')
    plt.close()
    logging.info(f"Stability plot for regime '{regime}' saved to {plot_path}")

    # --- Programmatic Verification of Flatness ---
    # Calculate the absolute range of RMSE values observed across the lambda grid.
    rmse_range = results_df['RMSE_imp'].max() - results_df['RMSE_imp'].min()

    # Define a strict tolerance for what constitutes "flat".
    flatness_tolerance = 1e-4

    # Check if the variation is negligible, confirming the paper's claim.
    if rmse_range < flatness_tolerance:
        logging.info(
            f"VERIFICATION SUCCESSFUL: RMSE is stable for regime '{regime}'. "
            f"Total variation ({rmse_range:.6f}) is less than tolerance ({flatness_tolerance})."
        )
    else:
        logging.warning(
            f"VERIFICATION FAILED: RMSE shows non-negligible variation for regime '{regime}'. "
            f"Total variation = {rmse_range:.6f}"
        )


# ------------------------------------------------------------------------------
# Task 27, Orchestrator Function
# ------------------------------------------------------------------------------
def run_regularization_stability_test(
    regime: str,
    X_train: np.ndarray,
    mask_train: np.ndarray,
    tensor_true: np.ndarray,
    mask_eval: np.ndarray,
    clusters: Dict[int, List[int]],
    dense_cluster_ids: List[int],
    sparse_cluster_ids: List[int],
    config: Dict[str, Any],
    output_dir: str = "results"
) -> pd.DataFrame:
    """
    Runs the regularization stability test for a given missingness regime.

    This function systematically varies the regularization parameter `lambda` for
    the sparse cluster completion step and measures the impact on the final
    out-of-sample imputation RMSE. It is designed to empirically verify the paper's
    claim that the ACT-Tensor framework is intrinsically stable. This is a complete,
    production-grade implementation that avoids re-computing unnecessary steps.

    Args:
        regime: The missingness regime to test (e.g., 'Block', 'Logit').
        X_train, mask_train, tensor_true, mask_eval: The necessary data tensors.
        clusters, dense_cluster_ids, sparse_cluster_ids: The clustering results.
        config: The main configuration dictionary.
        output_dir: Directory to save plots and result data.

    Returns:
        A pandas DataFrame containing the (lambda, RMSE) results for the regime.
    """
    # --- Input Validation and Setup ---
    # Verify that the specified regime is one designated for this test.
    if regime not in config["stability_test"]["regimes"]:
        logging.warning(f"Regime '{regime}' is not in the stability test list defined in config. Skipping.")
        return pd.DataFrame()

    # Create a dedicated subdirectory for this test's artifacts to avoid conflicts.
    test_output_dir = os.path.join(output_dir, f"stability_test_{regime}")
    os.makedirs(test_output_dir, exist_ok=True)

    logging.info(f"--- Starting Regularization Stability Test for Regime: {regime} ---")

    # --- Step 1: One-Time Computation of Dense Clusters ---
    # The dense cluster completion is unpenalized and thus invariant to lambda.
    # We run it once and its results will be loaded by the assembly step in the loop.
    logging.info("Performing one-time completion of dense clusters...")
    complete_dense_clusters(
        dense_cluster_ids=dense_cluster_ids,
        config=config,
        data_dir=test_output_dir  # Use the dedicated directory
    )

    # --- Step 2: Iterate over Lambda Grid for Sparse Clusters ---
    # Extract the grid of lambda values to test from the configuration.
    lambda_grid = config["stability_test"]["lambda_grid"]

    # Initialize a list to store the results from each iteration.
    results = []

    # Loop through each value of lambda in the specified grid.
    for lambda_val in lambda_grid:
        iteration_start_time = time.time()
        logging.info(f"--- Testing lambda = {lambda_val:.1e} ---")

        # Create a deep copy of the config to safely modify it for this specific run.
        temp_config = copy.deepcopy(config)
        # Override the lambda value for the sparse cluster completion step.
        temp_config["imputation_hparams"]["ridge_lambda"] = lambda_val

        # 2a. Re-run sparse cluster completion with the current lambda.
        complete_sparse_clusters(
            sparse_cluster_ids=sparse_cluster_ids,
            dense_cluster_ids=dense_cluster_ids,
            clusters=clusters,
            X_train=X_train,
            mask_train=mask_train,
            config=temp_config,
            data_dir=test_output_dir
        )

        # 2b. Assemble the global tensor. This will load the newly computed sparse
        #     tensors and the static dense tensors from the test directory.
        X_hat_assembled = assemble_completed_tensor(
            clusters=clusters,
            tensor_shape=X_train.shape,
            mask_train=mask_train,
            X_train=X_train,
            config=temp_config,
            data_dir=test_output_dir
        )

        # 2c. Apply temporal smoothing (CMA is the default).
        X_tilde = apply_cma_smoothing(
            X_hat_assembled=X_hat_assembled,
            config=temp_config,
            output_dir=test_output_dir
        )

        # 2d. Evaluate the final out-of-sample RMSE.
        metrics = evaluate_imputation_accuracy(
            tensor_true=tensor_true,
            tensor_imputed=X_tilde,
            mask_eval=mask_eval
        )

        # Store the result for this lambda value.
        results.append({'lambda': lambda_val, 'RMSE_imp': metrics['RMSE_imp']})
        iteration_end_time = time.time()
        logging.info(
            f"Result for lambda = {lambda_val:.1e}: RMSE = {metrics['RMSE_imp']:.6f} "
            f"(took {iteration_end_time - iteration_start_time:.2f}s)"
        )

    # Convert the list of results into a pandas DataFrame for analysis and plotting.
    results_df = pd.DataFrame(results)

    # --- Step 3: Plot RMSE vs. lambda and Save ---
    # Call the helper function to generate the plot, save numerical results, and verify flatness.
    _plot_and_save_stability_results(results_df, regime, test_output_dir)

    return results_df


In [None]:
# Task 28: Construct Double-Sorted Value-Weighted Portfolios and Excess-Return Tensor $\mathcal{R}$

# ==============================================================================
# Task 28: Construct Portfolios and Return Tensor
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 28, Step 1: Prepare Data for Sorting (Helper Function)
# ------------------------------------------------------------------------------
def _prepare_sorting_dataframe(
    X_tilde: np.ndarray,
    df_clean: pd.DataFrame,
    time_to_idx: Dict[pd.Timestamp, int],
    firm_to_idx: Dict[int, int],
    config: Dict[str, Any]
) -> pd.DataFrame:
    """
    Prepares a master DataFrame for portfolio sorting by merging market data and imputed characteristics.

    This function is the critical link between the imputation pipeline and the asset
    pricing evaluation. It takes the final imputed tensor, "un-tensors" it back into
    a panel DataFrame, and merges it with the necessary market data (returns, market
    capitalization) to create a single, unified dataset ready for sorting.

    Args:
        X_tilde: The final smoothed and imputed 3D tensor of shape (T, N, L).
        df_clean: The cleansed DataFrame from Task 3, containing returns and price/shares data.
        time_to_idx: Mapping from timestamp to integer index.
        firm_to_idx: Mapping from permno to integer index.
        config: The study's configuration dictionary.

    Returns:
        A DataFrame indexed by (date, permno) ready for sorting, containing
        market equity, excess returns, and all L imputed characteristics.
    """
    # --- Input Validation ---
    if not isinstance(X_tilde, np.ndarray) or X_tilde.ndim != 3:
        raise TypeError("Input 'X_tilde' must be a 3D NumPy array.")
    if not isinstance(df_clean, pd.DataFrame):
        raise TypeError("Input 'df_clean' must be a pandas DataFrame.")

    # --- Prepare Market Data from df_clean ---
    # Select only the necessary columns to minimize memory footprint.
    market_data = df_clean[['prc', 'shrout', 'ret', 'RF']].copy()

    # Calculate Market Equity (ME) in millions of USD for weighting and sorting.
    # Formula: ME = |Price| * Shares Outstanding (in thousands) / 1000
    market_data['market_equity'] = market_data['prc'].abs() * market_data['shrout'] / 1000.0

    # Calculate monthly excess returns for portfolio value calculation.
    # The risk-free rate 'RF' is in percent format and must be converted to decimal.
    market_data['ret_excess'] = market_data['ret'] - (market_data['RF'] / 100.0)

    # --- "Un-tensor" the Imputed Characteristics ---
    # Get tensor dimensions.
    T, N, L = X_tilde.shape

    # Create a DataFrame from the reshaped tensor. This flattens the tensor into a 2D array.
    char_df = pd.DataFrame(
        X_tilde.reshape(T * N, L),
        # Programmatically reconstruct the (date, permno) MultiIndex using the provided mappings.
        # This step is critical for ensuring perfect alignment.
        index=pd.MultiIndex.from_product(
            [time_to_idx.keys(), firm_to_idx.keys()], names=['date', 'permno']
        ),
        # Assign column names to the characteristics.
        columns=[f'char_{i}' for i in range(L)]
    )

    # --- Merge Market Data and Characteristics ---
    # Join the market data and characteristic DataFrames on their common index.
    # An inner join ensures we only keep firm-months with all necessary data.
    df_master = market_data.join(char_df, how='inner')

    # Drop any remaining rows with missing market equity or returns, as these
    # are essential for sorting and weighting.
    df_master.dropna(subset=['market_equity', 'ret_excess'], inplace=True)

    # Log the size of the final prepared DataFrame.
    logging.info(f"Prepared master sorting DataFrame with {len(df_master)} observations.")

    # Return the unified DataFrame.
    return df_master


# ------------------------------------------------------------------------------
# Task 28, Orchestrator Function
# ------------------------------------------------------------------------------
def construct_portfolio_return_tensor(
    X_tilde: np.ndarray,
    df_clean: pd.DataFrame,
    time_to_idx: Dict[pd.Timestamp, int],
    firm_to_idx: Dict[int, int],
    config: Dict[str, Any],
    output_dir: str = "data_processed"
) -> np.ndarray:
    """
    Constructs a 4D tensor of double-sorted, value-weighted portfolio returns.

    This function is the core of the asset pricing evaluation setup. It translates
    firm-level imputed characteristics into tradable portfolio returns, creating the
    test assets for subsequent factor analysis. The process involves a classic
    Fama-French style double sort, performed cross-sectionally each month.

    Args:
        X_tilde: The final smoothed and imputed 3D tensor from the imputation pipeline.
        df_clean: The cleansed DataFrame from Task 3.
        time_to_idx: Mapping from timestamp to integer index.
        firm_to_idx: Mapping from permno to integer index.
        config: The study's configuration dictionary.
        output_dir: Directory to save the final return tensor and audit files.

    Returns:
        A 4D NumPy array of shape (P, Q, L-1, T) containing the value-weighted
        excess returns for each portfolio over time.
    """
    # Log the start of the task.
    logging.info("--- Starting Portfolio Return Tensor Construction ---")

    # --- Step 1: Prepare the master DataFrame for sorting ---
    # This helper function merges imputed characteristics with market data.
    df_sort = _prepare_sorting_dataframe(X_tilde, df_clean, time_to_idx, firm_to_idx, config)

    # --- Step 2: Perform Double Sorts ---
    # Get portfolio construction parameters from the configuration.
    P = config["portfolio_settings"]["P_size_buckets"]
    Q = config["portfolio_settings"]["Q_char_buckets"]
    L = config["tensor_policies"]["L_characteristics"]

    # Assume 'char_0' is the size characteristic. All other characteristics will be used for the second sort.
    # Note: A more robust implementation might get the size characteristic name from the config.
    size_char_name = 'char_0'
    other_char_names = [f'char_{i}' for i in range(1, L)]

    # Define a function to perform the double sort on a single month's data.
    def perform_double_sort(group: pd.DataFrame) -> pd.DataFrame:
        # First sort: Assign firms to P quantiles based on market equity.
        # `pd.qcut` with `labels=False` creates integer bucket labels.
        # `duplicates='drop'` handles cases with many firms of the same size.
        group['size_bucket'] = pd.qcut(group['market_equity'], P, labels=False, duplicates='drop')

        # Second sort: Within each size bucket, sort firms into Q quantiles for each characteristic.
        # We use `groupby().transform()` for a vectorized implementation of this nested sort.
        for char_name in other_char_names:
            bucket_col_name = f'{char_name}_bucket'
            group[bucket_col_name] = group.groupby('size_bucket')[char_name].transform(
                lambda x: pd.qcut(x, Q, labels=False, duplicates='drop')
            )
        return group

    logging.info(f"Performing {P}x{Q} double sorts for each of {L-1} characteristics...")
    # Apply the sorting function to each monthly group of data.
    # `group_keys=False` prevents the group key from being added to the index.
    df_sorted = df_sort.groupby(level='date', group_keys=False).apply(perform_double_sort)

    # --- Step 3: Compute Value-Weighted Returns and Assemble Tensor ---
    # "Melt" the DataFrame from a wide format (many bucket columns) to a long format.
    # This is a key step for enabling efficient grouped aggregation.
    bucket_cols = [f'{name}_bucket' for name in other_char_names]
    df_long = pd.melt(
        df_sorted.reset_index(),
        id_vars=['date', 'permno', 'ret_excess', 'market_equity', 'size_bucket'],
        value_vars=bucket_cols,
        var_name='characteristic',
        value_name='char_bucket'
    )
    # Clean the characteristic name (e.g., 'char_1_bucket' -> 'char_1').
    df_long['characteristic'] = df_long['characteristic'].str.replace('_bucket', '')
    # Drop rows where a firm could not be assigned to a bucket (e.g., due to NaNs).
    df_long.dropna(subset=['size_bucket', 'char_bucket'], inplace=True)

    # Define the value-weighting aggregation function.
    def value_weighted_mean(group: pd.DataFrame) -> float:
        # Formula: R_p = sum(w_i * R_i) / sum(w_i), where w_i is market equity.
        return np.average(group['ret_excess'], weights=group['market_equity'])

    # Group by all portfolio dimensions (date, characteristic, size, char_bucket).
    # Then apply the value-weighting function to each group.
    portfolio_returns = df_long.groupby(
        ['date', 'characteristic', 'size_bucket', 'char_bucket']
    ).apply(value_weighted_mean, include_groups=False)

    # Convert the resulting Series into a multi-dimensional DataFrame via unstacking.
    portfolio_returns_df = portfolio_returns.unstack(level=['characteristic', 'size_bucket', 'char_bucket'])

    # Reindex to create a complete grid of all possible portfolios, filling any
    # portfolios that were empty in a given month with a return of 0.0.
    full_portfolio_index = pd.MultiIndex.from_product([
        other_char_names, range(P), range(Q)
    ], names=['characteristic', 'size_bucket', 'char_bucket'])
    portfolio_returns_df = portfolio_returns_df.reindex(columns=full_portfolio_index).fillna(0.0)

    # Convert the DataFrame of returns to a NumPy array.
    # The shape will be (T, L-1, P, Q).
    return_array = portfolio_returns_df.to_numpy().reshape(T, L - 1, P, Q)

    # Transpose the array to the final desired shape: (P, Q, L-1, T).
    return_tensor = return_array.transpose(2, 3, 1, 0)

    logging.info(f"Constructed portfolio return tensor with shape {return_tensor.shape}.")

    # Save the final tensor artifact.
    os.makedirs(output_dir, exist_ok=True)
    tensor_path = os.path.join(output_dir, "tensor_R.npz")
    np.savez_compressed(tensor_path, R=return_tensor)
    logging.info(f"Portfolio return tensor saved to {tensor_path}")

    # Return the final tensor.
    return return_tensor


In [None]:
# Task 29: Extract Latent Factors via Partial Tucker Decomposition (HOSVD)

# ==============================================================================
# Task 29: Extract Latent Factors via HOSVD
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 29, Step 2: HOSVD to compute loading matrices (Helper)
# ------------------------------------------------------------------------------
def _compute_hosvd_loadings(
    tensor: np.ndarray,
    modes_to_decompose: List[int],
    mode_ranks: List[int]
) -> List[np.ndarray]:
    """
    Computes the loading matrices for a partial Tucker decomposition via HOSVD.

    This function implements the core of the Higher-Order SVD algorithm. For each
    specified mode, it unfolds the tensor into a matrix, computes the SVD of that
    matrix, and extracts the top `k` left singular vectors to form the orthogonal
    loading matrix (basis) for that mode.

    Args:
        tensor: The N-dimensional NumPy array to decompose.
        modes_to_decompose: A list of integer mode indices to be decomposed.
        mode_ranks: A list of integers specifying the target rank for each mode
                    in `modes_to_decompose`.

    Returns:
        A list of loading matrices (NumPy arrays), one for each factored mode.
    """
    # --- Input Validation ---
    if len(modes_to_decompose) != len(mode_ranks):
        raise ValueError("Length of 'modes_to_decompose' and 'mode_ranks' must be equal.")

    # Initialize a list to store the computed loading matrices.
    loading_matrices = []

    # Iterate through the modes to be decomposed.
    for mode, rank in zip(modes_to_decompose, mode_ranks):
        # --- Unfolding (Matricization) ---
        # Unfold the tensor along the current mode using the tensorly library.
        # This is a robust and standard way to perform matricization.
        unfolded_matrix = tl.unfold(tensor, mode)

        # --- SVD Computation ---
        # Compute the singular value decomposition of the unfolded matrix.
        # `full_matrices=False` is more memory and computationally efficient.
        try:
            U, _, _ = np.linalg.svd(unfolded_matrix, full_matrices=False)
        except np.linalg.LinAlgError as e:
            logging.error(f"SVD computation failed for mode {mode}: {e}")
            raise

        # --- Extraction ---
        # The loading matrix is the first `rank` columns of the left singular vectors.
        # This forms the orthogonal basis for the compressed mode.
        loading_matrix = U[:, :rank]
        loading_matrices.append(loading_matrix)

        logging.info(f"Computed loading matrix for mode {mode} with shape {loading_matrix.shape}.")

    return loading_matrices


# ------------------------------------------------------------------------------
# Task 29, Orchestrator Function
# ------------------------------------------------------------------------------
def extract_latent_factors(
    R_tensor: np.ndarray,
    config: Dict[str, Any],
    output_dir: str = "data_processed"
) -> np.ndarray:
    """
    Extracts latent factors from the portfolio return tensor via HOSVD.

    This function implements the dimensionality reduction step of the asset pricing
    pipeline. It takes the high-dimensional tensor of portfolio returns and applies
    a partial Tucker decomposition to find a low-dimensional core tensor. The
    time series of this core tensor represent the latent factors driving returns.

    This implementation uses the `tensorly` library for robust multilinear algebra.

    Args:
        R_tensor: The 4D tensor of portfolio returns with shape (P, Q, L-1, T).
        config: The study's configuration dictionary.
        output_dir: Directory to save the factor matrix and loading matrices.

    Returns:
        A 2D NumPy array of shape (T, k), where k is the total number of latent
        factors, representing the time series of factor realizations.
    """
    # --- Input Validation ---
    if not isinstance(R_tensor, np.ndarray) or R_tensor.ndim != 4:
        raise TypeError("Input 'R_tensor' must be a 4D NumPy array.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)
    logging.info("--- Starting Latent Factor Extraction via HOSVD ---")

    # --- Step 1: Set up the Partial Tucker Decomposition ---
    # Get the target ranks for the portfolio dimensions from the config.
    ranks_config = config["factor_extraction"]["mode_ranks"]
    # The modes to decompose are the first three (0, 1, 2). Mode 3 (time) is preserved.
    modes_to_decompose = [0, 1, 2]
    mode_ranks = [ranks_config["k_p"], ranks_config["k_q"], ranks_config["k_ell"]]

    # --- Step 2: Compute HOSVD to get the loading matrices ---
    # Call the helper function to get the orthogonal bases for the portfolio modes.
    U, V, W = _compute_hosvd_loadings(R_tensor, modes_to_decompose, mode_ranks)

    # Save the loading matrices for diagnostics and potential reconstruction.
    loadings_path = os.path.join(output_dir, "tucker_loadings.npz")
    np.savez_compressed(loadings_path, U=U, V=V, W=W)
    logging.info(f"Step 2/3: Loading matrices computed and saved to {loadings_path}.")

    # --- Step 3: Project back to obtain the core tensor and factor matrix ---
    # The core tensor is found by projecting the original tensor onto the new bases.
    # We use `tensorly.tenalg.multi_mode_dot` for a robust and efficient implementation.
    # Formula: F = R x_1 U^T x_2 V^T x_3 W^T
    core_tensor = multi_mode_dot(R_tensor, [U.T, V.T, W.T], modes=modes_to_decompose)

    # The resulting core tensor has shape (k_p, k_q, k_ell, T).
    logging.info(f"Computed core tensor with shape {core_tensor.shape}.")

    # The time series of latent factors is the vectorized core tensor at each time t.
    # Transpose to (T, k_p, k_q, k_ell) to bring the time dimension to the front.
    factors_over_time = core_tensor.transpose(3, 0, 1, 2)

    # Reshape into the final 2D factor matrix of shape (T, k_p*k_q*k_ell).
    T = R_tensor.shape[3]
    total_factors = np.prod(mode_ranks)
    factor_matrix = factors_over_time.reshape(T, total_factors)

    logging.info(f"Step 3/3: Reshaped core tensor into factor matrix of shape {factor_matrix.shape}.")

    # Save the final factor matrix, which is the primary output of this task.
    factors_path = os.path.join(output_dir, "factors_F.npz")
    np.savez_compressed(factors_path, F=factor_matrix)
    logging.info(f"Factor matrix saved to {factors_path}.")

    # Return the factor matrix for use in the next pipeline step.
    return factor_matrix


In [None]:
# Task 30: Forward Stepwise Factor Selection and Time-Series Regressions

# ==============================================================================
# Task 30: Forward Stepwise Factor Selection and Time-Series Regressions
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 30, Step 2: Pseudo Cross-Sectional R-squared (Helper Function)
# ------------------------------------------------------------------------------
def _calculate_pseudo_r2(
    Y: np.ndarray,
    X_candidate: np.ndarray,
    denom_r2: float
) -> float:
    """
    Calculates the pseudo cross-sectional R-squared for a given set of factors.

    Args:
        Y: Matrix of portfolio returns (N_port, T-1).
        X_candidate: Matrix of candidate factors including intercept (T-1, k_factors).
        denom_r2: The pre-computed denominator for the R-squared calculation.

    Returns:
        The pseudo cross-sectional R-squared value.
    """
    # Solve for the regression coefficients (alphas and betas) for all portfolios
    # simultaneously using a single matrix operation.
    # B = (X'X)^-1 * (X'Y')
    try:
        coeffs = np.linalg.solve(X_candidate.T @ X_candidate, X_candidate.T @ Y.T)
    except np.linalg.LinAlgError:
        # If the matrix is singular, this factor set is invalid.
        return -np.inf

    # The alphas (pricing errors) are the coefficients of the intercept term (first row).
    alphas = coeffs[0, :]

    # The numerator of the R-squared formula is the mean squared alpha.
    numerator_r2 = np.mean(alphas**2)

    # Calculate the pseudo cross-sectional R-squared.
    # Formula: R2_xs = 1 - (mean(alpha^2) / Var_xs(mean_returns))
    r2_xs = 1 - (numerator_r2 / denom_r2)

    return r2_xs


# ------------------------------------------------------------------------------
# Task 30, Orchestrator Function
# ------------------------------------------------------------------------------
def select_factors_and_predict(
    R_tensor: np.ndarray,
    factor_matrix: np.ndarray,
    config: Dict[str, Any],
    output_dir: str = "data_processed"
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Selects predictive factors via forward stepwise regression and computes forecasts.

    This function implements a forward stepwise selection algorithm to choose a
    parsimonious set of latent factors that best explain the cross-section of
    portfolio returns. The selection criterion is the maximization of the pseudo
    cross-sectional R-squared. After selection, it runs the final regressions
    to obtain pricing errors (alphas), factor loadings (betas), and return forecasts.

    Args:
        R_tensor: The 4D tensor of portfolio returns (P, Q, L-1, T).
        factor_matrix: The 2D matrix of all potential latent factors (T, k_total).
        config: The study's configuration dictionary.
        output_dir: Directory to save the selected factors and regression outputs.

    Returns:
        A tuple containing:
        - alphas (np.ndarray): Vector of pricing errors for each portfolio (N_port,).
        - betas (np.ndarray): Matrix of factor loadings for each portfolio (N_port, M).
        - R_hat (np.ndarray): Matrix of predicted returns (N_port, T-1).
    """
    logging.info("--- Starting Factor Selection and Prediction ---")
    os.makedirs(output_dir, exist_ok=True)

    # --- Step 1: Prepare Data for Regression ---
    # Reshape the 4D return tensor into a 2D matrix (N_port, T).
    P, Q, L_minus_1, T = R_tensor.shape
    N_port = P * Q * L_minus_1
    portfolio_returns = R_tensor.reshape(N_port, T)

    # Align dependent and independent variables for predictive regression r_t+1 ~ f_t.
    Y = portfolio_returns[:, 1:]  # Returns from t=1 to T-1
    X_all_factors = factor_matrix[:-1, :] # Factors from t=0 to T-2

    # Add an intercept term to the factor matrix for OLS.
    X_with_intercept = np.hstack([np.ones((T - 1, 1)), X_all_factors])

    logging.info(f"Step 1/3: Prepared data for regression. Y shape: {Y.shape}, X shape: {X_with_intercept.shape}.")

    # --- Step 2: Forward Stepwise Factor Selection ---
    # Pre-compute the constant denominator for the R-squared calculation.
    # This is the cross-sectional variance of the time-series mean portfolio returns.
    mean_returns_xs = np.mean(Y, axis=1)
    denom_r2 = np.var(mean_returns_xs)

    # Get selection parameters from config.
    target_num_factors = config["factor_extraction"]["target_num_factors"]

    # Initialize the set of selected factor indices (plus intercept).
    selected_indices = [0] # Start with the intercept
    # Initialize the set of candidate factor indices.
    candidate_indices = list(range(1, X_with_intercept.shape[1]))

    logging.info(f"Starting forward stepwise selection to find {target_num_factors} factors...")
    for i in range(target_num_factors):
        best_r2 = -np.inf
        best_candidate = -1

        # Iterate through all remaining candidate factors.
        for cand_idx in candidate_indices:
            # Form a temporary set of factors to test.
            temp_indices = selected_indices + [cand_idx]
            X_candidate = X_with_intercept[:, temp_indices]

            # Calculate the R-squared for this set of factors.
            r2 = _calculate_pseudo_r2(Y, X_candidate, denom_r2)

            # If this is the best factor so far, store it.
            if r2 > best_r2:
                best_r2 = r2
                best_candidate = cand_idx

        # Add the best factor to the selected set and remove it from candidates.
        if best_candidate != -1:
            selected_indices.append(best_candidate)
            candidate_indices.remove(best_candidate)
            logging.info(
                f"Step {i+1}/{target_num_factors}: Selected factor index {best_candidate-1} "
                f"(Column {best_candidate}). New R2_xs = {best_r2:.4f}"
            )
        else:
            logging.warning("Could not find a factor to improve R-squared. Stopping selection.")
            break

    # The final selected factor indices (excluding the intercept).
    final_factor_indices = [i - 1 for i in selected_indices if i > 0]
    logging.info(f"Step 2/3: Final selected factor indices: {final_factor_indices}")

    # --- Step 3: Run Final Regressions and Compute Predictions ---
    # Get the final matrix of selected factors (including intercept).
    X_final = X_with_intercept[:, selected_indices]

    # Run the final multivariate OLS regression for all portfolios simultaneously.
    # Coeffs = (X'X)^-1 * (X'Y')
    final_coeffs = np.linalg.solve(X_final.T @ X_final, X_final.T @ Y.T)

    # The alphas are the first row of the coefficient matrix (intercept).
    alphas = final_coeffs[0, :]
    # The betas are the other rows, transposed to be (N_port, M).
    betas = final_coeffs[1:, :].T

    # Compute the matrix of predicted returns.
    # R_hat = X_final * Coeffs
    R_hat = (X_final @ final_coeffs).T # Shape (N_port, T-1)

    logging.info("Step 3/3: Final regressions complete. Alphas, betas, and predictions computed.")

    # --- Save Artifacts ---
    # Save the indices of the selected factors.
    np.savez_compressed(os.path.join(output_dir, "selected_factors.npz"), indices=np.array(final_factor_indices))
    # Save the estimated alphas (pricing errors).
    np.savez_compressed(os.path.join(output_dir, "alphas.npz"), alphas=alphas)
    # Save the estimated betas (factor loadings).
    np.savez_compressed(os.path.join(output_dir, "betas.npz"), betas=betas)
    # Save the predicted returns.
    np.savez_compressed(os.path.join(output_dir, "R_hat.npz"), R_hat=R_hat)

    logging.info("All factor selection and prediction artifacts have been saved.")

    return alphas, betas, R_hat


In [None]:
# Task 31: Compute Asset-Pricing Evaluation Metrics (RMSE_α, MAE_α, MAE-Rank, IC)

# ==============================================================================
# Task 31: Compute Asset-Pricing Evaluation Metrics
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 31, Step 1: Pricing Error Metrics (Helper Function)
# ------------------------------------------------------------------------------
def _compute_pricing_error_metrics(alphas: np.ndarray) -> Tuple[float, float]:
    """
    Computes the root mean squared and mean absolute pricing errors.

    Args:
        alphas: A 1D NumPy array of regression intercepts (pricing errors).

    Returns:
        A tuple containing (RMSE_alpha, MAE_alpha).
    """
    # Formula: RMSE_alpha = sqrt(mean(alpha^2))
    rmse_alpha = np.sqrt(np.mean(alphas**2))

    # Formula: MAE_alpha = mean(|alpha|)
    mae_alpha = np.mean(np.abs(alphas))

    return rmse_alpha, mae_alpha


# ------------------------------------------------------------------------------
# Task 31, Step 2: Predictive Ranking Accuracy (Helper Function)
# ------------------------------------------------------------------------------
def _compute_mae_rank(Y_realized: np.ndarray, Y_predicted: np.ndarray) -> float:
    """
    Computes the Mean Absolute Rank Error between realized and predicted returns.

    Args:
        Y_realized: A 2D (N_port, T-1) array of realized portfolio returns.
        Y_predicted: A 2D (N_port, T-1) array of predicted portfolio returns.

    Returns:
        The time-series average of the cross-sectional MAE-Rank.
    """
    # Get the number of time periods for averaging.
    num_periods = Y_realized.shape[1]

    # Initialize a list to store the MAE-Rank for each time period.
    monthly_mae_ranks = []

    # Iterate through each time period (column).
    for t in range(num_periods):
        # Extract the cross-section of realized and predicted returns for time t.
        realized_t = Y_realized[:, t]
        predicted_t = Y_predicted[:, t]

        # Compute ordinal ranks for both vectors. 'ordinal' assigns unique ranks.
        rank_realized = rankdata(realized_t, method='ordinal')
        rank_predicted = rankdata(predicted_t, method='ordinal')

        # Calculate the Mean Absolute Rank Error for the current time period.
        mae_rank_t = np.mean(np.abs(rank_realized - rank_predicted))
        monthly_mae_ranks.append(mae_rank_t)

    # The final metric is the time-series average of the monthly MAE-Ranks.
    # Formula: MAE-Rank = (1/T) * sum(MAE-Rank_t)
    mean_absolute_rank_error = np.mean(monthly_mae_ranks)

    return mean_absolute_rank_error


# ------------------------------------------------------------------------------
# Task 31, Step 3: Information Coefficient (Helper Function)
# ------------------------------------------------------------------------------
def _compute_information_coefficient(Y_realized: np.ndarray, Y_predicted: np.ndarray) -> float:
    """
    Computes the Information Coefficient (IC).

    The IC is the time-series average of the cross-sectional Pearson correlation
    between predicted and realized returns.

    Args:
        Y_realized: A 2D (N_port, T-1) array of realized portfolio returns.
        Y_predicted: A 2D (N_port, T-1) array of predicted portfolio returns.

    Returns:
        The Information Coefficient.
    """
    # Get the number of time periods.
    num_periods = Y_realized.shape[1]

    # Initialize a list to store the IC for each time period.
    monthly_ics = []

    # Iterate through each time period.
    for t in range(num_periods):
        # Extract the cross-section of returns for time t.
        realized_t = Y_realized[:, t]
        predicted_t = Y_predicted[:, t]

        # Handle the edge case where a cross-section has zero variance.
        if np.std(realized_t) < 1e-9 or np.std(predicted_t) < 1e-9:
            # Correlation is undefined, append NaN and handle it during averaging.
            monthly_ics.append(np.nan)
            continue

        # Compute the Pearson correlation for the current time period.
        # Formula: IC_t = corr(r_t, r_hat_t)
        ic_t = np.corrcoef(realized_t, predicted_t)[0, 1]
        monthly_ics.append(ic_t)

    # The final IC is the time-series average of the monthly correlations.
    # `np.nanmean` safely ignores any periods where correlation was undefined.
    # Formula: IC = (1/T) * sum(IC_t)
    information_coefficient = np.nanmean(monthly_ics)

    return information_coefficient


# ------------------------------------------------------------------------------
# Task 31, Orchestrator Function
# ------------------------------------------------------------------------------
def compute_asset_pricing_metrics(
    alphas: np.ndarray,
    R_hat: np.ndarray,
    R_tensor: np.ndarray
) -> Dict[str, float]:
    """
    Computes a suite of standard asset pricing evaluation metrics.

    This function quantifies the performance of the factor model by calculating:
    1. Pricing Accuracy: How well the factors explain average returns, measured
       by the magnitude of the pricing errors (alphas).
    2. Predictive Power: How well the model's forecasts align with realized
       outcomes, measured by rank correlation (MAE-Rank) and linear correlation (IC).

    Args:
        alphas: 1D array of pricing errors from the time-series regressions.
        R_hat: 2D array (N_port, T-1) of predicted portfolio returns.
        R_tensor: The original 4D tensor (P, Q, L-1, T) of realized returns.

    Returns:
        A dictionary containing the computed metrics: 'RMSE_alpha', 'MAE_alpha',
        'MAE_Rank', and 'IC'.
    """
    # --- Input Validation ---
    if not all(isinstance(arr, np.ndarray) for arr in [alphas, R_hat, R_tensor]):
        raise TypeError("All inputs must be NumPy arrays.")

    logging.info("--- Computing Asset Pricing Evaluation Metrics ---")

    # --- Prepare Data ---
    # Reshape the realized return tensor to match the (N_port, T) format.
    P, Q, L_minus_1, T = R_tensor.shape
    N_port = P * Q * L_minus_1
    portfolio_returns = R_tensor.reshape(N_port, T)

    # Align realized returns with the prediction period.
    Y_realized = portfolio_returns[:, 1:]

    # Validate shapes.
    if R_hat.shape != Y_realized.shape:
        raise ValueError("Shape mismatch between predicted and realized returns.")
    if alphas.shape[0] != N_port:
        raise ValueError("Length of alphas does not match number of portfolios.")

    # --- Step 1: Compute Pricing Error Metrics ---
    # Call the helper to compute RMSE and MAE of the pricing errors.
    rmse_alpha, mae_alpha = _compute_pricing_error_metrics(alphas)
    logging.info(f"Pricing Accuracy: RMSE_alpha = {rmse_alpha:.4f}, MAE_alpha = {mae_alpha:.4f}")

    # --- Step 2: Compute Predictive Ranking Accuracy ---
    # Call the helper to compute the Mean Absolute Rank Error.
    mae_rank = _compute_mae_rank(Y_realized, R_hat)
    logging.info(f"Predictive Power (Rank): MAE-Rank = {mae_rank:.2f}")

    # --- Step 3: Compute Information Coefficient ---
    # Call the helper to compute the Information Coefficient.
    ic = _compute_information_coefficient(Y_realized, R_hat)
    logging.info(f"Predictive Power (Linear): IC = {ic:.4f}")

    # --- Compile and Return Results ---
    # Aggregate all computed metrics into a single dictionary.
    metrics = {
        "RMSE_alpha": rmse_alpha,
        "MAE_alpha": mae_alpha,
        "MAE_Rank": mae_rank,
        "IC": ic
    }

    return metrics


In [None]:
# Task 32: Construct the Top-Minus-Bottom (T–B) Long-Short Portfolio

# ==============================================================================
# Task 32: Construct the Top-Minus-Bottom (T–B) Long-Short Portfolio
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 32, Orchestrator Function
# ------------------------------------------------------------------------------
def construct_tb_portfolio(
    R_hat: np.ndarray,
    R_tensor: np.ndarray,
    output_dir: str = "data_processed"
) -> np.ndarray:
    """
    Constructs the time series of returns for a Top-minus-Bottom (T-B) strategy.

    This function simulates an investable long-short portfolio strategy based on the
    model's return predictions. Each month, it identifies the top and bottom deciles
    of test portfolios based on their predicted returns. It then calculates the
    realized return of a strategy that goes long the top decile and short the
    bottom decile, assuming equal weighting within each leg.

    Args:
        R_hat: 2D array (N_port, T-1) of predicted portfolio returns from Task 30.
        R_tensor: The original 4D tensor (P, Q, L-1, T) of realized returns.
        output_dir: Directory to save the T-B return series.

    Returns:
        A 1D NumPy array of length (T-1) containing the time series of the
        T-B portfolio's excess returns.

    Raises:
        ValueError: If input shapes are inconsistent.
        TypeError: If inputs are not NumPy arrays.
    """
    # --- Input Validation ---
    # Verify that inputs are NumPy arrays.
    if not all(isinstance(arr, np.ndarray) for arr in [R_hat, R_tensor]):
        raise TypeError("All inputs must be NumPy arrays.")

    logging.info("--- Constructing Top-minus-Bottom (T-B) Portfolio ---")

    # --- Prepare Data ---
    # Reshape the realized return tensor to match the (N_port, T) format.
    P, Q, L_minus_1, T = R_tensor.shape
    N_port = P * Q * L_minus_1
    portfolio_returns = R_tensor.reshape(N_port, T)

    # Align realized returns with the prediction period (t=1 to T-1).
    Y_realized = portfolio_returns[:, 1:]

    # Validate that the shapes of predicted and realized returns match.
    if R_hat.shape != Y_realized.shape:
        raise ValueError(
            f"Shape mismatch: R_hat shape {R_hat.shape} does not match "
            f"realized returns shape {Y_realized.shape}."
        )

    # Get the number of time periods for the strategy.
    num_periods = R_hat.shape[1]

    # Initialize a list to store the monthly T-B returns.
    tb_returns = []

    # --- Main Loop: Iterate Through Time to Form Portfolios ---
    # Loop from t=0 to T-2 to form portfolios for month t+1.
    for t in range(num_periods):
        # --- Step 1: Identify Top and Bottom Deciles ---
        # Extract the cross-section of predicted returns for the current period.
        predictions_t = R_hat[:, t]

        # Calculate the 10th and 90th percentile breakpoints for this month's predictions.
        bottom_decile_breakpoint = np.percentile(predictions_t, 10)
        top_decile_breakpoint = np.percentile(predictions_t, 90)

        # Create boolean masks to identify portfolios in the top and bottom deciles.
        in_bottom_decile = predictions_t <= bottom_decile_breakpoint
        in_top_decile = predictions_t >= top_decile_breakpoint

        # --- Step 2: Compute the T-B Portfolio Return ---
        # Extract the realized returns for the next period (t+1).
        realized_returns_t_plus_1 = Y_realized[:, t]

        # Calculate the equally-weighted average realized return of the top decile portfolios.
        top_leg_return = np.mean(realized_returns_t_plus_1[in_top_decile])

        # Calculate the equally-weighted average realized return of the bottom decile portfolios.
        bottom_leg_return = np.mean(realized_returns_t_plus_1[in_bottom_decile])

        # The T-B return is the difference between the long (top) and short (bottom) legs.
        # Formula: r_TB = r_top - r_bottom
        monthly_tb_return = top_leg_return - bottom_leg_return

        # Append the calculated return to our time series.
        tb_returns.append(monthly_tb_return)

    # Convert the list of monthly returns to a NumPy array.
    tb_returns_series = np.array(tb_returns)
    logging.info("Step 1 & 2: Monthly T-B returns calculated for all periods.")

    # --- Step 3: Verify and Save the T-B Return Series ---
    # Perform a basic sanity check on the computed returns.
    if np.isnan(tb_returns_series).any():
        logging.warning("T-B return series contains NaN values.")

    # Log summary statistics of the strategy returns.
    logging.info(
        f"T-B Strategy Summary: Mean Monthly Return = {np.mean(tb_returns_series):.4f}, "
        f"Std Dev = {np.std(tb_returns_series):.4f}"
    )

    # Save the T-B return series to a compressed archive.
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, "TB_returns.npz")
    np.savez_compressed(output_path, r_TB=tb_returns_series)
    logging.info(f"Step 3/3: T-B return series saved to {output_path}.")

    # Return the final time series of strategy returns.
    return tb_returns_series


In [None]:
# Task 33: Compute Annualized Sharpe Ratio for the T–B Strategy

# ==============================================================================
# Task 33: Compute Annualized Sharpe Ratio for the T–B Strategy
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 33, Orchestrator Function
# ------------------------------------------------------------------------------
def compute_annualized_sharpe_ratio(
    r_tb_series: np.ndarray,
    config: Dict[str, Any]
) -> float:
    """
    Computes the annualized Sharpe Ratio for a time series of monthly returns.

    This function calculates the most common metric of risk-adjusted return. It
    takes a series of monthly excess returns (assumed to be from a zero-investment,
    e.g., long-short, portfolio), computes its mean and standard deviation, and
    annualizes the resulting ratio.

    The statistical significance of the Sharpe Ratio is not computed here but is
    an important consideration for a full academic analysis (e.g., via bootstrapping
    or the methods of Ledoit & Wolf).

    Args:
        r_tb_series: A 1D NumPy array of monthly excess returns for the T-B strategy.
        config: The study's configuration dictionary, from which the annualization
                factor is sourced.

    Returns:
        The annualized Sharpe Ratio as a float. Returns np.nan if the ratio
        cannot be computed (e.g., zero volatility).

    Raises:
        ValueError: If the input series is too short to be meaningful.
        TypeError: If the input is not a 1D NumPy array.
    """
    # --- Input Validation ---
    # Verify that the input is a 1D NumPy array.
    if not isinstance(r_tb_series, np.ndarray) or r_tb_series.ndim != 1:
        raise TypeError("Input 'r_tb_series' must be a 1D NumPy array.")

    # Verify that the series is long enough for a meaningful standard deviation calculation.
    if len(r_tb_series) < 2:
        logging.warning("Return series is too short (length < 2) to compute Sharpe Ratio. Returning NaN.")
        return np.nan

    logging.info("--- Computing Annualized Sharpe Ratio for T-B Strategy ---")

    # --- Step 1: Compute the Annualized Sharpe Ratio ---
    # Calculate the arithmetic mean of the monthly returns.
    mean_monthly_return = np.mean(r_tb_series)

    # Calculate the sample standard deviation of the monthly returns.
    # `ddof=1` is critical for an unbiased estimate of the population standard deviation.
    std_monthly_return = np.std(r_tb_series, ddof=1)

    # Edge Case Handling: If volatility is zero or near-zero, the Sharpe Ratio is undefined.
    if std_monthly_return < 1e-9:
        logging.warning("Standard deviation of returns is near zero. Sharpe Ratio is undefined. Returning NaN.")
        return np.nan

    # Get the annualization factor from the configuration (typically sqrt(12) for monthly data).
    annualization_factor = np.sqrt(config["evaluation_metrics"]["sharpe_annualization_factor"])

    # Calculate the annualized Sharpe Ratio.
    # Formula: Sharpe_annual = sqrt(12) * (mean(r) / std(r))
    annualized_sharpe = annualization_factor * (mean_monthly_return / std_monthly_return)

    # --- Step 2: Verify Plausibility and Log ---
    # Log the components of the calculation for auditability.
    logging.info(f"Mean Monthly Return: {mean_monthly_return:.6f}")
    logging.info(f"Std Dev Monthly Return: {std_monthly_return:.6f}")
    logging.info(f"Annualization Factor: {annualization_factor:.4f}")
    logging.info(f"Final Annualized Sharpe Ratio: {annualized_sharpe:.4f}")

    # A sanity check for an unusually high Sharpe Ratio.
    if abs(annualized_sharpe) > 5.0:
        logging.warning(f"Calculated Sharpe Ratio ({annualized_sharpe:.2f}) is unusually high.")

    # Return the final scalar value.
    return annualized_sharpe


In [None]:
# Task 34: Compile the Asset-Pricing Results Table

# ==============================================================================
# Task 34: Compile the Asset-Pricing Results Table
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 34, Orchestrator Function
# ------------------------------------------------------------------------------
def compile_asset_pricing_results(
    results_list: List[Dict[str, Any]],
    output_dir: str = "results"
) -> pd.DataFrame:
    """
    Compiles all asset-pricing evaluation metrics into a final results table.

    This function aggregates the performance metrics (pricing errors, predictive
    power, and Sharpe ratio) from various model runs (e.g., ACT-Tensor variants,
    baselines) into a single, structured pandas DataFrame. It then formats this
    DataFrame to be publication-ready and saves it in multiple formats (CSV, JSON,
    LaTeX) for analysis and reporting.

    Args:
        results_list: A list of dictionaries, where each dictionary contains the
                      full set of asset pricing results for a single model run.
                      Required keys: 'Method', 'Regime', 'RMSE_alpha', 'MAE_alpha',
                      'MAE_Rank', 'IC', 'TB_Sharpe'.
        output_dir: The directory to save the final results tables.

    Returns:
        A pandas DataFrame containing the compiled and formatted asset pricing
        results, indexed by ('Regime', 'Method').

    Raises:
        ValueError: If the input list is empty or if dictionaries are missing
                    required keys.
    """
    # --- Input Validation ---
    # Verify that the input is a non-empty list of dictionaries.
    if not isinstance(results_list, list) or not results_list:
        raise ValueError("Input 'results_list' must be a non-empty list of dictionaries.")
    if not all(isinstance(item, dict) for item in results_list):
        raise TypeError("All items in 'results_list' must be dictionaries.")

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    logging.info("--- Compiling Asset-Pricing Results Table ---")

    # --- Step 1: Aggregate Metrics into a DataFrame ---
    # Convert the list of result dictionaries directly into a pandas DataFrame.
    try:
        results_df = pd.DataFrame(results_list)
    except Exception as e:
        raise ValueError(f"Failed to create DataFrame from results list. Error: {e}")

    # Define the expected columns for validation.
    expected_cols = ['Method', 'Regime', 'RMSE_alpha', 'MAE_alpha', 'MAE_Rank', 'IC', 'TB_Sharpe']
    if not all(col in results_df.columns for col in expected_cols):
        missing = set(expected_cols) - set(results_df.columns)
        raise ValueError(f"Input dictionaries are missing required keys: {missing}")

    # Set a MultiIndex on 'Regime' and 'Method' for clear, hierarchical structuring.
    results_df.set_index(['Regime', 'Method'], inplace=True)
    # Sort the index to ensure a consistent and logical order.
    results_df.sort_index(inplace=True)

    logging.info("Step 1/3: Aggregated asset pricing metrics into a DataFrame.")

    # --- Step 2: Format the Table ---
    # Round all numerical metric columns to 4 decimal places for publication-quality precision.
    formatted_df = results_df.round(4)

    # Log a summary of the best performing method for key metrics in each regime.
    for regime in formatted_df.index.get_level_values('Regime').unique():
        regime_slice = formatted_df.loc[regime]
        best_sharpe_method = regime_slice['TB_Sharpe'].idxmax()
        best_ic_method = regime_slice['IC'].idxmax()
        best_rmse_method = regime_slice['RMSE_alpha'].idxmin()
        logging.info(f"Summary for Regime '{regime}':")
        logging.info(f"  - Best T-B Sharpe: {regime_slice.loc[best_sharpe_method, 'TB_Sharpe']:.4f} (Method: {best_sharpe_method})")
        logging.info(f"  - Best IC: {regime_slice.loc[best_ic_method, 'IC']:.4f} (Method: {best_ic_method})")
        logging.info(f"  - Best RMSE_alpha: {regime_slice.loc[best_rmse_method, 'RMSE_alpha']:.4f} (Method: {best_rmse_method})")

    logging.info("Step 2/3: Formatted results and logged performance summary.")

    # --- Step 3: Save the Results Table in Multiple Formats ---
    # Save the formatted, indexed data to CSV for easy viewing and analysis.
    csv_path = os.path.join(output_dir, "results_asset_pricing.csv")
    formatted_df.to_csv(csv_path)
    logging.info(f"Formatted asset pricing results saved to {csv_path}")

    # Save to JSON format with an index-oriented structure.
    json_path = os.path.join(output_dir, "results_asset_pricing.json")
    formatted_df.to_json(json_path, orient='index', indent=4)
    logging.info(f"Formatted asset pricing results saved to {json_path}")

    # Save to LaTeX format for direct inclusion in academic papers.
    latex_path = os.path.join(output_dir, "results_asset_pricing.tex")
    # The `to_latex` method creates a clean, publication-ready table.
    formatted_df.to_latex(latex_path, multirow=True)
    logging.info(f"Formatted asset pricing results saved to {latex_path}")

    logging.info("Step 3/3: All asset pricing results tables have been saved.")

    # Return the final formatted DataFrame.
    return formatted_df


In [None]:
# Task 35: Create an End-to-End Orchestrator Function for the Full Pipeline

# ==============================================================================
# Task 35: End-to-End Orchestrator Function for the Full Pipeline
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 35, Helper Function for Logging Setup
# ------------------------------------------------------------------------------
def _setup_logger(name: str, log_file: str, level: int = logging.INFO) -> logging.Logger:
    """
    Sets up a dedicated logger for a pipeline run to file and console.

    This helper function creates a logger instance that is isolated to a specific
    pipeline run. It configures it to write detailed logs to a specified file
    and simultaneously print informational messages to the console. This ensures
    a complete audit trail for each experiment while providing real-time feedback.

    Args:
        name: A unique name for the logger instance.
        log_file: The full path to the file where logs will be saved.
        level: The logging level (e.g., logging.INFO, logging.DEBUG).

    Returns:
        A configured logging.Logger instance.
    """
    # Get a logger instance with a unique name for this run.
    logger = logging.getLogger(name)

    # If handlers already exist (e.g., from a previous run in the same session), clear them.
    if logger.hasHandlers():
        logger.handlers.clear()

    # Set the logger's reporting level.
    logger.setLevel(level)

    # Create a file handler to write detailed logs to a file.
    # The mode 'w' ensures that the log file is overwritten for each new run.
    file_handler = logging.FileHandler(log_file, mode='w')
    # Define a detailed format for the file log.
    file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(file_formatter)

    # Create a stream handler to print concise logs to the console.
    stream_handler = logging.StreamHandler()
    # Define a simpler format for console output.
    stream_formatter = logging.Formatter('%(message)s')
    stream_handler.setFormatter(stream_formatter)

    # Add both handlers to the logger.
    logger.addHandler(file_handler)
    logger.addHandler(stream_handler)

    # Prevent the logger from propagating messages to the root logger.
    logger.propagate = False

    return logger

# ------------------------------------------------------------------------------
# Task 35, Helper for Downstream Asset Pricing Pipeline
# ------------------------------------------------------------------------------
def _run_downstream_pipeline(
    X_tilde: np.ndarray,
    df_clean: pd.DataFrame,
    time_to_idx: Dict[pd.Timestamp, int],
    firm_to_idx: Dict[int, int],
    config: Dict[str, Any],
    output_dir: str,
    model_name: str
) -> Dict[str, float]:
    """
    Executes the complete downstream asset pricing evaluation pipeline.

    This helper function encapsulates the entire workflow from portfolio
    construction to the final Sharpe Ratio calculation (Tasks 28-33). It takes
    a fully imputed tensor as input and returns a dictionary of all key asset
    pricing performance metrics. This modular design ensures that all models
    (e.g., ACT-Tensor and various baselines) are evaluated using the exact same,
    consistent methodology, which is critical for a fair comparison.

    Args:
        X_tilde: A fully imputed and smoothed 3D tensor of shape (T, N, L).
        df_clean: The cleansed DataFrame from Task 3, containing necessary
                  market data like prices, shares, and returns.
        time_to_idx: A dictionary mapping from pd.Timestamp to integer index
                     for the time dimension.
        firm_to_idx: A dictionary mapping from permno to integer index for the
                     firm dimension.
        config: The main configuration dictionary for the study.
        output_dir: The base directory to save intermediate artifacts generated
                    during the asset pricing pipeline.
        model_name: A string identifier for the model being evaluated (e.g.,
                    'ACT-Tensor_CMA', 'Baseline_Median'), used for logging and
                    creating unique subdirectories.

    Returns:
        A dictionary containing all computed asset pricing metrics for the given
        model, including RMSE_alpha, MAE_alpha, MAE_Rank, IC, and TB_Sharpe.
    """
    # Log the start of this specific downstream evaluation.
    logging.info(f"--- Starting Downstream Asset Pricing Pipeline for: {model_name} ---")

    # Create a dedicated subdirectory for this model's asset pricing artifacts
    # to prevent file name collisions between different models in the same run.
    ap_output_dir = os.path.join(output_dir, f"ap_{model_name}")
    os.makedirs(ap_output_dir, exist_ok=True)

    # Task 28: Construct a 4D tensor of double-sorted, value-weighted portfolio returns
    # using the provided imputed characteristics tensor `X_tilde`.
    R_tensor = construct_portfolio_return_tensor(
        X_tilde, df_clean, time_to_idx, firm_to_idx, config, ap_output_dir
    )

    # Task 29: Extract a parsimonious set of latent factors from the portfolio return tensor
    # using a partial Tucker decomposition (HOSVD).
    factor_matrix = extract_latent_factors(R_tensor, config, ap_output_dir)

    # Task 30: Use forward stepwise selection to identify the most predictive factors
    # and run final time-series regressions to get alphas, betas, and return forecasts.
    alphas, betas, R_hat = select_factors_and_predict(
        R_tensor, factor_matrix, config, ap_output_dir
    )

    # Task 31: Compute the core asset pricing metrics that measure pricing accuracy
    # (RMSE_alpha, MAE_alpha) and predictive skill (MAE_Rank, IC).
    ap_metrics = compute_asset_pricing_metrics(alphas, R_hat, R_tensor)

    # Task 32: Construct the time series of returns for a Top-minus-Bottom (T-B)
    # long-short strategy based on the model's return forecasts.
    r_tb = construct_tb_portfolio(R_hat, R_tensor, ap_output_dir)

    # Task 33: Compute the final, and most important, risk-adjusted performance metric:
    # the annualized Sharpe Ratio of the T-B strategy.
    sharpe = compute_annualized_sharpe_ratio(r_tb, config)
    # Add the Sharpe Ratio to the metrics dictionary.
    ap_metrics['TB_Sharpe'] = sharpe

    # Log the completion of this downstream evaluation.
    logging.info(f"--- Downstream Asset Pricing Pipeline for {model_name} COMPLETE ---")

    # Return the comprehensive dictionary of asset pricing metrics.
    return ap_metrics


# ------------------------------------------------------------------------------
# Task 35: End-to-End Orchestrator Function
# ------------------------------------------------------------------------------
def run_act_tensor_pipeline(
    df_raw: pd.DataFrame,
    config: Dict[str, Any],
    regime: str,
    smoother: str,
    run_baselines: bool = True,
    run_ablations: bool = True,
    run_stability_test: bool = False,
    base_output_dir: str = "experiment_results"
) -> Dict[str, Any]:
    """
    Executes the complete end-to-end ACT-Tensor research pipeline for a single experimental condition.

    This master orchestrator manages the entire workflow from raw data ingestion
    to the generation of final performance tables. This amended version includes
    a full, parallel asset pricing evaluation for baseline models, ensuring a
    complete and fair comparison, and removes all placeholders.

    Args:
        df_raw: The raw, unprocessed pandas DataFrame from data loading.
        config: The main `act_tensor_config` dictionary.
        regime: The evaluation regime to run, one of {"MAR", "Block", "Logit"}.
        smoother: The temporal smoothing method to use, one of {"CMA", "EMA", "KF"}.
        run_baselines: If True, also run and evaluate all baseline models, including
                       their downstream asset pricing performance.
        run_ablations: If True, also run and evaluate all ablation models.
        run_stability_test: If True, run the lambda-sensitivity test.
        base_output_dir: The root directory where all outputs for this run will be saved.

    Returns:
        A dictionary containing a comprehensive summary of all evaluation metrics
        and metadata for the completed run.
    """
    # --- 0. Setup: Directory, Logging, and Timestamp ---
    # Create a unique, timestamped directory name for this specific experimental run
    # to ensure that artifacts from different runs are isolated.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_name = f"{regime}_{smoother}_{timestamp}"
    # Define the full path for this run's output directory.
    output_dir = os.path.join(base_output_dir, run_name)
    # Create the directory. `exist_ok=True` prevents an error if it already exists.
    os.makedirs(output_dir, exist_ok=True)

    # Set up a dedicated logger for this run, directing output to a unique log file.
    logger = _setup_logger(run_name, os.path.join(output_dir, f"pipeline_log_{run_name}.log"))

    # Log the start of the pipeline run with its unique identifier.
    logger.info(f"STARTING PIPELINE RUN: {run_name}")
    # Record the start time to calculate the total execution duration.
    start_time = time.time()

    # Initialize the final results dictionary with metadata about the run.
    final_results = {
        "run_info": {"name": run_name, "regime": regime, "smoother": smoother, "status": "IN_PROGRESS"},
        "artifacts": {"output_directory": output_dir}
    }

    # Use a top-level try...except...finally block for robust error handling and cleanup.
    try:
        # --- 1. Data Preparation (Tasks 1-6) ---
        logger.info("--- STAGE 1: DATA PREPARATION ---")
        # Validate the configuration dictionary.
        config = validate_config(config, output_dir)
        # Validate and enforce the schema of the raw DataFrame.
        df_validated = validate_and_enforce_schema(df_raw, output_dir)
        # Apply universe filters and core data cleansing.
        df_clean = cleanse_and_filter_universe(df_validated, config, output_dir)
        # Construct the raw characteristic signals.
        df_chars_raw = construct_characteristics(df_clean, config, output_dir)
        # Perform cross-sectional rank normalization.
        df_normalized = rank_normalize_characteristics(df_chars_raw, config, output_dir)
        # Form the final 3D tensor and observed set mask.
        tensor_X, mask_observed, time_to_idx, firm_to_idx = form_tensor_and_observed_set(
            df_normalized, config, output_dir, output_dir
        )

        # --- 2. Experimental Setup (Tasks 7-10) ---
        logger.info("--- STAGE 2: EXPERIMENTAL SETUP ---")
        # Generate all three evaluation masks to ensure they are disjoint.
        mask_mar = create_mar_mask(mask_observed, config, output_dir)
        mask_block = create_block_mask(mask_observed, config, output_dir, existing_masks=[mask_mar])
        mask_logit = create_logit_mask(
            mask_observed, df_clean, df_chars_raw, time_to_idx, firm_to_idx,
            config, output_dir, existing_masks=[mask_mar, mask_block]
        )

        # Select the specific evaluation mask for the current experimental regime.
        eval_masks = {"MAR": mask_mar, "Block": mask_block, "Logit": mask_logit}
        mask_eval = eval_masks[regime]

        # Create the training tensor and mask by removing the evaluation set.
        create_training_tensors(tensor_X, mask_observed, [regime], output_dir)
        X_train = np.load(os.path.join(output_dir, f"X_train_{regime}.npz"))['X_train']
        mask_train = np.load(os.path.join(output_dir, f"mask_train_{regime}.npz"))['mask']

        # --- 3. ACT-Tensor Imputation & Smoothing (Tasks 11-18) ---
        logger.info("--- STAGE 3: ACT-TENSOR IMPUTATION & SMOOTHING ---")
        clusters = cluster_firms_by_profile(X_train, firm_to_idx, config, output_dir)
        dense_ids, sparse_ids = compute_densities_and_partition(
            X_train, mask_train, clusters, config, output_dir
        )
        complete_dense_clusters(dense_ids, config, output_dir)
        complete_sparse_clusters(sparse_ids, dense_ids, clusters, X_train, mask_train, config, output_dir)
        X_hat = assemble_completed_tensor(clusters, X_train.shape, mask_train, X_train, config, output_dir)
        smoother_func = {"CMA": apply_cma_smoothing, "EMA": apply_ema_smoothing, "KF": apply_kf_smoothing}[smoother]
        X_tilde_act_tensor = smoother_func(X_hat, config, output_dir)

        # --- 4. Imputation Accuracy Evaluation (Tasks 19-20) ---
        logger.info("--- STAGE 4: IMPUTATION ACCURACY EVALUATION (ACT-TENSOR) ---")
        final_results["imputation_accuracy_main"] = evaluate_imputation_accuracy(tensor_X, X_tilde_act_tensor, mask_eval)
        final_results["imputation_accuracy_sparse"] = evaluate_sparse_cluster_accuracy(tensor_X, X_tilde_act_tensor, mask_eval, clusters, sparse_ids)

        # --- 5. Downstream Asset Pricing Evaluation (Tasks 28-34) for ACT-Tensor ---
        logger.info("--- STAGE 5: ASSET PRICING EVALUATION (ACT-TENSOR) ---")
        final_results["asset_pricing_performance"] = _run_downstream_pipeline(
            X_tilde_act_tensor, df_clean, time_to_idx, firm_to_idx, config, output_dir, f"ACT-Tensor_{smoother}"
        )

        # --- Optional Analyses (Full Implementation) ---
        if run_baselines:
            logger.info("--- STAGE 6: BASELINE EVALUATION (IMPUTATION & ASSET PRICING) ---")
            # Run all baseline imputation methods.
            baseline_tensors = run_baseline_imputations(X_train, time_to_idx, firm_to_idx, output_dir)

            # Evaluate the imputation accuracy of each baseline.
            baseline_imputation_results = evaluate_baseline_methods(tensor_X, regime, list(baseline_tensors.keys()), output_dir)
            final_results["baseline_imputation_performance"] = baseline_imputation_results

            # Run the full downstream asset pricing pipeline for each baseline.
            baseline_ap_results = []
            for method_name, X_tilde_baseline in baseline_tensors.items():
                ap_metrics = _run_downstream_pipeline(
                    X_tilde_baseline, df_clean, time_to_idx, firm_to_idx, config, output_dir, f"Baseline_{method_name}"
                )
                baseline_ap_results.append({"Method": method_name, "Regime": regime, **ap_metrics})
            final_results["baseline_asset_pricing_performance"] = baseline_ap_results

        if run_ablations:
            logger.info("--- STAGE 7: ABLATION STUDIES ---")
            ablation_results = []
            metrics_cp_only = run_ablation_cp_only(X_train, mask_train, tensor_X, mask_eval, config, output_dir)
            ablation_results.append({"Method": "CP Completion", "Regime": regime, **metrics_cp_only})
            metrics_cp_clust = run_ablation_cp_with_clustering(X_train, mask_train, tensor_X, mask_eval, firm_to_idx, config, output_dir)
            ablation_results.append({"Method": "CP Completion w/ Clustering", "Regime": regime, **metrics_cp_clust})
            metrics_cp_smooth = run_ablation_cp_with_smoothing(X_train, mask_train, tensor_X, mask_eval, config, output_dir)
            ablation_results.append({"Method": "CP Completion w/ CMA", "Regime": regime, **metrics_cp_smooth})
            final_results["ablation_studies"] = ablation_results

        if run_stability_test:
            logger.info("--- STAGE 8: STABILITY TEST ---")
            stability_df = run_regularization_stability_test(
                regime, X_train, mask_train, tensor_X, mask_eval, clusters,
                dense_ids, sparse_ids, config, output_dir
            )
            final_results["stability_test_results"] = stability_df.to_dict('records')

        # Mark the run as successful in the final summary.
        final_results["run_info"]["status"] = "SUCCESS"

    except Exception as e:
        # If any part of the pipeline fails, log the full error and traceback.
        logger.error(f"PIPELINE FAILED: An unhandled exception occurred.", exc_info=True)
        # Update the status in the final summary.
        final_results["run_info"]["status"] = "FAILED"
        final_results["run_info"]["error_message"] = str(e)

    finally:
        # --- Finalization ---
        # This block executes regardless of success or failure.
        end_time = time.time()
        duration = end_time - start_time
        final_results["run_info"]["execution_time_seconds"] = duration
        logger.info(f"PIPELINE FINISHED. Status: {final_results['run_info']['status']}. Total duration: {duration:.2f} seconds.")

        # Save the final, comprehensive summary dictionary to a JSON file.
        summary_path = os.path.join(output_dir, "run_summary.json")
        with open(summary_path, 'w') as f:
            # Define a custom JSON encoder to handle NumPy data types for serialization.
            class NpEncoder(json.JSONEncoder):
                def default(self, obj):
                    if isinstance(obj, (np.integer, np.floating)): return float(obj)
                    if isinstance(obj, np.ndarray): return obj.tolist()
                    return super(NpEncoder, self).default(obj)
            # Write the dictionary to the file with pretty printing.
            json.dump(final_results, f, indent=4, cls=NpEncoder)

        # Clean up logging handlers to prevent interference with subsequent runs in the same session.
        logging.getLogger(run_name).handlers.clear()

    # Return the final results dictionary.
    return final_results



In [None]:
# Task 36: Execute the Full Pipeline for All Regimes and Smoothers

# ==============================================================================
# Task 36: Execute the Full Pipeline for All Regimes and Smoothers
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 36, Helper for Compiling Imputation Tables
# ------------------------------------------------------------------------------
def compile_imputation_results(
    df_imputation: pd.DataFrame,
    df_ablation: pd.DataFrame,
    output_dir: str
) -> None:
    """
    Generates and saves all final, formatted imputation accuracy tables (Panels A, B, C).

    This function takes the parsed DataFrames of imputation and ablation results,
    formats them into the three distinct panels as seen in the paper, and saves
    them as both CSV and styled LaTeX files.

    Args:
        df_imputation: A DataFrame containing the main and baseline imputation
                       results for both 'Overall' and 'Sparse' scopes.
        df_ablation: A DataFrame containing the results of the ablation studies.
        output_dir: The directory in which to save the final table artifacts.
    """
    # --- Input Validation ---
    if not isinstance(df_imputation, pd.DataFrame) or not isinstance(df_ablation, pd.DataFrame):
        raise TypeError("Inputs must be pandas DataFrames.")

    # --- Define Table Structure ---
    # Define the canonical order of methods for table rows to ensure consistency.
    method_order_main = [
        "Cross-Sectional Median", "Global BF-XS", "Local B-XS",
        "ACT-Tensor w/ EMA", "ACT-Tensor w/ KF", "ACT-Tensor w/ CMA"
    ]
    method_order_ablation = [
        "CP Completion", "CP Completion w/ Clustering",
        "CP Completion w/ CMA", "ACT-Tensor w/ CMA"
    ]
    # Define the order of metrics for table columns.
    metric_cols = ['RMSE_imp', 'MAE_imp', 'MAPE_imp', 'R2_imp']
    # Specify which metrics are better when higher (for styling purposes).
    higher_is_better_imp = ['R2_imp']

    # --- Generate Each Panel ---
    # Use a loop to generate Panels A, B, and C with their respective configurations.
    for panel, scope, df, order in [
        ("A", "Overall", df_imputation, method_order_main),
        ("B", "Sparse", df_imputation, method_order_main),
        ("C", "Overall", df_ablation, method_order_ablation)
    ]:
        # Filter the data for the current panel based on its scope.
        df_panel_data = df[df['Scope'] == scope] if 'Scope' in df.columns else df

        # If no data exists for this panel, log a warning and skip.
        if df_panel_data.empty:
            logging.warning(f"No data found for Imputation Panel {panel}. Skipping.")
            continue

        # Pivot the table to the desired layout: Methods as index, Regime/Metrics as columns.
        df_panel = df_panel_data.pivot(index='Method', columns='Regime', values=metric_cols)

        # Enforce the canonical row order and remove any all-NaN rows that might result.
        if order:
            df_panel = df_panel.reindex(order).dropna(how='all')

        # Save the unstyled, rounded data to a CSV file for easy analysis.
        df_panel.round(4).to_csv(os.path.join(output_dir, f"table_imputation_panel_{panel}.csv"))

        # Generate and save the styled LaTeX table with color-coding.
        _style_and_save_table(df_panel, higher_is_better_imp, f"table_imputation_panel_{panel}_styled", output_dir)

        # Log the successful generation of the panel.
        logging.info(f"Generated and saved Imputation Results Panel {panel}.")


# ------------------------------------------------------------------------------
# Task 36, Helper for Final Aggregation and Reporting
# ------------------------------------------------------------------------------
def _aggregate_and_generate_final_tables(
    summary_paths: List[str],
    base_output_dir: str
) -> Dict[str, str]:
    """
    Parses all run summaries and generates all final publication artifacts.

    This is the master reporting function. It ingests the raw JSON outputs from
    all experimental runs, parses them into structured DataFrames, and then calls
    dedicated compiler functions to produce all the final tables and figures
    required for the paper's replication.

    Args:
        summary_paths: A list of file paths to the `run_summary.json` files.
        base_output_dir: The root directory to save the final artifacts.

    Returns:
        A dictionary containing the paths to the key output directories.
    """
    # Log the start of the final aggregation stage.
    logging.info(f"\n{'='*80}\nAGGREGATING FINAL RESULTS FROM {len(summary_paths)} RUNS.\n{'='*80}")

    # --- Setup: Create output directories ---
    # Create dedicated subdirectories for the final aggregated artifacts.
    tables_dir = os.path.join(base_output_dir, "final_tables")
    figures_dir = os.path.join(base_output_dir, "final_figures")
    os.makedirs(tables_dir, exist_ok=True)
    os.makedirs(figures_dir, exist_ok=True)

    # --- Step 1: Parse all raw summary files ---
    # Call the parsing helper to convert all JSONs into a dictionary of DataFrames.
    try:
        parsed_data = _parse_run_summaries(summary_paths)
    except ValueError as e:
        logging.error(f"Failed to parse summary files: {e}. Aborting artifact generation.")
        return {}

    # --- Step 2: Generate All Final Tables ---
    # Generate the three imputation tables (Panels A, B, C) by calling the new compiler.
    compile_imputation_results(
        df_imputation=parsed_data["imputation"],
        df_ablation=parsed_data["ablation"],
        output_dir=tables_dir
    )

    # Generate the final asset pricing table if data is available.
    if not parsed_data["asset_pricing"].empty:
        # This call now includes results from both ACT-Tensor and baselines.
        compile_asset_pricing_results(
            results_list=parsed_data["asset_pricing"].to_dict('records'),
            output_dir=tables_dir
        )

    # --- Step 3: Generate All Final Figures ---
    # Generate stability plots if data is available.
    df_stability = parsed_data["stability"]
    if not df_stability.empty:
        # This assumes a simple plotting helper `_generate_stability_plots` exists.
        _generate_stability_plots(df_stability, figures_dir)

    # Log the completion of the artifact generation process.
    logging.info("All final publication artifacts have been successfully generated.")

    # Return a dictionary of paths to the key output directories for verification.
    return {
        "imputation_tables_dir": tables_dir,
        "asset_pricing_tables_dir": tables_dir,
        "figures_dir": figures_dir
    }


# ------------------------------------------------------------------------------
# Task 36, Main Execution Function
# ------------------------------------------------------------------------------
def execute_full_experimental_suite(
    data_path: str,
    config_path: str,
    base_output_dir: str
) -> Dict[str, Any]:
    """
    Executes the full suite of ACT-Tensor experiments and aggregates all results.

    This master function is the single entry point for a complete replication.
    It systematically iterates through all experimental conditions, calls the main
    pipeline orchestrator for each, and then calls a final aggregation function
    to produce all publication-ready tables and figures.

    Args:
        data_path: The file path to the raw input DataFrame (e.g., Parquet file).
        config_path: The file path to the main JSON configuration file.
        base_output_dir: The root directory where all experimental results will be saved.

    Returns:
        A dictionary containing the paths to the final aggregated result artifacts.
    """
    # --- Step 1: Load Global Inputs and Set Seed ---
    # Create the base output directory if it doesn't exist.
    os.makedirs(base_output_dir, exist_ok=True)
    # Log the start of the entire process.
    logging.info("--- MASTER SCRIPT: STARTING FULL EXPERIMENTAL SUITE ---")

    # Load the raw financial data panel, with robust error handling.
    try:
        df_raw = pd.read_parquet(data_path)
    except FileNotFoundError as e:
        logging.error(f"FATAL: Could not load raw data file at '{data_path}'. Error: {e}. Exiting.")
        raise

    # Load the main configuration file, with robust error handling.
    try:
        with open(config_path, 'r') as f:
            config = json.load(f)
    except FileNotFoundError as e:
        logging.error(f"FATAL: Could not load config file at '{config_path}'. Error: {e}. Exiting.")
        raise

    # Set the global random seeds from the configuration for top-level reproducibility.
    np.random.seed(config["reproducibility"]["seed_global"])
    random.seed(config["reproducibility"]["seed_global"])

    # --- Step 2: Execute the Orchestrator for All Combinations ---
    # Define the full factorial experimental design.
    regimes_to_run: List[str] = ["MAR", "Block", "Logit"]
    smoothers_to_run: List[str] = ["CMA", "EMA", "KF"]
    # This list will collect the paths to the summary file from each successful run.
    summary_file_paths: List[str] = []

    # Loop through every combination of regime and smoother.
    for regime in regimes_to_run:
        for smoother in smoothers_to_run:
            # Define a unique prefix for this experimental condition.
            run_name_prefix = f"{regime}_{smoother}"
            logging.info(f"\n{'='*80}\nPreparing experiment: {run_name_prefix}\n{'='*80}")

            # --- Resumability Logic ---
            # Check if a summary file already exists for a run with this prefix.
            existing_summary_path = None
            for run_dir in os.listdir(base_output_dir):
                if run_dir.startswith(run_name_prefix):
                    summary_path = os.path.join(base_output_dir, run_dir, "run_summary.json")
                    if os.path.exists(summary_path):
                        logging.warning(f"Found existing completed run in '{run_dir}'. Loading path and skipping.")
                        existing_summary_path = summary_path
                        break

            # If a completed run was found, add its path and continue to the next experiment.
            if existing_summary_path:
                summary_file_paths.append(existing_summary_path)
                continue

            # Execute the full end-to-end pipeline for one experimental condition.
            run_summary = run_act_tensor_pipeline(
                df_raw=df_raw, config=config, regime=regime, smoother=smoother,
                run_baselines=True, run_ablations=True,
                run_stability_test=(regime in ["Block", "Logit"]),
                base_output_dir=base_output_dir
            )

            # Collect the path to the summary file if the run was successful.
            if run_summary.get("run_info", {}).get("status") == "SUCCESS":
                run_output_dir = run_summary.get("artifacts", {}).get("output_directory")
                if run_output_dir:
                    summary_file_paths.append(os.path.join(run_output_dir, "run_summary.json"))

    # --- Step 3: Aggregate All Results into Final Artifacts ---
    # Call the dedicated aggregation helper to generate all final tables and figures.
    final_artifact_paths = _aggregate_and_generate_final_tables(summary_file_paths, base_output_dir)

    # Log the successful completion of the entire suite of experiments.
    logging.info("--- MASTER SCRIPT: EXECUTION AND AGGREGATION FINISHED ---")

    # Return the dictionary of paths to the final generated artifacts.
    return final_artifact_paths



In [None]:
# Task 37: Generate Publication-Ready Tables and Figures

# ==============================================================================
# Task 37: Generate Publication-Ready Tables and Figures
# ==============================================================================

# ------------------------------------------------------------------------------
# Task 37, Helper for Parsing Raw JSON Summaries
# ------------------------------------------------------------------------------
def _parse_run_summaries(summary_paths: List[str]) -> Dict[str, pd.DataFrame]:
    """
    Parses all raw run_summary.json files into clean, aggregated DataFrames.

    This function reads a list of JSON summary files generated by the main
    pipeline, extracts all relevant metrics for imputation, asset pricing,
    ablations, and other tests, and consolidates them into separate, well-structured
    pandas DataFrames for easy analysis and reporting.

    Args:
        summary_paths: A list of file paths to the `run_summary.json` files
                       from all experimental runs.

    Returns:
        A dictionary of pandas DataFrames, with keys like 'imputation',
        'ablation', 'asset_pricing', and 'stability'.

    Raises:
        ValueError: If no successful runs are found in the provided summary files.
    """
    # Load all specified JSON files into a list of dictionaries.
    all_summaries = []
    for path in summary_paths:
        try:
            with open(path, 'r') as f:
                all_summaries.append(json.load(f))
        except (FileNotFoundError, json.JSONDecodeError) as e:
            logging.error(f"Could not load or parse summary file {path}. Skipping. Error: {e}")
            continue

    # Filter for runs that completed successfully.
    successful_runs = [s for s in all_summaries if s.get("run_info", {}).get("status") == "SUCCESS"]
    if not successful_runs:
        raise ValueError("No successful runs found to aggregate.")

    # Initialize record lists for each category of result.
    records = {"imputation": [], "ablation": [], "asset_pricing": [], "stability": []}

    # Iterate through each successful run and extract all relevant metrics.
    for summary in successful_runs:
        # Get metadata for the current run.
        info = summary.get('run_info', {})
        method = f"ACT-Tensor w/ {info.get('smoother')}"
        regime = info.get('regime')

        # Extract main and sparse imputation results for the primary model.
        for scope, key in [("Overall", "imputation_accuracy_main"), ("Sparse", "imputation_accuracy_sparse")]:
            if key in summary:
                records["imputation"].append({"Method": method, "Regime": regime, "Scope": scope, **summary[key]})

        # Extract imputation results for baseline models.
        if "baseline_imputation_performance" in summary:
            records["imputation"].extend(summary["baseline_imputation_performance"])

        # Extract ablation study results.
        if "ablation_studies" in summary:
            records["ablation"].extend(summary["ablation_studies"])

        # Extract asset pricing results for the primary model.
        if "asset_pricing_performance" in summary:
            records["asset_pricing"].append({"Method": method, "Regime": regime, **summary['asset_pricing_performance']})

        # Extract asset pricing results for baseline models.
        if "baseline_asset_pricing_performance" in summary:
            records["asset_pricing"].extend(summary["baseline_asset_pricing_performance"])

        # Extract stability test results.
        if "stability_test_results" in summary:
            for res in summary["stability_test_results"]:
                records["stability"].append({"Regime": regime, **res})

    # Convert lists of records to DataFrames, dropping duplicates to ensure uniqueness.
    return {key: pd.DataFrame(val).drop_duplicates().reset_index(drop=True) for key, val in records.items()}

# ------------------------------------------------------------------------------
# Task 37, Helper for Formatting and Saving Tables
# ------------------------------------------------------------------------------
def _style_and_save_table(
    df: pd.DataFrame,
    higher_is_better_cols: List[str],
    table_name: str,
    output_dir: str
) -> None:
    """
    Applies conditional color formatting to a DataFrame and saves it as a styled LaTeX table.

    This function takes a DataFrame of results, ranks each column, and applies
    a green color scale to the top 3 performers. It then exports the styled
    table to a .tex file, complete with the necessary LaTeX package headers.

    Args:
        df: The pandas DataFrame to be styled. It should be pivoted with methods
            as the index and metrics as columns.
        higher_is_better_cols: A list of column names for which a higher value
                               is better (e.g., 'IC', 'R2_imp'). For all other
                               columns, lower is assumed to be better.
        table_name: The base name for the output .tex file.
        output_dir: The directory in which to save the file.
    """
    # Define the color palette for highlighting (lightest to darkest green).
    colors = ['#d9ead3', '#b6d7a8', '#93c47d']

    # Create a pandas Styler object from the DataFrame.
    styler = df.style

    # Iterate through each column of the DataFrame to apply conditional styling.
    for col in df.columns:
        # Determine the ranking direction for this metric.
        is_ascending = col not in higher_is_better_cols

        # Rank the column values to identify the top performers. `method='min'` handles ties.
        ranks = df[col].rank(method='min', ascending=is_ascending)

        # Define a styling function that returns a CSS background-color style.
        def color_by_rank(val: float) -> str:
            # This inner function will be applied to each cell in the column.
            # It uses the pre-computed ranks to determine the color.
            rank = ranks[val] if pd.notna(val) else -1
            if rank == 1:
                return f'background-color: {colors[2]}'  # Darkest green for 1st place
            elif rank == 2:
                return f'background-color: {colors[1]}'  # Medium green for 2nd place
            elif rank == 3:
                return f'background-color: {colors[0]}'   # Lightest green for 3rd place
            return '' # No style for other ranks

        # Apply the styling function to the current column.
        styler.apply(lambda s: s.map(lambda v: color_by_rank(v)), subset=[col])

    # Apply a consistent floating-point format to all cells.
    styler.format("{:.4f}")

    # Generate the LaTeX string from the Styler object.
    # `convert_css=True` is essential for translating background colors to \cellcolor.
    latex_string = styler.to_latex(
        hrules=True,
        convert_css=True,
        multicol_align='c'
    )

    # Prepend the necessary LaTeX package headers for color and multi-row/col support.
    latex_header = "\\usepackage{colortbl}\n\\usepackage{multirow}\n"
    # Insert the headers before the `\begin{tabular}` environment.
    tabular_pos = latex_string.find("\\begin{tabular}")
    final_latex = latex_header + latex_string[:tabular_pos] + latex_string[tabular_pos:]

    # Write the final, complete LaTeX code to the output file.
    with open(os.path.join(output_dir, f"{table_name}.tex"), 'w') as f:
        f.write(final_latex)


# ------------------------------------------------------------------------------
# Task 37, Orchestrator Function
# ------------------------------------------------------------------------------
def generate_publication_artifacts(
    summary_paths: List[str],
    base_output_dir: str = "results"
) -> None:
    """
    Generates all publication-ready tables and figures from experimental results.

    This function serves as the final reporting step of the pipeline. It takes the
    raw, structured results from all experimental runs (via their summary files)
    and transforms them into the final tables and figures presented in the
    research paper, including formatted and color-coded LaTeX tables.

    Args:
        summary_paths: A list of file paths to the `run_summary.json` files
                       generated by `execute_full_experimental_suite`.
        base_output_dir: The root directory to save the final tables and figures.
    """
    # --- Setup: Create output directories ---
    # Create dedicated subdirectories for tables and figures to keep outputs organized.
    tables_dir = os.path.join(base_output_dir, "final_tables")
    figures_dir = os.path.join(base_output_dir, "final_figures")
    os.makedirs(tables_dir, exist_ok=True)
    os.makedirs(figures_dir, exist_ok=True)

    logging.info("--- Generating All Publication Artifacts ---")

    # --- Step 1: Parse all raw summary files into structured DataFrames ---
    try:
        parsed_data = _parse_run_summaries(summary_paths)
    except ValueError as e:
        logging.error(f"Failed to parse summary files: {e}. Aborting artifact generation.")
        return

    # --- Step 2: Generate Imputation Tables (Panels A, B, C) ---
    # Extract the relevant DataFrames from the parsed data.
    df_imputation = parsed_data["imputation"]
    df_ablation = parsed_data["ablation"]

    # Define the canonical order of methods for table rows.
    method_order_main = ["Cross-Sectional Median", "Global BF-XS", "Local B-XS", "ACT-Tensor w/ EMA", "ACT-Tensor w/ KF", "ACT-Tensor w/ CMA"]
    method_order_ablation = ["CP Completion", "CP Completion w/ Clustering", "CP Completion w/ CMA", "ACT-Tensor w/ CMA"]
    # Define the order of metrics for table columns.
    metric_cols = ['RMSE_imp', 'MAE_imp', 'MAPE_imp', 'R2_imp']
    # Specify which metrics are better when higher.
    higher_is_better_imp = ['R2_imp']

    # Generate each panel by filtering, pivoting, and styling.
    for panel, scope, df, order in [("A", "Overall", df_imputation, method_order_main),
                                    ("B", "Sparse", df_imputation, method_order_main),
                                    ("C", "Overall", df_ablation, method_order_ablation)]:
        # Filter the data for the current panel.
        df_panel_data = df[df['Scope'] == scope] if 'Scope' in df.columns else df
        if df_panel_data.empty: continue
        # Pivot the table to the desired layout (Methods as index, Regime/Metrics as columns).
        df_panel = df_panel_data.pivot(index='Method', columns='Regime', values=metric_cols)
        # Enforce the canonical row order.
        if order:
            df_panel = df_panel.reindex(order).dropna(how='all')
        # Save the unstyled CSV.
        df_panel.round(4).to_csv(os.path.join(tables_dir, f"table_imputation_panel_{panel}.csv"))
        # Generate and save the styled LaTeX table.
        _style_and_save_table(df_panel, higher_is_better_imp, f"table_imputation_panel_{panel}_styled", tables_dir)
        logging.info(f"Generated and saved Imputation Results Panel {panel}.")

    # --- Step 3: Generate Asset-Pricing Table ---
    df_ap = parsed_data["asset_pricing"]
    if not df_ap.empty:
        # Pivot the asset pricing data.
        df_ap_pivot = df_ap.pivot(index='Method', columns='Regime', values=['RMSE_alpha', 'MAE_alpha', 'MAE_Rank', 'IC', 'TB_Sharpe'])
        # Reorder rows to match the main method order.
        df_ap_pivot = df_ap_pivot.reindex(method_order_main).dropna(how='all')
        # Specify which metrics are better when higher.
        higher_is_better_ap = ['IC', 'TB_Sharpe']
        # Save the unstyled CSV.
        df_ap_pivot.round(4).to_csv(os.path.join(tables_dir, "table_asset_pricing.csv"))
        # Generate and save the styled LaTeX table.
        _style_and_save_table(df_ap_pivot, higher_is_better_ap, "table_asset_pricing_styled", tables_dir)
        logging.info("Generated and saved Asset Pricing Results Table.")

    # --- Step 4: Generate Stability Plots ---
    df_stability = parsed_data["stability"]
    if not df_stability.empty:
        # Generate one plot for each regime tested.
        for regime in df_stability['Regime'].unique():
            plt.figure(figsize=(10, 6))
            ax = sns.lineplot(data=df_stability[df_stability['Regime'] == regime], x='lambda', y='RMSE_imp', marker='o')
            ax.set_xscale('log')
            ax.set_title(f'RMSE Sensitivity to Regularization (λ) under {regime} Missingness')
            ax.set_xlabel('Regularization Strength (λ)')
            ax.set_ylabel('Out-of-Sample RMSE')
            plot_path = os.path.join(figures_dir, f"figure_stability_{regime}.pdf")
            plt.savefig(plot_path, bbox_inches='tight')
            plt.close()
            logging.info(f"Generated and saved stability plot for regime '{regime}'.")

    logging.info("All publication artifacts have been successfully generated.")


In [None]:
# Top-Level Orchestrator Function

# ==============================================================================
# Final Fused Task: A Singular Top-Level Pipeline Orchestrator
# ==============================================================================

# ------------------------------------------------------------------------------
# Final Fused Orchestrator Function
# ------------------------------------------------------------------------------
def run_complete_act_tensor_replication(
    data_path: str,
    config_path: str,
    base_output_dir: str
) -> Dict[str, Any]:
    """
    Executes the entire ACT-Tensor replication study and generates all final artifacts.

    This singular, top-level orchestrator serves as the master entry point for the
    entire project. It is a complete, production-grade "run-and-report" engine that:
    1. Manages the full experimental campaign by iterating through all specified
       regimes and smoothers.
    2. Calls the robust `run_act_tensor_pipeline` for each experimental condition,
       which handles the end-to-end workflow from data prep to evaluation.
    3. Implements resumability by skipping previously completed runs.
    4. After all experiments are complete, automatically aggregates the results
       from all runs.
    5. Generates all final, publication-ready tables and figures as specified in
       the study's design.

    Args:
        data_path: The file path to the raw input DataFrame (e.g., a Parquet file).
        config_path: The file path to the main JSON configuration file.
        base_output_dir: The root directory where all experimental results, logs,
                         and final artifacts will be saved.

    Returns:
        A dictionary containing the paths to the final aggregated result artifacts,
        confirming the successful completion of the entire study.

    Raises:
        FileNotFoundError: If the essential `data_path` or `config_path`
                           files are not found.
        Exception: Propagates any fatal error from the underlying pipeline.
    """
    # --- 1. Global Setup: Logging, Directory, and Input Loading ---
    # Create the base output directory if it doesn't exist.
    os.makedirs(base_output_dir, exist_ok=True)

    # Set up a master logger for the entire replication project.
    master_log_path = os.path.join(base_output_dir, "master_replication_log.log")
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(master_log_path, mode='w'),
            logging.StreamHandler()
        ]
    )

    # Log the start of the entire project.
    logging.info(f"{'='*80}")
    logging.info("STARTING COMPLETE ACT-TENSOR REPLICATION PIPELINE")
    logging.info(f"Master log will be saved to: {master_log_path}")
    logging.info(f"{'='*80}")

    # Load the raw financial data panel, with robust error handling.
    try:
        df_raw = pd.read_parquet(data_path)
    except FileNotFoundError as e:
        logging.error(f"FATAL: Could not load raw data file at '{data_path}'. Error: {e}. Exiting.")
        raise

    # Load the main configuration file, with robust error handling.
    try:
        with open(config_path, 'r') as f:
            config = json.load(f)
    except FileNotFoundError as e:
        logging.error(f"FATAL: Could not load config file at '{config_path}'. Error: {e}. Exiting.")
        raise

    # Set the global random seeds from the configuration for top-level reproducibility.
    np.random.seed(config["reproducibility"]["seed_global"])
    random.seed(config["reproducibility"]["seed_global"])

    # --- 2. Experimental Execution Loop (Task 36 Logic) ---
    # Define the full factorial experimental design.
    regimes_to_run: List[str] = ["MAR", "Block", "Logit"]
    smoothers_to_run: List[str] = ["CMA", "EMA", "KF"]
    # This list will collect the paths to the summary file from each successful run.
    summary_file_paths: List[str] = []

    # Loop through every combination of regime and smoother.
    for regime in regimes_to_run:
        for smoother in smoothers_to_run:
            # Define a unique prefix for this experimental condition.
            run_name_prefix = f"{regime}_{smoother}"
            logging.info(f"\n{'='*80}\nPreparing experiment: {run_name_prefix}\n{'='*80}")

            # --- Resumability Logic ---
            # Check if a summary file already exists for a run with this prefix.
            existing_summary_path = None
            for run_dir in os.listdir(base_output_dir):
                if run_dir.startswith(run_name_prefix):
                    summary_path = os.path.join(base_output_dir, run_dir, "run_summary.json")
                    if os.path.exists(summary_path):
                        logging.warning(f"Found existing completed run in '{run_dir}'. Loading path and skipping.")
                        existing_summary_path = summary_path
                        break

            # If a completed run was found, add its path and continue to the next experiment.
            if existing_summary_path:
                summary_file_paths.append(existing_summary_path)
                continue

            # --- Single Run Execution (Task 35 Logic) ---
            # Execute the full end-to-end pipeline for one experimental condition.
            run_summary = run_act_tensor_pipeline(
                df_raw=df_raw, config=config, regime=regime, smoother=smoother,
                run_baselines=True, run_ablations=True,
                run_stability_test=(regime in ["Block", "Logit"]),
                base_output_dir=base_output_dir
            )

            # Collect the path to the summary file if the run was successful.
            if run_summary.get("run_info", {}).get("status") == "SUCCESS":
                run_output_dir = run_summary.get("artifacts", {}).get("output_directory")
                if run_output_dir:
                    summary_file_paths.append(os.path.join(run_output_dir, "run_summary.json"))

    # --- 3. Final Aggregation and Reporting (Task 37 Logic) ---
    # Call the dedicated aggregation function to generate all final tables and figures.
    final_artifact_paths = generate_publication_artifacts(
        summary_paths=summary_file_paths,
        base_output_dir=base_output_dir
    )

    # Log the successful completion of the entire suite of experiments.
    logging.info(f"\n{'='*80}")
    logging.info("MASTER SCRIPT: EXECUTION AND AGGREGATION FINISHED SUCCESSFULLY.")
    logging.info(f"Final artifacts are located in: {final_artifact_paths}")
    logging.info(f"{'='*80}")

    # Return the dictionary of paths to the final generated artifacts.
    return final_artifact_paths
