# 5. Macro and Sentiment Features (Point-in-Time Correct)

Create daily macro features from FRED data with **point-in-time correctness** and upload to Hopsworks.

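**Pipeline**: Hopsworks raw FGs (`dgs10_raw`, `cpi_raw`, `qqq_raw`) → Feature Engineering → Hopsworks FG (`macro_features`)

**Note**: The cells below read the raw data from Hopsworks. If the Arrow Flight bug on the Hopsworks free tier blocks those reads, fetch the same series directly from FRED and Yahoo Finance with the imported `fetch_dgs10`, `fetch_cpi`, and `fetch_yahoo_data` helpers instead.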

**Key Features**:
- DGS10: Forward-filled from past values (no leakage)
- CPI: Aligned to release dates, NOT reference month dates

**Critical**: CPI for month M is released ~15th of month M+1. We ensure that features for date t only use CPI data that was available on or before t.
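
The release-date rule above can be sketched in a few lines of pandas. This is a minimal illustration, not the actual body of `make_macro_daily_features()`: the function name, the `value` column, and the `release_day` parameter are assumptions made for the example, and the release is modeled as exactly the 15th rather than the true published date.

```python
import pandas as pd

def cpi_asof_fixed_release(trading_dates, cpi, release_day=15):
    """Map each trading date to the latest CPI print released on or before it.

    Assumes `cpi` has a `date` column (reference month) and a `value` column;
    both names are illustrative, not taken from the notebook's helpers.
    """
    cpi = cpi.sort_values("date").copy()
    # Reference month M -> assumed release on day `release_day` of month M+1.
    cpi["release_date"] = (
        cpi["date"] + pd.offsets.MonthBegin(1) + pd.Timedelta(days=release_day - 1)
    ).astype("datetime64[ns]")

    calendar = pd.DataFrame(
        {"date": pd.DatetimeIndex(trading_dates).sort_values().astype("datetime64[ns]")}
    )
    # Backward as-of join: each date picks the last release at or before it.
    return pd.merge_asof(
        calendar,
        cpi[["release_date", "value"]].rename(columns={"value": "cpi_level_asof"}),
        left_on="date",
        right_on="release_date",
        direction="backward",
    )
```

Dates that precede the first release come back as NaN rather than borrowing a later value, which is the behavior a leakage-free feature needs.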

In [None]:
import sys
sys.path.append('..')

import pandas as pd
import numpy as np

# Force reload of project modules to pick up the latest changes
import importlib
for module_name in ('utils.feature_functions', 'utils.hopsworks_helpers', 'utils.data_fetchers'):
    if module_name in sys.modules:
        importlib.reload(sys.modules[module_name])

from utils.feature_functions import make_macro_daily_features, aggregate_sentiment
from utils.data_fetchers import fetch_dgs10, fetch_cpi, fetch_yahoo_data
from utils.hopsworks_helpers import get_feature_store, create_feature_group
from dotenv import load_dotenv
import yaml

load_dotenv()

# Load config
with open('../config/config.yaml', 'r') as f:
    config = yaml.safe_load(f)

## Read FRED Data from Hopsworks

Read pre-uploaded raw data from Hopsworks feature groups (from notebooks 1-3).
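
Rows read back from a feature store are not necessarily in date order, and date columns may arrive as strings or timezone-aware timestamps. That is an assumption about the environment, not something this notebook confirms. A small normalization step like the sketch below (the helper name `normalize_dates` is hypothetical) makes the later `.dt` accessors and as-of logic safer. It assumes timestamps are naive or stored in UTC, since converting from another timezone could shift the calendar date.

```python
import pandas as pd

def normalize_dates(df, col="date"):
    """Return a copy sorted by `col` with tz-naive, midnight-normalized timestamps."""
    out = df.copy()
    # Parse strings/timestamps, drop timezone info, and strip any time-of-day.
    out[col] = pd.to_datetime(out[col], utc=True).dt.tz_localize(None).dt.normalize()
    return out.sort_values(col).reset_index(drop=True)
```

It could be applied to each raw frame right after `read()`, for example `dgs10_df = normalize_dates(dgs10_df)`.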

In [None]:
# Connect to Hopsworks first
print("Connecting to Hopsworks...")
fs = get_feature_store()
print(f"✓ Connected to feature store: {fs.name}")

# Read FRED raw data from Hopsworks feature groups
print("\nReading FRED data from Hopsworks feature groups...")

# Read DGS10 raw data
dgs10_fg = fs.get_feature_group('dgs10_raw', version=1)
dgs10_df = dgs10_fg.read()
print(f"✓ DGS10 data: {dgs10_df.shape}")

# Read CPI raw data
cpi_fg = fs.get_feature_group('cpi_raw', version=1)
cpi_df = cpi_fg.read()
print(f"✓ CPI data: {cpi_df.shape}")

print(f"\nDGS10 date range: {dgs10_df['date'].min()} to {dgs10_df['date'].max()}")
print(f"CPI date range: {cpi_df['date'].min()} to {cpi_df['date'].max()}")

## Read QQQ Data for Trading Calendar from Hopsworks

In [None]:
# Read QQQ raw data from Hopsworks to get trading calendar
print("Reading QQQ data from Hopsworks for trading calendar...")
qqq_fg = fs.get_feature_group('qqq_raw', version=1)
qqq_df = qqq_fg.read()

trading_dates = pd.to_datetime(qqq_df['date'].unique())
trading_dates = pd.DatetimeIndex(sorted(trading_dates))

print(f"✓ Trading days: {len(trading_dates)}")
print(f"  Date range: {trading_dates.min()} to {trading_dates.max()}")

## Create Point-in-Time Correct Macro Features

Using `make_macro_daily_features()` with CPI release date handling:
- **Method**: `fixed_release` (CPI released 15th of month M+1)
- **Guarantee**: Features for date t only use data available on/before t
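
The same guarantee applies to DGS10. A rough sketch of how the yield could be placed on the trading calendar follows; the function name and the `value` column are assumptions, and the change features are taken to be simple differences in yield, which the helper in `utils.feature_functions` may or may not match.

```python
import pandas as pd

def dgs10_daily_features(trading_dates, dgs10):
    """Forward-fill DGS10 onto trading dates and add 1/5/20-day changes.

    Assumes `dgs10` has `date` and `value` columns (illustrative names).
    """
    series = dgs10.dropna(subset=["value"]).set_index("date")["value"].sort_index()
    calendar = pd.DatetimeIndex(trading_dates).sort_values()
    # method="ffill" takes the last observation at or before each date,
    # so no future yield can leak into an earlier row.
    daily = series.reindex(calendar, method="ffill")

    out = pd.DataFrame({"date": calendar, "dgs10": daily.to_numpy()})
    for n in (1, 5, 20):
        out[f"dgs10_chg_{n}d"] = out["dgs10"].diff(n)
    return out
```

Forward-fill never looks ahead by construction, so the only leakage risk for DGS10 is a back-fill or interpolation step, which this sketch avoids.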

In [None]:
print("Creating point-in-time correct macro features...")
macro_features = make_macro_daily_features(
    trading_dates=trading_dates,
    dgs10_series=dgs10_df,
    cpi_series=cpi_df,
    method="fixed_release"  # CPI released 15th of next month
)

print(f"\nMacro features shape: {macro_features.shape}")
print(f"Columns: {macro_features.columns.tolist()}")
macro_features.head(10)

In [None]:
# Check for missing values
print("\nMissing values per column:")
print(macro_features.isnull().sum())
print(f"\nRows with any NaN: {macro_features.isnull().any(axis=1).sum()}")

In [None]:
# Summary statistics
macro_features.describe()

## Sanity Check: Verify CPI Release Date Alignment

**Critical validation**: Pick a specific CPI release and verify that:
1. CPI_asof remains constant before the release date
2. CPI_asof changes on/after the release date

Example: under the `fixed_release` rule, January 2024 CPI becomes available on Feb 15, 2024.
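
The two conditions can also be checked programmatically instead of by eye. The sketch below is one possible check, with a hypothetical function name: it flags any date where `cpi_level_asof` changes but that date is not the first trading day on or after the assumed release day of its month.

```python
import pandas as pd

def off_schedule_cpi_changes(features, release_day=15):
    """Return rows where `cpi_level_asof` changes off the assumed release schedule."""
    f = features.sort_values("date").reset_index(drop=True)
    level = f["cpi_level_asof"]
    prev_level = level.shift()
    # A real change: both sides known and different (ignores the leading NaN run).
    changed = level.notna() & prev_level.notna() & (level != prev_level)

    month_start = f["date"].dt.to_period("M").dt.to_timestamp()
    release = month_start + pd.Timedelta(days=release_day - 1)
    prev_date = f["date"].shift()

    # On schedule: this row is on/after the release day and the previous row was before it.
    on_time = (f["date"] >= release) & (prev_date < release)
    return f.loc[changed & ~on_time, ["date", "cpi_level_asof"]]
```

An empty result is consistent with the release-date alignment; any returned row points to a change that happened too early (possible leakage) or too late.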

In [None]:
# Pick a CPI release to validate (January 2024 -> released Feb 15, 2024)
# Let's look at data around Feb 2024

validation_window = macro_features[
    (macro_features['date'] >= '2024-02-01') & 
    (macro_features['date'] <= '2024-02-29')
].copy()

print("CPI values around February 2024 release date (~Feb 15):")
print("\nExpected behavior:")
print("- Before Feb 15: CPI_asof should show December 2023 CPI")
print("- On/after Feb 15: CPI_asof should show January 2024 CPI\n")

if len(validation_window) > 0:
    print(validation_window[['date', 'cpi_level_asof', 'cpi_yoy_asof']].to_string(index=False))
else:
    print("No data in Feb 2024 window - adjust validation window to match your data range")

In [None]:
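# Check when CPI changes (these should align with release dates around the 15th of each month)
# Exclude NaN diffs (first row and rows before the first CPI release); NaN != 0 would count them as changes
cpi_diff = macro_features['cpi_level_asof'].diff()
cpi_changes = macro_features[cpi_diff.notna() & (cpi_diff != 0)].copy()
cpi_changes['day_of_month'] = cpi_changes['date'].dt.day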

print("Dates when CPI_asof changes (should be around 15th of each month):")
print(cpi_changes[['date', 'day_of_month', 'cpi_level_asof']].head(20).to_string(index=False))

if len(cpi_changes) > 0:
    print(f"\nAverage day of month for CPI changes: {cpi_changes['day_of_month'].mean():.1f}")
    print("(Should be at or slightly above 15: a release on a weekend or holiday shows up on the next trading day)")

In [None]:
# Visualize CPI stepping pattern
import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 8))

# Plot CPI level
sample = macro_features[macro_features['date'] >= '2023-01-01'].copy()
ax1.plot(sample['date'], sample['cpi_level_asof'], drawstyle='steps-post')
ax1.set_title('CPI Level As-Of (Point-in-Time Correct - Step Function)')
ax1.set_ylabel('CPI Level')
ax1.grid(True, alpha=0.3)

# Plot DGS10 (daily series; forward-fill only bridges missing days)
ax2.plot(sample['date'], sample['dgs10'])
ax2.set_title('10-Year Treasury Yield (Forward-Filled)')
ax2.set_ylabel('Yield (%)')
ax2.set_xlabel('Date')
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("\n✓ CPI should show a step function (changes monthly on release dates)")
print("✓ DGS10 should vary daily, with flat segments only where missing days were forward-filled")

## Upload Point-in-Time Correct Macro Features to Hopsworks

In [None]:
# Hopsworks connection already established in the "Read FRED Data from Hopsworks" cell above
print(f"Using existing connection to feature store: {fs.name}")

In [None]:
# Upload engineered macro features
print("\nUploading macro features to Hopsworks...")
macro_fg = create_feature_group(
    fs,
    name='macro_features',
    df=macro_features,
    primary_key=['date'],
    description='Point-in-time correct macro features: DGS10 (forward-filled) and CPI (release-date aligned)'
)
print(f"✓ Created feature group: macro_features (version {macro_fg.version})")

print("\nFeature columns uploaded:")
for col in macro_features.columns:
    print(f"  - {col}")

## Summary

**✅ Point-in-time correct macro features created and uploaded to Hopsworks**:

**Feature Group**: `macro_features`

**DGS10 features**:
  - `dgs10`: 10-year yield (forward-filled)
  - `dgs10_chg_1d`, `dgs10_chg_5d`, `dgs10_chg_20d`: Yield changes

**CPI features** (release-date aligned):
  - `cpi_level_asof`: CPI level as known on date t
  - `cpi_yoy_asof`: Year-over-year CPI change

**No look-ahead bias**:
  - CPI aligned to release dates (15th of month M+1)
  - All features for date t use only information available on/before t

**Validation** (checked in the sanity-check cells above):
- CPI shows a step function with changes around the 15th of each month
- DGS10 varies daily, with gaps forward-filled from past values

**Sentiment Features**:
- Already created in notebook 3 (`sentiment_features`)
- Uses NewsAPI + FinBERT with daily aggregation

**Next steps**:
- Notebook 6: Join all features via Hopsworks Feature View (QQQ technical, XLK sector, VIX volatility, macro, sentiment)
- Notebook 7: Train models with proper time-series splits