# 01 — Build training dataset using Unified Feature Pipeline

**NEW VERSION:** Uses `UnifiedFeaturePipeline` so that training and inference compute IDENTICAL features.

Features:
- OHLCV window: shape (120, 5), normalized to [-1, 1]
- Kronos embedding: shape (512,), produced by the Kronos time-series foundation model
- Context vector: shape (29,) = 5 MTF + 12 SMC + 12 TA features

Runs on a Colab T4 or a local GPU; falls back to CPU (much slower) when no GPU is available.

In [None]:
# Install dependencies
# NOTE(review): `requests` (imported in a later cell) is not listed here; it is
# presumably pulled in transitively by yfinance — confirm or add it explicitly.
# duckdb / scikit-learn / lightgbm / joblib are not used by the visible cells of
# this notebook (likely needed by later training notebooks).
!pip -q install yfinance pandas numpy pyarrow duckdb torch huggingface_hub tqdm
!pip -q install scikit-learn lightgbm joblib

In [None]:
import os, sys, pathlib

# Get current working directory
cwd = pathlib.Path().resolve()


def _looks_like_repo_root(path: pathlib.Path) -> bool:
    """Return True if ``path`` contains both the ``apps/`` and ``notebooks/`` folders."""
    return (path / "apps").exists() and (path / "notebooks").exists()


# Detect repo root:
#   - running from notebooks/ -> its parent
#   - anywhere else           -> cwd itself or the nearest ancestor that has BOTH
#                                apps/ and notebooks/ (one criterion everywhere, so a
#                                stray folder with only one of them is never picked)
if cwd.name == "notebooks":
    repo_root = cwd.parent
else:
    repo_root = next(
        (p for p in [cwd, *cwd.parents] if _looks_like_repo_root(p)),
        None,
    )
    if repo_root is None:
        raise FileNotFoundError(
            f"Cannot find AI_TRADER repo root. Current directory: {cwd}\n"
            "Please run this notebook from the repo root or notebooks/ folder."
        )

# Change to repo root
os.chdir(repo_root)
print(f"✓ Repo root: {repo_root}")

# Add apps/api to Python path (only once, so re-running this cell does not keep
# prepending duplicate entries)
api_path = repo_root / "apps" / "api"
if not api_path.exists():
    raise FileNotFoundError(
        f"❌ apps/api not found at {api_path}\n"
        "Make sure you're running from the correct repository."
    )
if str(api_path) not in sys.path:
    sys.path.insert(0, str(api_path))
print(f"✓ Added to path: {api_path}")

# Create output directory
(repo_root / "training_data" / "v1").mkdir(parents=True, exist_ok=True)
print(f"✓ Output directory ready: {repo_root / 'training_data/v1'}")

# Verify we can import
try:
    import app
    print(f"✓ Can import 'app' module from: {app.__file__}")
except ImportError as e:
    print(f"⚠️ Warning: Cannot import 'app' module: {e}")
    print(f"   Python path: {sys.path[0]}")

In [None]:
# Import dependencies
import numpy as np
import pandas as pd
import yfinance as yf
import torch
import requests
from tqdm import tqdm
from pathlib import Path

# Import Unified Feature Pipeline
from app.ml.unified_features import UnifiedFeaturePipeline

# Prefer the GPU whenever PyTorch can see one; otherwise run on CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
print(f"\n✓ Using device: {device}")
print("✓ All imports successful!")

In [None]:
# Config
LOOKBACK = 120                     # rows of daily history per training window
HORIZONS = [3, 5, 10]              # forward-return horizons, in daily rows
START = os.getenv("DATA_START", "2015-01-01")
END = os.getenv("DATA_END", None)  # None -> no upper date bound
TICKER_FILE = Path(os.getenv("TICKER_FILE", repo_root / "config/nifty100_yfinance.txt"))

# Load tickers: one symbol per line, blank lines ignored
if not TICKER_FILE.exists():
    raise FileNotFoundError(f"Ticker file {TICKER_FILE} not found")
with open(TICKER_FILE) as ticker_fh:
    TICKERS = [line.strip() for line in ticker_fh if line.strip()]

if not TICKERS:
    raise ValueError("No tickers loaded")

OUT_PATH = repo_root / "training_data/v1/dataset.parquet"
print(f"Using {len(TICKERS)} tickers")
print(f"Saving to: {OUT_PATH}")

In [None]:
# Build the feature pipeline once; the dataset loop featurizes every window with it,
# with Kronos embeddings switched on.
print("Initializing Unified Feature Pipeline...")
pipeline = UnifiedFeaturePipeline(device=device, lookback=LOOKBACK, enable_kronos=True)
print("✓ Pipeline ready")

In [None]:
# Data fetching functions
ALPHA_KEY = os.getenv('ALPHAVANTAGE_API_KEY')

# Columns every fetcher returns, in this order.
_OHLCV_COLS = ['open', 'high', 'low', 'close', 'volume']


def _clip_to_range(df: pd.DataFrame) -> pd.DataFrame:
    """Keep rows of a date-indexed frame inside [START, END] (no upper bound if END is unset)."""
    mask = df.index >= pd.to_datetime(START)
    if END:
        mask &= df.index <= pd.to_datetime(END)
    return df.loc[mask]


def _alphavantage_frame(series: dict) -> pd.DataFrame:
    """Turn AlphaVantage's ``Time Series (Daily)`` mapping into a date-sorted OHLCV frame."""
    records = [
        {
            'date': pd.to_datetime(date),
            'open': float(vals['1. open']),
            'high': float(vals['2. high']),
            'low': float(vals['3. low']),
            # NOTE(review): presumably the raw (unadjusted) close; the adjusted close
            # field of this endpoint is not used.
            'close': float(vals['4. close']),
            'volume': float(vals['6. volume']),
        }
        for date, vals in series.items()
    ]
    return pd.DataFrame(records).sort_values('date').set_index('date')


def _fetch_alphavantage(bases: list) -> pd.DataFrame:
    """Return daily OHLCV for the first symbol in ``bases`` AlphaVantage has data for.

    Each symbol gets up to 4 attempts. Rate-limit notices (``'Note'``) and request/parse
    errors are retried. A definitive ``'Error Message'``, or valid history with nothing
    inside [START, END], moves straight on to the next symbol, because repeating the
    same request cannot change the answer. Returns an empty DataFrame if nothing works.
    """
    import time

    for av_sym in bases:
        for attempt in range(4):
            give_up = False
            try:
                params = {
                    'function': 'TIME_SERIES_DAILY_ADJUSTED',
                    'symbol': av_sym,
                    'outputsize': 'full',
                    'apikey': ALPHA_KEY,
                }
                resp = requests.get('https://www.alphavantage.co/query', params=params, timeout=30)
                data = resp.json()
                if 'Note' in data:
                    # Rate-limit notice: wait, then retry the same symbol.
                    time.sleep(15)
                    continue
                if 'Error Message' in data:
                    give_up = True  # e.g. unknown symbol; retrying will not help
                else:
                    series = data.get('Time Series (Daily)', {})
                    if series:
                        df = _clip_to_range(_alphavantage_frame(series))
                        if not df.empty:
                            return df
                        give_up = True  # valid data, just none in the requested range
            except Exception as exc:
                print(f'[warn] AlphaVantage failed for {av_sym} attempt {attempt+1}: {exc}')
            # Keep the 15 s spacing between requests (presumably to respect
            # AlphaVantage's rate limit) even when moving on to the next symbol.
            time.sleep(15)
            if give_up:
                break
    return pd.DataFrame()


def _fetch_yfinance(sym: str) -> pd.DataFrame:
    """Download daily OHLCV for ``sym`` from yfinance with linear back-off.

    Returns a ``date``-indexed frame with open/high/low/close/volume (NaN rows dropped),
    or an empty DataFrame when all 6 attempts fail or return nothing.
    """
    import time

    last_exc = None
    for attempt in range(6):
        try:
            df = yf.download(sym, start=START, end=END, interval='1d',
                             auto_adjust=False, progress=False, threads=False)
            if not df.empty:
                break
        except Exception as exc:
            last_exc = exc
        time.sleep(2.0 * (attempt + 1))
    else:
        if last_exc:
            print(f'[warn] {sym} failed: {last_exc}')
        return pd.DataFrame()

    # Recent yfinance versions return (field, ticker) MultiIndex columns even for a
    # single ticker; keep just the field level so lower-casing and selection work and
    # 'close' stays a Series downstream.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    df = df.rename(columns=str.lower)[_OHLCV_COLS].dropna()
    if df.empty:
        return df
    df = df.reset_index().rename(columns={'index': 'date', 'Date': 'date'})
    df['date'] = pd.to_datetime(df['date'])
    return df.set_index('date')


def fetch_daily(sym: str) -> pd.DataFrame:
    """Fetch daily OHLCV data for ``sym`` between START and END.

    AlphaVantage is tried first, and only when ALPHAVANTAGE_API_KEY is set. It queries
    ``NSE:<base>``, ``BSE:<base>`` and ``<base>`` in turn, where ``<base>`` is ``sym``
    without its ``.NS``/``.BSE`` suffix. If that yields nothing, it falls back to
    yfinance with ``sym`` as given.

    Returns:
        DataFrame indexed by ``date`` with columns open/high/low/close/volume, or an
        empty DataFrame when every source fails.

    NOTE(review): yfinance is called with ``auto_adjust=False``, so prices are not
    split/dividend adjusted, and such events will show up as jumps in forward-return
    labels. Confirm this matches what inference uses before changing it.
    """
    base = sym.split('.')[0] if sym.endswith(('.NS', '.BSE')) else sym
    bases = [f'NSE:{base}', f'BSE:{base}', base]

    if ALPHA_KEY:
        df = _fetch_alphavantage(bases)
        if not df.empty:
            return df

    return _fetch_yfinance(sym)


def add_labels(df: pd.DataFrame, horizons=None) -> pd.DataFrame:
    """Return a copy of ``df`` with forward-return label columns added.

    Args:
        df: frame with a ``close`` column, ordered oldest to newest. It is not modified.
        horizons: iterable of integer horizons in rows; defaults to the notebook-level
            ``HORIZONS``.

    For each horizon ``h`` two columns are appended:
      - ``ret_{h}``: ``close[t + h] / close[t] - 1``; NaN for the last ``h`` rows.
      - ``up_{h}``: int32, 1 if ``ret_{h}`` > 0 else 0. NaN returns map to 0, so
        callers must filter on ``ret_{h}``, not ``up_{h}``, to drop unlabeled rows.
    """
    if horizons is None:
        horizons = HORIZONS
    out = df.copy()
    close = out['close']
    for h in horizons:
        ret = close.shift(-h) / close - 1.0
        out[f'ret_{h}'] = ret
        out[f'up_{h}'] = (ret > 0).astype(np.int32)
    return out


print("✓ Data fetching functions ready")

In [None]:
# Build dataset using Unified Feature Pipeline
import time

# Columns handed to the feature pipeline (exactly what fetch_daily returns).
FEATURE_INPUT_COLS = ['open', 'high', 'low', 'close', 'volume']
RET_COLS = [f"ret_{h}" for h in HORIZONS]
UP_COLS = [f"up_{h}" for h in HORIZONS]
MAX_HORIZON = max(HORIZONS)

rows = []

for sym in tqdm(TICKERS, desc="Processing symbols"):
    time.sleep(2.0)  # Throttle API calls

    df = fetch_daily(sym)
    if df.empty or len(df) < LOOKBACK + MAX_HORIZON + 10:
        continue

    df = add_labels(df)

    # Feed the pipeline OHLCV only. The ret_*/up_* columns added above are built from
    # FUTURE closes and must never be visible to feature computation; this also keeps
    # the pipeline input in the same shape as a plain OHLCV frame at inference.
    ohlcv = df[FEATURE_INPUT_COLS]
    # Extract labels once as (len(df), len(HORIZONS)) arrays rather than building a
    # row Series per window and per horizon.
    ret_mat = df[RET_COLS].to_numpy(dtype=np.float32)
    up_mat = df[UP_COLS].to_numpy(dtype=np.float32)

    # Collect this symbol's rows separately so an error part-way through does not
    # leave a truncated history for the symbol in the dataset.
    sym_rows = []
    try:
        # Window i covers rows [0, i] (always >= LOOKBACK rows). Its labels need row
        # i + MAX_HORIZON, so i stops at len(df) - MAX_HORIZON - 1.
        for i in range(LOOKBACK - 1, len(df) - MAX_HORIZON):
            y_ret = ret_mat[i].copy()
            if np.any(np.isnan(y_ret)):
                continue  # undefined forward return: skip before paying for features

            # Compute features using unified pipeline on all data up to this point
            features = pipeline.compute_features(ohlcv.iloc[:i + 1], lookback=LOOKBACK)

            sym_rows.append({
                "symbol": sym,
                "asof": df.index[i],
                "ohlcv_norm": features['ohlcv_norm'],  # (120, 5)
                "kronos_emb": features['kronos_emb'],  # (512,)
                "context": features['context_vec'],     # (29,)
                "y_ret": y_ret,               # (3,)
                "y_up": up_mat[i].copy(),     # (3,)
            })
    except Exception as e:
        print(f"Error processing {sym}: {e}")
        continue

    rows.extend(sym_rows)

print(f"\nTotal samples: {len(rows)}")

In [None]:
# Save to parquet
def _parquet_safe(value):
    """Return ``value`` in a form pyarrow can store in a list column.

    pyarrow only converts 1-D array cells, so anything with ``ndim > 1`` (e.g. the
    (120, 5) OHLCV window) becomes nested Python lists. Everything else is returned
    unchanged.
    """
    if getattr(value, "ndim", 0) > 1 and hasattr(value, "tolist"):
        return value.tolist()
    return value


if not rows:
    # An empty frame has no columns: writing it produces a useless file and the
    # summary below would fail on df_out['symbol'].
    raise RuntimeError("No samples were built; nothing to save")

df_out = pd.DataFrame(rows)
for col in ("ohlcv_norm", "kronos_emb", "context"):
    df_out[col] = df_out[col].map(_parquet_safe)
df_out.to_parquet(OUT_PATH, index=False)
print(f"Saved to {OUT_PATH}")

# Display summary
print("\n=== Dataset Summary ===")
print(f"Total samples: {len(rows)}")
print(f"Unique symbols: {df_out['symbol'].nunique()}")
print(f"Date range: {df_out['asof'].min()} to {df_out['asof'].max()}")
print(f"OHLCV shape: {np.shape(rows[0]['ohlcv_norm'])}")
print(f"Kronos embedding shape: {np.shape(rows[0]['kronos_emb'])}")
print(f"Context vector shape: {np.shape(rows[0]['context'])}")
print(f"Target horizons: {HORIZONS}")

print("\n✓ Dataset built using Unified Feature Pipeline")
print("✓ Training and inference now use IDENTICAL features!")