<a href="https://colab.research.google.com/github/brendonhuynhbp-hub/gt-markets/blob/main/notebooks/GoogleTrends_Financial_Engineering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# =========================================================
# ENGINEERED DATASET BUILDER (Colab-friendly, robust paths)
# - Mounts Google Drive
# - Auto-detects project folder
# - Finds latest RAW merged file (csv/parquet)
# - Builds engineered features per-asset (no global dropna)
# - Left-joins features, imputes only engineered cols
# - Saves engineered parquet + csv
# =========================================================

# ---------- 0) Mount Drive ----------
# Colab-only step: makes Google Drive available under /content/drive, which is
# the root of every candidate project path probed in section 2.
from google.colab import drive
drive.mount('/content/drive')

# ---------- 1) Imports ----------
from pathlib import Path
import numpy as np
import pandas as pd
from datetime import datetime

# ---------- 2) Locate project folder ----------
# Locations probed for the project folder, in priority order
# (personal drive first, then shared drives).
CANDIDATE_PROJECT_DIRS = [
    Path("/content/drive/MyDrive/gt-markets"),
    Path("/content/drive/Shareddrives/gt-markets"),
]

# The first candidate that exists on disk wins; stays None when none does.
PROJECT_DIR = None
for _candidate in CANDIDATE_PROJECT_DIRS:
    if _candidate.exists():
        PROJECT_DIR = _candidate
        break
assert PROJECT_DIR is not None, "Project directory not found. Ensure 'gt-markets' exists in MyDrive or a Shared Drive."

# Processed-data folder: created on first run, reused afterwards.
DATA_DIR = PROJECT_DIR.joinpath("data", "processed")
DATA_DIR.mkdir(parents=True, exist_ok=True)
print("PROJECT_DIR:", PROJECT_DIR)
print("DATA_DIR:   ", DATA_DIR)

# ---------- 3) Find latest RAW merged file (csv or parquet) ----------
def _latest(patterns):
    """Return the most recently modified file in DATA_DIR matching any pattern.

    `patterns` is an iterable of glob patterns evaluated against DATA_DIR.
    Files are ranked by modification time (st_mtime); on a tie the file
    found first wins. Returns None when nothing matches.
    """
    matches = [hit for pattern in patterns for hit in DATA_DIR.glob(pattern)]
    return max(matches, key=lambda hit: hit.stat().st_mtime, default=None)

# Parquet and CSV candidates are pooled; the newest file by mtime is used.
RAW_FILE = _latest(
    [
        "merged_financial_trends_data_*.parquet",
        "merged_financial_trends_data_*.csv",
    ]
)

assert RAW_FILE is not None, (
    "Could not find a raw merged file in processed/. "
    "Expected something like 'merged_financial_trends_data_YYYY-MM-DD.csv' or .parquet"
)
print("Using RAW_FILE:", RAW_FILE.name)

# Suffix for output filenames: reuse whatever follows the raw-file prefix
# (normally the date); fall back to today's date for any other extension.
stem = (
    RAW_FILE.stem.replace("merged_financial_trends_data_", "")
    if RAW_FILE.suffix.lower() in (".csv", ".parquet")
    else datetime.now().strftime("%Y-%m-%d")
)
OUT_PARQUET = DATA_DIR / f"merged_financial_trends_engineered_{stem}.parquet"
OUT_CSV     = DATA_DIR / f"merged_financial_trends_engineered_{stem}.csv"

# ---------- 4) Load raw merged dataset ----------
def load_raw(path: Path) -> pd.DataFrame:
    """Read the raw merged dataset and return it indexed by ascending Date.

    Files ending in ``.parquet`` are read with ``pd.read_parquet``; anything
    else is read as CSV. The file must contain a ``Date`` column, which is
    parsed to datetimes and becomes the sorted index.

    Columns whose name contains "trend" (case-insensitive) get their NaNs
    replaced by 0; every other column is left untouched.

    Raises:
        AssertionError: if the file has no ``Date`` column.
    """
    reader = pd.read_parquet if path.suffix.lower() == ".parquet" else pd.read_csv
    raw = reader(path)
    assert "Date" in raw.columns, "Expected a 'Date' column in RAW merged file."

    raw = raw.copy()
    raw["Date"] = pd.to_datetime(raw["Date"])
    raw = raw.set_index("Date").sort_index()

    # Fill GT trend NaNs with 0 (legit for Google Trends)
    gappy = [
        col for col in raw.columns
        if "trend" in col.lower() and raw[col].isna().any()
    ]
    if gappy:
        print(f"Filling NaN in trend cols with 0 (first 10 shown): {gappy[:10]}")
        raw[gappy] = raw[gappy].fillna(0.0)
    return raw

df0 = load_raw(RAW_FILE)

# ---------- 5) Asset registry (edit if your column names differ) ----------
# One entry per asset. `price_col` must exist in the RAW file; `label` is
# lower-cased and used as the feature-name prefix. `PAIR_ID` is not read
# anywhere in this cell.
ASSETS = [
    {"PAIR_ID": "GC=F",      "price_col": "GC=F Close",      "label": "Gold"},
    {"PAIR_ID": "BTC-USD",   "price_col": "BTC-USD Close",   "label": "BTC"},
    {"PAIR_ID": "CL=F",      "price_col": "CL=F Close",      "label": "Oil"},
    {"PAIR_ID": "USDCNY=X",  "price_col": "USDCNY=X Close",  "label": "USDCNY"},
]

# Fail fast on the first registry entry whose price column is absent.
_missing_price_col = next(
    (a["price_col"] for a in ASSETS if a["price_col"] not in df0.columns),
    None,
)
assert _missing_price_col is None, f"Missing price column in RAW file: {_missing_price_col}"

# ---------- 6) Indicator helpers (min_periods=1 keeps early rows) ----------
def _safe_pct_change(s, periods=1):
    r = s.pct_change(periods)
    r = r.replace([np.inf, -np.inf], np.nan).fillna(0.0)
    return r.clip(-10.0, 10.0)

def _rsi(close: pd.Series, window=14):
    delta = close.diff()
    up  = delta.clip(lower=0)
    dn  = -delta.clip(upper=0)
    roll_up = up.rolling(window, min_periods=1).mean()
    roll_dn = dn.rolling(window, min_periods=1).mean()
    rs  = (roll_up / roll_dn.replace(0, np.nan)).replace([np.inf, -np.inf], np.nan).fillna(0.0)
    rsi = 100 - (100 / (1 + rs))
    return rsi

def _bollinger(close: pd.Series, window=20, k=2.0):
    ma  = close.rolling(window, min_periods=1).mean()
    std = close.rolling(window, min_periods=1).std().fillna(0.0)
    upper = ma + k * std
    lower = ma - k * std
    width = (upper - lower) / ma.replace(0, np.nan)
    width = width.replace([np.inf, -np.inf], np.nan).fillna(0.0)
    return ma, upper, lower, width

def _macd(close: pd.Series, fast=12, slow=26, signal=9):
    ema_fast = close.ewm(span=fast, adjust=False, min_periods=1).mean()
    ema_slow = close.ewm(span=slow, adjust=False, min_periods=1).mean()
    macd    = ema_fast - ema_slow
    signal_ = macd.ewm(span=signal, adjust=False, min_periods=1).mean()
    hist    = macd - signal_
    return macd, signal_, hist

def _annualised_vol(ret, window):
    vol = ret.rolling(window, min_periods=1).std()
    return (vol * np.sqrt(252)).fillna(0.0)   # daily annualisation; acceptable default

# ---------- 7) Per-asset feature engineering (no global NaN drops) ----------
def engineer_for_asset(df_in: pd.DataFrame, price_col: str, prefix: str) -> pd.DataFrame:
    """
    Derive the engineered feature set for one asset.

    - Reads only `price_col` from `df_in`; `df_in` itself is not modified.
    - Every rolling/EWM window uses min_periods=1, so no history is dropped.
    - Returns a frame on `df_in`'s index whose columns are named
      "<prefix>__<feature>"; the raw close price is not included.
    """
    close = df_in[price_col]
    feats = pd.DataFrame(index=df_in.index)

    def _bounded_ratio(numerator, denominator):
        # Zero denominators, infinities and NaNs all collapse to 0; the
        # result is limited to [-10, 10].
        ratio = numerator / denominator.replace(0, np.nan)
        return ratio.replace([np.inf, -np.inf], np.nan).fillna(0.0).clip(-10, 10)

    # Basic returns (the 1-step return also feeds the volatility features)
    daily_ret = _safe_pct_change(close, 1)
    feats[f"{prefix}__ret1"] = daily_ret
    for horizon in (5, 21):
        feats[f"{prefix}__ret{horizon}"] = _safe_pct_change(close, horizon)

    # Moving-average z-scores (scale-invariant)
    for span in (5, 10, 20, 50, 100, 200):
        window = close.rolling(span, min_periods=1)
        feats[f"{prefix}__ma{span}_z"] = _bounded_ratio(close - window.mean(), window.std())

    # EMA relative gaps
    for span in (10, 20, 50):
        smooth = close.ewm(span=span, adjust=False, min_periods=1).mean()
        feats[f"{prefix}__ema{span}_gap"] = _bounded_ratio(close - smooth, smooth)

    # Volatility
    for span in (10, 20, 60, 120):
        feats[f"{prefix}__vol{span}"] = _annualised_vol(daily_ret, span)

    # RSI / Bollinger / MACD
    feats[f"{prefix}__rsi14"] = _rsi(close, 14)
    feats[f"{prefix}__bb20_width"] = _bollinger(close, 20, 2.0)[3]
    line, trigger, histogram = _macd(close, 12, 26, 9)
    feats[f"{prefix}__macd"] = line
    feats[f"{prefix}__macd_signal"] = trigger
    feats[f"{prefix}__macd_hist"] = histogram

    return feats

# Build per-asset feature frames (columns prefixed with the lower-cased label)
feat_frames = [
    engineer_for_asset(df0, a["price_col"], prefix=a["label"].lower())
    for a in ASSETS
]

# ---------- 8) Join back to RAW (left joins preserve full index) ----------
df_eng = df0.copy()
for f in feat_frames:
    df_eng = df_eng.join(f, how="left")

# Identify engineered columns we added (unique, in insertion order)
added_cols = list(dict.fromkeys(col for f in feat_frames for col in f.columns))

# Sanitise engineered features only (no row drops): remove inf/NaN.
# Deliberately NO blanket clip here. The ratio-type features (returns,
# z-scores, EMA gaps) are already clipped to [-10, 10] where they are built,
# whereas RSI lives on a 0-100 scale and MACD is in price units; clipping
# every engineered column to [-10, 10] would flatten RSI to a near-constant
# 10 and truncate MACD for any asset priced above a few dollars.
df_eng[added_cols] = (
    df_eng[added_cols]
      .replace([np.inf, -np.inf], np.nan)
      .fillna(0.0)
)

# ---------- 9) Diagnostics ----------
def diag(df: pd.DataFrame, name: str):
    """Print a two-line summary of `df`: row count with date span, and NaN-row count.

    `df` is expected to have a datetime index (``.date()`` is called on its
    min and max); `name` is the tag shown in square brackets.
    """
    first_day = df.index.min().date()
    last_day = df.index.max().date()
    rows_with_gaps = int(df.isna().any(axis=1).sum())
    print(f"[{name}] rows={len(df)} | {first_day} → {last_day}")
    print("rows with any NaN:", rows_with_gaps)

for _frame, _title in ((df0, "RAW (input)"), (df_eng, "ENG (post-join + sanitise)")):
    diag(_frame, _title)

# Show top offenders if any engineered NaN remain (should be zero)
left_na = df_eng[added_cols].isna().sum().sort_values(ascending=False)
if not left_na.empty and left_na.iloc[0] > 0:
    print("Engineered columns with remaining NaN (top 10):")
    print(left_na.head(10))

# ---------- 10) Save outputs ----------
# Date goes back to being an ordinary column; both formats are written from
# the same flattened frame.
_flat = df_eng.reset_index()
_flat.to_parquet(OUT_PARQUET, index=False)
_flat.to_csv(OUT_CSV, index=False)

print("\nSaved engineered dataset:")
for _saved in (OUT_PARQUET, OUT_CSV):
    print("  ", _saved)


Mounted at /content/drive
PROJECT_DIR: /content/drive/MyDrive/gt-markets
DATA_DIR:    /content/drive/MyDrive/gt-markets/data/processed
Using RAW_FILE: merged_financial_trends_data_2025-09-07.csv
Filling NaN in trend cols with 0 (first 10 shown): ['entrepreneurial_trend', 'cryptocurrency_trend']
[RAW (input)] rows=2609 | 2015-09-08 → 2025-09-05
rows with any NaN: 1496
[ENG (post-join + sanitise)] rows=2609 | 2015-09-08 → 2025-09-05
rows with any NaN: 1496

Saved engineered dataset:
   /content/drive/MyDrive/gt-markets/data/processed/merged_financial_trends_engineered_2025-09-07.parquet
   /content/drive/MyDrive/gt-markets/data/processed/merged_financial_trends_engineered_2025-09-07.csv
