# Dataset Construction

Merges price features, fundamentals, and news embeddings into the final ML dataset.

**Steps**:
1. Load price features (base table)
2. Apply liquidity filter (close >= $5 and 20-day average dollar volume >= $10M)
3. Point-in-time join fundamentals (treated as available 45 days after the period end)
4. Join news embeddings by trading_date
5. Cross-sectional normalize all features
6. Assemble the final dataset, drop rows with remaining missing values, and save

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import timedelta

In [None]:
# Config: liquidity universe and fundamentals reporting lag.
# Rows pass the liquidity filter only when close >= MIN_PRICE and the
# trailing 20-row average of close * volume >= MIN_DOLLAR_VOL.
MIN_PRICE: float = 5.0
MIN_DOLLAR_VOL: int = 10_000_000  # $10M average daily dollar volume
# Days after a fiscal period end before its fundamentals count as known.
# NOTE(review): 45 days is the 10-Q deadline for non-accelerated filers, but
# annual 10-K deadlines run 60-90 days, so fiscal-year-end periods may become
# "available" before they were filed (look-ahead) — confirm or lengthen.
FUNDAMENTAL_LAG_DAYS: int = 45

## 1. Load price features (base table)

In [None]:
# Base table of precomputed price features; both date columns become datetimes.
prices = pd.read_parquet("data/price_features.pqt")
for date_col in ("feature_date", "target_date"):
    prices[date_col] = pd.to_datetime(prices[date_col])

n_rows = len(prices)
n_symbols = prices["symbol"].nunique()
print(f"Price features: {n_rows:,} rows, {n_symbols:,} symbols")
first_day = prices["feature_date"].min().date()
last_day = prices["feature_date"].max().date()
print(f"Date range: {first_day} to {last_day}")

## 2. Apply liquidity filter

In [None]:
# Daily traded value per row.
prices["dollar_vol"] = prices["volume"] * prices["close"]

# Trailing mean over the last 20 rows of each symbol (current row included).
# Rows must be time-ordered within a symbol first; fewer than 10 observations
# in the window gives NaN.
prices = prices.sort_values(["symbol", "feature_date"])
per_symbol_dv = prices.groupby("symbol")["dollar_vol"]
prices["avg_dollar_vol_20d"] = per_symbol_dv.transform(
    lambda series: series.rolling(window=20, min_periods=10).mean()
)

print("Dollar vol stats:")
print(prices["avg_dollar_vol_20d"].describe())

In [None]:
# Keep rows that clear both liquidity thresholds. A NaN rolling average
# (too little history) compares False, so those rows are dropped too.
n_before = len(prices)
is_liquid = prices["avg_dollar_vol_20d"] >= MIN_DOLLAR_VOL
is_priced_ok = prices["close"] >= MIN_PRICE
prices = prices.loc[is_priced_ok & is_liquid].copy()
n_after = len(prices)

kept_pct = n_after / n_before * 100
print(f"Liquidity filter: {n_before:,} -> {n_after:,} ({kept_pct:.1f}%)")
print(f"Symbols remaining: {prices['symbol'].nunique():,}")

In [None]:
# The liquidity helper columns are not used past the filter.
prices = prices.drop(["dollar_vol", "avg_dollar_vol_20d"], axis="columns")

## 3. Load and prepare fundamentals

In [None]:
# Raw fundamentals tables (one file per source).
metrics = pd.read_parquet("data/key_metrics.pqt")
ratios = pd.read_parquet("data/ratios.pqt")
growth = pd.read_parquet("data/growth.pqt")

for label, table in (("Metrics", metrics), ("Ratios", ratios), ("Growth", growth)):
    print(f"{label}: {len(table):,} rows")

In [None]:
# Fundamental feature selection: 19 columns across three source tables.

# key_metrics — value: EV/EBITDA, FCF yield, earnings yield;
# quality: ROE, ROA, ROIC; liquidity: current ratio.
METRIC_COLS = [
    "evToEBITDA",
    "freeCashFlowYield",
    "earningsYield",
    "returnOnEquity",
    "returnOnAssets",
    "returnOnInvestedCapital",
    "currentRatio",
]

# ratios — value: P/E, P/B, P/S; quality: gross/operating/net margin;
# leverage: debt/equity, debt/assets.
RATIO_COLS = [
    "priceToEarningsRatio",
    "priceToBookRatio",
    "priceToSalesRatio",
    "grossProfitMargin",
    "operatingProfitMargin",
    "netProfitMargin",
    "debtToEquityRatio",
    "debtToAssetsRatio",
]

# growth — revenue, net income, EPS and operating income growth.
GROWTH_COLS = [
    "revenueGrowth",
    "netIncomeGrowth",
    "epsgrowth",
    "operatingIncomeGrowth",
]

In [None]:
# Combine the three fundamentals sources into one table keyed by
# (symbol, date). The outer joins keep a period that appears in any source;
# a source's columns are NaN where that source lacks the period.
# NOTE(review): duplicate (symbol, date) rows within a source get multiplied
# by these merges — confirm each source is unique on these keys.
fund_keys = ["symbol", "date"]
metrics_sub = metrics[fund_keys + METRIC_COLS].copy()
ratios_sub = ratios[fund_keys + RATIO_COLS].copy()
growth_sub = growth[fund_keys + GROWTH_COLS].copy()

fundamentals = (
    metrics_sub
    .merge(ratios_sub, how="outer", on=fund_keys)
    .merge(growth_sub, how="outer", on=fund_keys)
)

print(f"Combined fundamentals: {len(fundamentals):,} rows")
print(f"Symbols: {fundamentals['symbol'].nunique():,}")

In [None]:
# Point-in-time availability: a period's figures are treated as known
# FUNDAMENTAL_LAG_DAYS after its period end ("date" column).
fundamentals["period_end"] = pd.to_datetime(fundamentals["date"])
reporting_lag = timedelta(days=FUNDAMENTAL_LAG_DAYS)
fundamentals["available_date"] = fundamentals["period_end"] + reporting_lag

# Order by symbol and availability ahead of the as-of join.
fundamentals = fundamentals.sort_values(["symbol", "available_date"])

print("Example available_date:")
print(fundamentals[["symbol", "period_end", "available_date"]].head())

In [None]:
# Point-in-time join: each (symbol, feature_date) row takes the latest
# fundamentals row whose available_date is on or before feature_date.

# All fundamental feature columns, in metrics -> ratios -> growth order.
fund_cols = [*METRIC_COLS, *RATIO_COLS, *GROWTH_COLS]

def pit_join_fundamentals(
    prices_df: pd.DataFrame,
    fund_df: pd.DataFrame,
    cols: "list[str] | None" = None,
    max_staleness_days: "int | None" = None,
) -> pd.DataFrame:
    """Point-in-time join fundamentals to prices.

    For every ``(symbol, feature_date)`` row of ``prices_df``, attach the
    fundamentals row of the same symbol with the latest ``available_date``
    on or before ``feature_date`` (a backward as-of join).

    Args:
        prices_df: Must have ``symbol`` and datetime ``feature_date`` columns.
        fund_df: Must have ``symbol``, datetime ``available_date`` and ``cols``.
            Rows with a missing ``available_date`` are ignored.
        cols: Fundamental columns to carry over. ``None`` (default) uses the
            notebook-level ``fund_cols`` list.
        max_staleness_days: If given, fundamentals older than this many days
            relative to ``feature_date`` are not matched (left NaN). ``None``
            (default) keeps an unbounded look-back.

    Returns:
        The ``prices_df`` rows sorted by ``symbol`` then ``feature_date`` with a
        fresh RangeIndex, ``cols`` appended, and an int ``has_fundamentals``
        column that is 1 when at least one of ``cols`` is non-null.
    """
    if cols is None:
        cols = fund_cols  # notebook-level default
    cols = list(cols)

    # merge_asof requires the "on" keys sorted and free of nulls.
    right = fund_df.loc[
        fund_df["available_date"].notna(), ["symbol", "available_date", *cols]
    ].sort_values("available_date", kind="mergesort")
    left = prices_df.sort_values("feature_date", kind="mergesort")

    tolerance = (
        None if max_staleness_days is None else pd.Timedelta(days=max_staleness_days)
    )
    # A single as-of join matched within each symbol replaces a per-symbol
    # loop that re-filtered both full frames once per symbol.
    merged = pd.merge_asof(
        left,
        right,
        left_on="feature_date",
        right_on="available_date",
        by="symbol",
        direction="backward",
        tolerance=tolerance,
    )
    # Flag any matched value, not only cols[0]: rows built by outer-merging
    # several sources can lack the first column but carry the others.
    merged["has_fundamentals"] = merged[cols].notna().any(axis=1).astype(int)
    merged = merged.drop(columns=["available_date"])
    return merged.sort_values(["symbol", "feature_date"]).reset_index(drop=True)

In [None]:
from tqdm.auto import tqdm
tqdm.pandas()  # registers .progress_apply on pandas objects; the join call below does not use it

print("Joining fundamentals (point-in-time)...")
df = pit_join_fundamentals(prices, fundamentals)

n_with_fund = df["has_fundamentals"].sum()
pct_with_fund = df["has_fundamentals"].mean() * 100
print(f"After fundamental join: {len(df):,} rows")
print(f"Has fundamentals: {n_with_fund:,} ({pct_with_fund:.1f}%)")

## 4. Load and join news embeddings

In [None]:
# Article embeddings, plus the raw news table that carries trading_date.
embeddings = pd.read_parquet("data/news_embeddings.pqt")
news = pd.read_parquet("data/all_the_news_anon.pqt")

for label, frame in (("Embeddings", embeddings), ("News", news)):
    print(f"{label}: {len(frame):,} rows")

In [None]:
# Attach each article's trading_date to its embedding, matching on (url, symbol).
# Embeddings without a matching news row are dropped (inner join).
# NOTE(review): duplicate (url, symbol) rows in `news` would duplicate embeddings.
news_meta = news.loc[:, ["url", "symbol", "trading_date"]].copy()
news_meta["trading_date"] = pd.to_datetime(news_meta["trading_date"])

emb_with_date = embeddings.merge(news_meta, how="inner", on=["url", "symbol"])
print(f"Embeddings with trading_date: {len(emb_with_date):,}")

In [None]:
# Embedding vector components are the columns prefixed "emb_".
emb_cols = list(filter(lambda name: name.startswith("emb_"), embeddings.columns))
print(f"Embedding dimension: {len(emb_cols)}")

In [None]:
# One row per (symbol, trading_date): component-wise mean embedding of that
# day's articles, plus how many articles (non-null urls) contributed.
by_symbol_day = emb_with_date.groupby(["symbol", "trading_date"])
mean_embedding = by_symbol_day[emb_cols].mean()
article_count = by_symbol_day["url"].count().rename("news_count")
emb_agg = mean_embedding.join(article_count).reset_index()

print(f"Aggregated embeddings: {len(emb_agg):,} (symbol, trading_date) pairs")

In [None]:
# Left-join the daily news aggregates onto the price rows whose feature_date
# equals the news trading_date.
# NOTE(review): confirm trading_date only covers articles known by the time
# feature_date's features are computed, otherwise this join leaks future news.
news_by_day = emb_agg.rename(columns={"trading_date": "feature_date"})
df = df.merge(news_by_day, on=["symbol", "feature_date"], how="left")

# Symbol-days without articles: zero embedding vector and zero count.
df[emb_cols] = df[emb_cols].fillna(0)
df["news_count"] = df["news_count"].fillna(0).astype(int)

has_news = df["news_count"] > 0
print(f"After news join: {len(df):,} rows")
print(f"Rows with news: {has_news.sum():,} ({has_news.mean()*100:.1f}%)")

## 5. Cross-sectional normalization

In [None]:
def cross_sectional_zscore(df: pd.DataFrame, col: str, clip: float = 3.0) -> pd.Series:
    """Z-score ``df[col]`` within each ``feature_date``, clipped to ``[-clip, clip]``.

    Uses the per-date mean and sample standard deviation (pandas ``std``,
    ``ddof=1``). A date with no cross-sectional spread has a std of 0 or NaN:
    either every non-null value is identical, or there is a single non-null
    value. On such a date the non-null values get a z-score of 0 rather than
    NaN (0/0). Otherwise a later ``dropna`` would discard the entire date, for
    example every day on which no symbol had any news (``news_count`` all 0).
    Missing inputs remain NaN.

    Args:
        df: Frame with a ``feature_date`` column and ``col``.
        col: Column to standardize.
        clip: Symmetric bound applied to the z-scores.

    Returns:
        Series aligned with ``df`` holding the clipped z-scores.
    """
    values = df[col]
    grouped = df.groupby("feature_date")[col]
    mean = grouped.transform("mean")
    std = grouped.transform("std")
    z = (values - mean) / std
    # `~(std > 0)` is True for std == 0 and for std NaN; `mean.notna()` means
    # the date has at least one observed value. No spread -> no ranking signal.
    no_spread = values.notna() & mean.notna() & ~(std > 0)
    z = z.mask(no_spread, 0.0)
    return z.clip(-clip, clip)

def fill_with_median(df: pd.DataFrame, col: str) -> pd.Series:
    """Return ``df[col]`` with NaNs replaced by the same ``feature_date``'s median.

    Dates on which every value of ``col`` is missing have a NaN median, so
    their rows stay NaN.
    """
    per_date_median = df.groupby("feature_date")[col].transform("median")
    column = df[col]
    return column.fillna(per_date_median)

In [None]:
# For each fundamental: impute NaNs with the date's median (overwriting the
# raw column), then add a cross-sectional z-score column named "<col>_z".
fund_cols_z = []
for feature in fund_cols:
    df[feature] = fill_with_median(df, feature)
    z_name = f"{feature}_z"
    df[z_name] = cross_sectional_zscore(df, feature)
    fund_cols_z.append(z_name)

print(f"Normalized {len(fund_cols)} fundamental features")

In [None]:
# Article counts are standardized per date like the other features.
df["news_count_z"] = cross_sectional_zscore(df, col="news_count")

news_z_summary = df["news_count_z"].describe()
print("News count stats (z-scored):")
print(news_z_summary)

## 6. Final dataset assembly

In [None]:
# Price features are already z-scored upstream (per the original "1.4.1" note).
price_feat_cols = [
    "overnight_gap_z",
    "intraday_ret_z",
    "ret_1d_z",
    "ret_2d_z",
    "ret_3d_z",
    "ret_5d_z",
    "vol_5d_z",
    "dist_from_high_5d_z",
    "dist_from_low_5d_z",
]

# Output layout: identifiers, targets, flags, then the feature groups.
id_cols = ["symbol", "feature_date", "target_date"]
target_cols = ["target_return", "target_demean", "target_rank"]
flag_cols = ["has_fundamentals"]

final_cols = [
    *id_cols,
    *target_cols,
    *flag_cols,
    *price_feat_cols,
    *fund_cols_z,
    "news_count_z",
    *emb_cols,
]

dataset = df.loc[:, final_cols].copy()
print(f"Final columns: {len(final_cols)}")

In [None]:
# Discard any row that still has a NaN in any selected column
# (identifiers and targets included).
rows_in = len(dataset)
dataset = dataset.dropna(how="any")
rows_out = len(dataset)
print(f"Dropped {rows_in - rows_out:,} rows with NaN")
print(f"Final dataset: {rows_out:,} rows")

In [None]:
# Summary of the final dataset's coverage in time and across symbols.
first_day = dataset["feature_date"].min().date()
last_day = dataset["feature_date"].max().date()
n_days = dataset["feature_date"].nunique()
print(f"Date range: {first_day} to {last_day}")
print(f"Symbols: {dataset['symbol'].nunique():,}")
print(f"Days: {n_days:,}")
print(f"Avg rows per day: {len(dataset) / n_days:.0f}")

In [None]:
# Feature coverage over the final (post-dropna) rows.
# "Has news" must use the raw article count: news_count_z > 0 only means
# "more articles than that day's cross-sectional mean", not "at least one".
# `dataset` was built from `df` by column selection and dropna, both of which
# keep df's index labels, so .loc recovers the matching raw counts.
has_news = df.loc[dataset.index, "news_count"] > 0
print("\nFeature coverage:")
print(f"  Has fundamentals: {dataset['has_fundamentals'].mean()*100:.1f}%")
print(f"  Has news: {has_news.mean()*100:.1f}%")

In [None]:
# Persist the dataset without the index and report the file size in GB (1e9 bytes).
OUTPUT_PATH = Path("data/ml_dataset.pqt")
dataset.to_parquet(OUTPUT_PATH, index=False)
size_gb = OUTPUT_PATH.stat().st_size / 1e9
print(f"Saved to {OUTPUT_PATH}")
print(f"File size: {size_gb:.2f} GB")

In [None]:
# Preview the first five rows.
dataset.head(n=5)