In [1]:
import pathlib, pandas as pd, numpy as np

# The project root is taken to be the parent of the current working directory.
BASE = pathlib.Path.cwd().parents[0]
DATA_PROCESSED = BASE.joinpath("data", "processed")
REPORTS = BASE.joinpath("reports")

# Create both output folders up front; a no-op when they already exist.
for _out_dir in (DATA_PROCESSED, REPORTS):
    _out_dir.mkdir(parents=True, exist_ok=True)

# Tickers to process; one "<symbol>.parquet" file is read per entry further down.
SYMBOLS = ["QQQ", "VFV.TO", "XEQT.TO"]

print("processed:", DATA_PROCESSED)


processed: /Users/itzronald/Desktop/trend-predictor/data/processed


In [2]:
def ema(series: pd.Series, span: int) -> pd.Series:
    """Recursive (adjust=False) exponential moving average of `series`.

    The result is NaN until `span` observations have been seen.
    """
    weighted = series.ewm(span=span, min_periods=span, adjust=False)
    return weighted.mean()

def rsi(series: pd.Series, period: int = 14) -> pd.Series:
    """Relative Strength Index using Wilder-style smoothing (alpha = 1/period).

    Parameters
    ----------
    series : price series.
    period : smoothing length; the output is NaN until `period` price
        changes have been observed.

    Returns
    -------
    Series aligned with `series`, valued in [0, 100]. When the smoothed
    loss is zero but gains exist (prices only rose), RSI is 100. When both
    smoothed gain and loss are zero (flat prices), RSI is left as NaN
    because the ratio is undefined.
    """
    delta = series.diff()
    avg_gain = delta.clip(lower=0).ewm(alpha=1/period, min_periods=period, adjust=False).mean()
    avg_loss = (-delta.clip(upper=0)).ewm(alpha=1/period, min_periods=period, adjust=False).mean()

    # Zero loss would divide by zero; route it through NaN first ...
    rs = avg_gain / avg_loss.replace(0, np.nan)
    out = 100 - (100 / (1 + rs))

    # ... then restore the limiting value: no losses with some gains means RSI = 100.
    # Warm-up rows are untouched because NaN == 0 is False there.
    only_gains = (avg_loss == 0) & (avg_gain > 0)
    return out.mask(only_gains, 100.0)

def safe_div(num, den):
    """Divide `num` by `den`, yielding NaN wherever `den` is zero."""
    nonzero_den = den.replace(to_replace=0, value=np.nan)
    return num / nonzero_den

In [28]:
def build_features_from_prices(df: pd.DataFrame, warmup: int = 30) -> pd.DataFrame:
    """
    Build next-day prediction features and targets from daily price bars.

    Parameters
    ----------
    df : DataFrame with at least the columns 'date', 'adj_close' and 'volume'.
        Other columns (open, high, low, close) are not read here.
    warmup : minimum number of leading rows (after sorting by date) to discard
        before incomplete rows are filtered out. Negative values are treated
        as 0. With the default of 30 the result matches a plain NaN filter,
        because the MACD signal line is NaN for the first 33 rows anyway.

    Returns
    -------
    DataFrame with a fresh 0..n-1 index, a 'date' column, the feature columns,
    and the targets 'y_reg' (next-day log return) and 'y_cls' (1 if that
    return is positive, else 0). Any row containing NaN or +/-inf is dropped,
    which includes the last row because it has no next-day target.
    """
    # Work on a date-indexed frame so every derived series aligns on date.
    df = df.sort_values("date").set_index("date")

    px  = df["adj_close"].astype(float)
    vol = df["volume"].astype(float)

    out = pd.DataFrame(index=df.index)

    # --- Returns & volatility ---
    r1 = np.log(px / px.shift(1))
    out["r1"]   = r1
    out["r5"]   = r1.rolling(5).sum()
    out["r10"]  = r1.rolling(10).sum()
    out["vol10"] = r1.rolling(10).std()
    out["vol20"] = r1.rolling(20).std()

    # --- Trend / momentum (moving averages expressed relative to price) ---
    sma10 = px.rolling(10).mean()
    sma20 = px.rolling(20).mean()
    out["sma10_rel"] = sma10 / px - 1.0
    out["sma20_rel"] = sma20 / px - 1.0

    ema12 = ema(px, 12)
    ema26 = ema(px, 26)
    out["ema12_rel"] = ema12 / px - 1.0
    out["ema26_rel"] = ema26 / px - 1.0

    macd   = ema12 - ema26
    signal = ema(macd, 9)
    out["macd"]      = macd
    out["macd_hist"] = macd - signal

    out["rsi14"]  = rsi(px, 14)
    # The small epsilon keeps the z-score finite when volume is constant over the window.
    out["vol_z20"] = (vol - vol.rolling(20).mean()) / (vol.rolling(20).std() + 1e-9)

    # --- Targets (t+1) ---
    out["y_reg"] = r1.shift(-1)
    # NaN > 0 is False, so the last row gets y_cls = 0; it is removed below via y_reg.
    out["y_cls"] = (out["y_reg"] > 0).astype(int)

    # Honor the requested minimum warm-up trim.
    out = out.iloc[max(int(warmup), 0):]

    # Keep only fully finite rows: this drops NaNs and also the +/-inf values
    # that log returns and price ratios produce on non-positive prices.
    finite_rows = np.isfinite(out.to_numpy(dtype=float)).all(axis=1)
    out = out.loc[finite_rows]

    # The index is named 'date', so reset_index() restores it as a column.
    return out.reset_index()

In [12]:
# Adds market-wide context features (default benchmark: SPY) when the benchmark's parquet file exists.
def maybe_join_market_context(features_df: pd.DataFrame, market_symbol: str = "SPY") -> pd.DataFrame:
    """Attach same-day market features to `features_df`, if market data is on disk.

    Reads DATA_PROCESSED/<market_symbol>.parquet and derives 'mkt_r1' (1-day
    log return), 'mkt_r5' (5-day sum of those returns) and 'mkt_vol20'
    (20-day standard deviation of those returns). The features are
    inner-joined on 'date', so dates missing from the market file are dropped
    from the result. When the market file does not exist, `features_df` is
    returned unchanged.
    """
    market_file = DATA_PROCESSED / f"{market_symbol}.parquet"
    if market_file.exists():
        market = pd.read_parquet(market_file).sort_values("date")
        market_close = market["adj_close"].astype(float)
        market_ret = np.log(market_close / market_close.shift(1))

        context = pd.DataFrame({"date": market["date"], "mkt_r1": market_ret})
        context["mkt_r5"] = market_ret.rolling(5).sum()
        context["mkt_vol20"] = market_ret.rolling(20).std()
        context = context.dropna()

        # Market values are joined contemporaneously (same date, no shift).
        joined = pd.merge(features_df, context, on="date", how="inner")
        return joined.dropna()

    # No market parquet available: leave the features untouched.
    return features_df

In [30]:
# Smoke-test the feature pipeline on a single symbol (QQQ) before processing every symbol.
df = pd.read_parquet(DATA_PROCESSED / "QQQ.parquet")
print("raw rows:", len(df), "date range:", df["date"].min(), "→", df["date"].max())
display(df.head(3))

# Fewer rows come back than went in: the builder drops rows with incomplete features or target.
feats = build_features_from_prices(df)
print("after feature eng:", len(feats))
display(feats.head(5))

raw rows: 6493 date range: 1999-11-01 00:00:00 → 2025-08-25 00:00:00


Unnamed: 0,date,open,high,low,close,adj_close,volume
0,1999-11-01,131.5,133.1,130.6,130.8,130.8,4840900
1,1999-11-02,131.5,133.1,130.4,130.9,130.9,6417400
2,1999-11-03,132.8,134.3,132.4,133.5,133.5,9376300


after feature eng: 6459


Unnamed: 0,date,r1,r5,r10,vol10,vol20,sma10_rel,sma20_rel,ema12_rel,ema26_rel,macd,macd_hist,rsi14,vol_z20,y_reg,y_cls
0,1999-12-17,0.008751,0.047418,0.05685,0.012983,0.017198,-0.037742,-0.063691,-0.042852,-0.078129,5.911056,0.285901,74.82477,0.464414,0.01328,1
1,1999-12-20,0.01328,0.047025,0.061324,0.013178,0.017267,-0.044488,-0.070683,-0.046943,-0.083602,6.224673,0.479615,76.731818,-1.081272,0.052206,1
2,1999-12-21,0.052206,0.117294,0.11353,0.019366,0.019418,-0.082359,-0.109681,-0.080742,-0.12057,7.125374,1.104252,82.52366,0.789917,-0.003359,0
3,1999-12-22,-0.003359,0.095254,0.122139,0.018381,0.019587,-0.067773,-0.098721,-0.065702,-0.108899,7.701987,1.344692,81.090422,0.137497,0.011709,1
4,1999-12-23,0.011709,0.082587,0.129422,0.018181,0.01877,-0.066486,-0.102145,-0.064797,-0.110437,8.233499,1.500963,82.252235,-0.7167,-0.005559,0


In [32]:
# Build and save the feature dataset for every symbol in SYMBOLS.
def make_dataset(
    symbol: str,
    add_market: bool = False,
    market_symbol: str = "SPY",
) -> "tuple[pathlib.Path, pd.DataFrame]":
    """Build the feature dataset for one symbol and write it to parquet.

    Parameters
    ----------
    symbol : ticker whose prices are read from DATA_PROCESSED/<symbol>.parquet.
    add_market : when True, join market-context features via
        maybe_join_market_context.
    market_symbol : benchmark ticker used for the market context.

    Returns
    -------
    (path, features): the path of the written
    DATA_PROCESSED/<symbol>_dataset.parquet file and the feature DataFrame
    that was written to it.
    """
    path = DATA_PROCESSED / f"{symbol}.parquet"
    # No pre-sort needed: build_features_from_prices sorts by date itself.
    feats = build_features_from_prices(pd.read_parquet(path))
    if add_market:
        feats = maybe_join_market_context(feats, market_symbol)
    outp = DATA_PROCESSED / f"{symbol}_dataset.parquet"
    feats.to_parquet(outp, index=False)
    return outp, feats

# One summary row per symbol: row count, output file name, and covered date range.
summary = []
for sym in SYMBOLS:
    # add_market=False skips the SPY market-context join.
    outp, feats = make_dataset(sym, add_market=False)
    summary.append(
        (
            sym,
            feats.shape[0],
            outp.name,
            feats["date"].min().date(),
            feats["date"].max().date(),
        )
    )

pd.DataFrame(data=summary, columns=["symbol","rows","dataset","start","end"])

Unnamed: 0,symbol,rows,dataset,start,end
0,QQQ,6459,QQQ_dataset.parquet,1999-12-17,2025-08-22
1,VFV.TO,3176,VFV.TO_dataset.parquet,2012-12-27,2025-08-22
2,XEQT.TO,1480,XEQT.TO_dataset.parquet,2019-10-01,2025-08-22


In [35]:
# Basic sanity checks on the saved datasets.
# NOTE: nothing here tests for look-ahead leakage; per the original note,
# the strict check happens during backtest.
for sym in SYMBOLS:
    # Read each dataset once and run every check on it.
    ds = pd.read_parquet(DATA_PROCESSED / f"{sym}_dataset.parquet")
    assert "y_reg" in ds.columns and "y_cls" in ds.columns
    # Features shouldn't be NaN
    assert ds.drop(columns=["date","y_reg","y_cls"]).isna().sum().sum() == 0, f"{sym}: NaNs in features"
    # Target shouldn't be NaN either
    assert ds["y_reg"].isna().sum() == 0, f"{sym}: NaNs in y_reg"
    # No NaNs anywhere outside the date column (this also covers y_cls)
    assert ds.drop(columns=["date"]).isna().sum().sum() == 0, f"{sym}: NaNs present"
    # Dates strictly increasing: is_monotonic_increasing alone accepts repeated
    # dates, so uniqueness is checked as well.
    assert pd.to_datetime(ds["date"]).is_monotonic_increasing and ds["date"].is_unique, f"{sym}: dates not increasing"
    print(sym, "dataset ok ✅ rows:", len(ds))

print("All good ✅")

QQQ dataset ok ✅ rows: 6459
VFV.TO dataset ok ✅ rows: 3176
XEQT.TO dataset ok ✅ rows: 1480
All good ✅
