### Data Pipeline — Sector Builds with Macro As-Of Merge, Rolling Transforms, and Manifests


In [1]:
import sys
import os
import pandas as pd
import numpy as np
import warnings

from datetime import datetime
from sklearn.pipeline import Pipeline

# Put the parent of the current working directory on the import path so the
# `src.*` packages imported further down in this cell can be resolved.
_repo_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
sys.path.append(_repo_root)

# Suppress every warning category for the remainder of the session.
warnings.filterwarnings(action='ignore')

# Transform utilities (assumed to exist in your repo)
from src.transforms.macro_asof import AsOfMacroMerger
from src.transforms.rolling import RollingImputer, RollingStandardizer, Winsorizer
from src.transforms.pruning import drop_sparse_columns
from src.transforms.manifests import ManifestWriter, hash_df_map

# QA helpers (implemented in src/qa/validators.py)
from src.qa.validators import validate_df, assert_no_future, assert_monotonic

# Publication lag per FRED series id; handed to apply_publish_lags when the
# macro frame is prepared.
# NOTE(review): the unit of these lags is not visible in this notebook
# (presumably rows of the daily macro grid) — confirm against apply_publish_lags.
PUBLISH_LAGS = dict(
    CPIAUCSL=2,
    DFF=1,
    DGS10=1,
    UNRATE=2,
    GDPC1=5,
    RSAFS=3,
    PAYEMS=2,
)

from src.stock_features import (
    build_stock_features_orchestrator,
    build_sector_base_features,
    make_target_view
)

from src.macro_features import (
    macro_data_orchestrator,
    normalize_date_index,
    prepare_macro_for_daily_merge,
    merge_stocks_and_macros,
    build_macro_state_features,
    apply_publish_lags
)

# -------- Config --------
# Data folders used by every later cell.
#   base_output_dir : where per-sector BASE parquet files and per-target CSVs are written
#   macro_folder    : where the cached macros.csv is read from (and macro downloads are saved)
# Each folder can be overridden with an environment variable so the notebook
# runs on another machine without editing this cell. When the variable is unset
# (or empty) the original hard-coded location is used, so existing runs behave
# exactly as before.
base_output_dir = (
    os.environ.get("PIPELINE_PROCESSED_DIR")
    or r"C:\Users\epoch_bpjmdqk\Documents\Code\data\processed"
)
macro_folder = (
    os.environ.get("PIPELINE_RAW_DIR")
    or r"C:\Users\epoch_bpjmdqk\Documents\Code\data\raw"
)
# Create the output folder up front; exist_ok makes re-running the cell harmless.
os.makedirs(base_output_dir, exist_ok=True)

In [2]:
# -------- Sector definitions --------
# Compact spec: sector name -> (sector ETF, constituent equities).
# Every sector's ticker list is its equities followed by the sector ETF and the
# "^GSPC" index ticker, so the full SECTORS mapping is derived from this table.
_SECTOR_SPECS = {
    "staples": ("XLP", ["WMT", "PG", "KO", "PEP", "COST", "CL", "CLX", "KMB", "GIS", "MDLZ", "KR", "TGT"]),
    "discretionary": ("XLY", ["AMZN", "HD", "MCD", "NKE", "SBUX", "TJX", "LOW", "BKNG", "ROST", "MAR"]),
    "healthcare": ("XLV", ["UNH", "LLY", "JNJ", "ABBV", "MRK", "TMO", "ABT", "PFE", "MDT", "ISRG", "CVS", "HUM"]),
    "technology": ("XLK", ["AAPL", "MSFT", "NVDA", "AVGO", "ADBE", "CRM", "AMD", "INTC", "CSCO", "QCOM", "ORCL", "TXN"]),
    "financials": ("XLF", ["JPM", "BAC", "WFC", "MS", "GS", "C", "BLK", "PGR", "AXP", "USB", "SCHW", "CB"]),
    "energy": ("XLE", ["XOM", "CVX", "COP", "EOG", "SLB", "OXY", "PSX", "MPC", "VLO", "HAL", "KMI"]),
    "industrials": ("XLI", ["CAT", "BA", "HON", "GE", "UPS", "UNP", "DE", "RTX", "LMT", "ETN", "EMR", "MMM"]),
    "utilities": ("XLU", ["NEE", "SO", "DUK", "AEP", "EXC", "SRE", "XEL", "D", "PEG", "ED"]),
    "materials": ("XLB", ["LIN", "APD", "ECL", "NEM", "FCX", "NUE", "SHW", "ALB", "MLM", "VMC"]),
    "communication_services": ("XLC", ["META", "GOOGL", "GOOG", "NFLX", "CMCSA", "DIS", "T", "VZ"]),
    "real_estate": ("XLRE", ["AMT", "PLD", "EQIX", "PSA", "SPG", "CCI", "O", "WELL"]),
}

# sector name -> {"tickers": [...equities, ETF, "^GSPC"], "sector_etf": ETF}
SECTORS = {
    sector: {"tickers": [*members, etf, "^GSPC"], "sector_etf": etf}
    for sector, (etf, members) in _SECTOR_SPECS.items()
}

In [3]:
# Optional date window; None leaves the corresponding bound unset.
start_date_str = None
end_date_str = None

# Friendly macro name -> FRED series id.
FRED_series_ids = dict(
    CPI='CPIAUCSL',
    FEDERAL_FUNDS_RATE='DFF',
    TREASURY_YIELD='DGS10',
    UNEMPLOYMENT='UNRATE',
    REAL_GDP='GDPC1',
    RETAIL_SALES='RSAFS',
    PAYEMS='PAYEMS',
)

# Names of the macro series to fetch when no cached CSV is available.
macro_funcs = {
    'CPI',
    'FEDERAL_FUNDS_RATE',
    'TREASURY_YIELD',
    'UNEMPLOYMENT',
    'REAL_GDP',
    'RETAIL_SALES',
    'PAYEMS',
}

# ---- Load / build macro data ----
# Prefer the cached macros.csv; only when that file is missing, rebuild the
# macro frame through the orchestrator (which is given macro_folder as save_path).
macro_csv_path = os.path.join(macro_folder, "macros.csv")
try:
    macro_df = pd.read_csv(macro_csv_path, index_col="date", parse_dates=["date"])
except FileNotFoundError:
    macro_df = macro_data_orchestrator(
        macro_funcs_to_fetch=macro_funcs,
        fred_series_ids_dict=FRED_series_ids,
        start_date=start_date_str,
        save_path=macro_folder,
    )
else:
    # Strip any timezone and snap timestamps to midnight on the cached index.
    macro_df.index = macro_df.index.tz_localize(None).normalize()
    print("Loaded existing macro data from CSV.")

# Ensure a clean DatetimeIndex regardless of which branch produced macro_df.
macro_df = normalize_date_index(macro_df)

# Stage 1: reshape for the as-of merge. The result carries a 'Date' column
# (on a business-day grid, per the original author's note).
macro_on_daily_grid = prepare_macro_for_daily_merge(macro_df)

# Stage 2: shift each series by its publish lag to mimic real-world availability.
macro_lagged = apply_publish_lags(macro_on_daily_grid, PUBLISH_LAGS, date_col="Date")

# Stage 3: index by date and validate the macro-only frame.
macro_validated = validate_df(macro_lagged.set_index("Date"), "macro_daily")

# Stage 4: add state features. publish_lags=None because the lags were already
# applied in stage 2 — passing them again would lag the data twice.
macro_with_state = build_macro_state_features(
    macro_validated,
    publish_lags=None,
    pca_cols=None,
    use_hmm=False,
)

# Stage 5: final validation; macro_daily is the frame consumed by the sector loop.
macro_daily = validate_df(macro_with_state, "macro_daily+state")



Loaded existing macro data from CSV.
[validate] macro_daily: replaced 61621 ±inf with NaN
[validate] macro_daily+state: replaced 257656 ±inf with NaN
[validate] macro_daily+state: dropping all-NaN columns: ['regime_hmm', 'yc_level_10y_chg_5_surprise_proxy_z', 'carry_10y_ff_chg_5_surprise_proxy_z', 'regime_hmm_surprise_proxy_z']


In [None]:
# ---- Sector loop ----
# For every sector in SECTORS:
#   1) load the cached sector-wide BASE feature frame, or build (and save) it;
#   2) for each equity in the sector, build a target-specific view and merge it
#      with the prepared macro frame (macro_daily), then run QA guards;
#   3) apply the rolling winsorize -> impute -> standardize pipeline;
#   4) (manifest logging, currently disabled);
#   5) write one CSV per (sector, target) into base_output_dir.
# Any QA failure raises and stops the whole run (fail fast).
for sector_name, cfg in SECTORS.items():
    tickers    = cfg["tickers"]
    sector_etf = cfg.get("sector_etf")

    # 1) Base features per sector (cacheable)
    # The parquet file doubles as the cache: if it exists it is reused as-is,
    # otherwise the builder is called with output_path=base_path.
    base_path = os.path.join(base_output_dir, f"{sector_name}__BASE.parquet")
    if os.path.exists(base_path):
        base_df = pd.read_parquet(base_path)
        print(f"[cache] loaded {base_path}")
    else:
        base_df = build_sector_base_features(
            tickers=tickers,
            kalman_lags=[1,5,10],
            dropna_frac=0.90,
            output_path=base_path
        )
    # validate_df is applied to both cached and freshly built frames; the logged
    # output shows it replacing ±inf with NaN (and dropping all-NaN columns).
    base_df = validate_df(base_df, f"{sector_name}::BASE")
    # NOTE(review): 0.20 is presumably a sparsity threshold (fraction of missing
    # values per column) — confirm against drop_sparse_columns.
    base_df = drop_sparse_columns(base_df, 0.20)

    # 2) Target views + macro merge
    # Targets are the plain equities: index tickers (prefixed "^") and the
    # sector ETF itself are excluded.
    equities = [t for t in tickers if not t.startswith("^") and t != sector_etf]
    for target in equities:
        # Every other equity in the same sector is passed as a "supplier" ticker.
        suppliers = [t for t in equities if t != target]
        print(f"\n--- {sector_name} :: target={target} ---")

        df_t = make_target_view(
            base_df,
            target_ticker=target,
            supplier_tickers=suppliers,
            benchmark_ticker="^GSPC",
            sector_etf=sector_etf
        )
        df_t = validate_df(df_t, f"{sector_name}::{target}::target_view")

        # As-of safe merge with macro
        # Both frames are passed with the date as a regular column. reset_index()
        # names the column after the index, or "index" when the index is unnamed,
        # hence the `or "index"` fallback in the rename to "Date".
        # NOTE(review): tolerance_days=31 presumably caps how stale a matched macro
        # row may be — confirm against merge_stocks_and_macros.
        df_t_for_merge   = df_t.reset_index().rename(columns={df_t.index.name or "index": "Date"})
        macro_for_merge  = macro_daily.reset_index()
        merged           = merge_stocks_and_macros(stock_df=df_t_for_merge, macro_df=macro_for_merge, tolerance_days=31)

        merged = merged.set_index("Date")
        merged = validate_df(merged, f"{sector_name}::{target}::merged")
        merged = drop_sparse_columns(merged, 0.20)

        # ---- Fast QA guards (fail fast) ----
        # Checks on the merged index: ordering, comparison against the macro index
        # (semantics live in assert_no_future), and no duplicate dates.
        assert_monotonic(merged.index)
        assert_no_future(merged.index, macro_daily.index)
        if merged.index.duplicated().any():
            raise ValueError(f"[{sector_name}::{target}] Duplicate index rows detected")

        # Warm-up trim (indicators, rolling) and row-level NA filter
        # Frames with 60 rows or fewer are left untrimmed.
        if len(merged) > 60:
            merged = merged.iloc[60:]
            print(f"[finalize] {sector_name}::{target}: trimmed first 60 warm-up rows")

        # Keep only rows in which at least 85% of the columns are non-null.
        min_non_null = int(0.85 * merged.shape[1])
        merged = merged.loc[merged.notna().sum(axis=1) >= min_non_null]

        # 3) Rolling, no-lookahead transforms
        # The steps run in order: winsorize -> impute -> standardize.
        pipe = Pipeline([
            # expanding quantile winsorization (shifted) — ~1%/99% tails, needs history
            ("winsor", Winsorizer(lower_q=0.01, upper_q=0.99, min_periods=200)),
            # expanding-mean imputer using only past data
            ("imputer", RollingImputer(min_periods=20)),
            # expanding z-score using only past data (optionally clip to avoid extreme z’s)
            ("scaler", RollingStandardizer(min_periods=60, clip_sigma=4.0)),
        ])
        merged_proc = pipe.fit_transform(merged)

        # 4) Manifest logging (data hash map)
        # NOTE(review): manifest writing is currently disabled; no manifest file is
        # produced by this loop. Re-enable or remove once the decision is made.
        # manifest_path = os.path.join(base_output_dir, f"{sector_name}__{target}__manifest.json")
        # manifest = ManifestWriter(manifest_path)
        # manifest.write(
        #     source_paths=None,
        #     inputs_hash=hash_df_map(merged=merged_proc),
        #     rows=len(merged_proc),
        #     cols=merged_proc.shape[1],
        #     config={"sector": sector_name, "target": target}
        # )
        
        # 5) Save
        # One CSV per (sector, target); the date index is written as the first column.
        out_path = os.path.join(base_output_dir, f"{sector_name}__{target}.csv")
        merged_proc.to_csv(out_path, index=True)
        print(f"Saved {sector_name}::{target} → {out_path} rows={len(merged_proc):,} cols={merged_proc.shape[1]}")


--- Building sector BASE (target-agnostic) ---
Fetching data for 14 tickers...
Date range: All available history to Current date
Fetching full history for WMT...
Fetching full history for PG...
Fetching full history for KO...
Fetching full history for PEP...
Fetching full history for COST...
Fetching full history for CL...
Fetching full history for CLX...
Fetching full history for KMB...
Fetching full history for GIS...
Fetching full history for MDLZ...
Fetching full history for KR...
Fetching full history for TGT...
Fetching full history for XLP...
Fetching full history for ^GSPC...

Final merged DataFrame has 6092 common entries.
Discovered prefixes: ['CL', 'CLX', 'COST', 'GIS', 'KMB', 'KO', 'KR', 'MDLZ', 'PEP', 'PG', 'TGT', 'WMT', 'XLP', '^GSPC']
Dropped 49 rows (base pruning @ 90%).
Saved BASE → C:\Users\epoch_bpjmdqk\Documents\Code\data\processed\staples__BASE.parquet
[validate] staples::BASE: replaced 3467 ±inf with NaN

--- staples :: target=WMT ---
[validate] staples::WMT::tar