In [None]:
# Environment & Data Checks: Mount + Paths

from google.colab import drive
# Mount Google Drive at /content/drive; the project root (BASE) defined below
# lives under this mount point.
drive.mount('/content/drive')

import os, sys, json, time, platform, subprocess, textwrap

# Project root on the mounted Drive; every other location is derived from it.
BASE = "/content/drive/MyDrive/MLBA_Project"
RAW  = BASE + "/data/raw"
PRO  = BASE + "/data/processed"
RPT  = BASE + "/reports"
MDIR = BASE + "/models"
NBK  = BASE + "/notebooks"

# Label -> folder mapping, reused for directory creation and the status line.
project_dirs = {"RAW": RAW, "PRO": PRO, "RPT": RPT, "MDIR": MDIR, "NBK": NBK}

# Create any folder that does not exist yet; existing folders are left alone.
for folder in project_dirs.values():
    os.makedirs(folder, exist_ok=True)

print("Project root:", BASE)
print("Folders ready:", project_dirs)

In [None]:
# System & package versions (helpful for reproducibility)

import sys, platform, psutil, shutil
from datetime import datetime

def which(cmd):
    """Return the path `shutil.which` resolves for `cmd`, or the string "not found"."""
    return shutil.which(cmd) or "not found"

# Local import: the top of this cell only imports the `datetime` class.
from datetime import timezone

# Snapshot of the runtime environment; later statements in this cell add more
# keys and print the result as JSON.
info = {
    # Timezone-aware "now" (datetime.utcnow() is deprecated since Python 3.12).
    # The "+00:00" offset is rewritten to "Z" so the string format is unchanged.
    "timestamp_utc": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    "python": sys.version.split()[0],
    "platform": platform.platform(),
    "machine": platform.machine(),
    "processor": platform.processor(),
    # Path of the pip executable found on PATH, or "not found".
    "pip": which("pip"),
    "python_exe": sys.executable
}

# Optional GPU stack: record CUDA availability and the torch version when
# torch can be imported and queried.
try:
    import torch
    info.update({
        "cuda_available": bool(torch.cuda.is_available()),
        "torch_version": torch.__version__,
    })
except Exception:
    # Any failure is recorded as "no CUDA / no torch version" instead of
    # stopping the cell.
    info.update({"cuda_available": False, "torch_version": None})

# Total RAM in GiB (bytes / 1024**3), rounded to two decimals; None on failure.
try:
    import psutil, os
    vm = psutil.virtual_memory()
    total_gib = vm.total / 1024**3
    info["ram_gb"] = round(total_gib, 2)
except Exception:
    info["ram_gb"] = None

import pandas as pd, numpy as np
info.update({
    "pandas_version": pd.__version__,
    "numpy_version": np.__version__,
})

print(json.dumps(info, indent=2))

In [None]:
# Install/ensure core pip packages (safe to re-run)
# These match downstream phases.

!pip -q install pandas numpy matplotlib scikit-learn lightgbm xgboost
import pandas as pd, numpy as np, matplotlib, sklearn
import lightgbm, xgboost

print("OK →",
      "pandas", pd.__version__,
      "| numpy", np.__version__,
      "| sklearn", sklearn.__version__,
      "| lightgbm", lightgbm.__version__,
      "| xgboost", xgboost.__version__)

In [None]:
# Save a small project config for downstream notebooks to read if needed.

# Settings written to disk so downstream notebooks can read them.
cfg = {
    "base": BASE,
    "paths": {
        "raw": RAW,
        "processed": PRO,
        "reports": RPT,
        "models": MDIR,
        "notebooks": NBK,
    },
    "horizon_days": 63,
    "rf_annual": 0.06,
    "random_seed": 42,
}

# Serialise once; the same text is written to the file and echoed below.
cfg_path = BASE + "/project_config.json"
cfg_text = json.dumps(cfg, indent=2)
with open(cfg_path, "w") as f:
    f.write(cfg_text)

print("Wrote:", cfg_path)
print(cfg_text)

In [None]:
# Verify expected raw files exist (you can adjust names as needed).
# This won’t fail the run; it prints actionable status.

import os, glob

expected_any_funds = "*_LargeCap.csv"         # pattern for funds
expected_files = [
    "index_tri_nifty100.csv",                 # required benchmark
    # Optional macro inputs:
    "india_vix.csv",
    "usd_inr.csv",
    "gsec_10y.csv",
    "gold_inr.csv",
    "brent_crude.csv",
]

print("RAW directory:", RAW)
present = os.listdir(RAW)
print("RAW contains:", len(present), "files")

# Expected non-fund files that are absent from RAW, in the order listed above.
missing = [
    name for name in expected_files
    if not os.path.exists(os.path.join(RAW, name))
]

# Fund files are discovered by pattern, sorted for a stable order.
funds = sorted(glob.glob(os.path.join(RAW, expected_any_funds)))
print(f"Fund files matching '{expected_any_funds}':", len(funds))

# Status only: nothing in this cell raises when a file is absent.
if not missing:
    print("\nAll listed non-fund files present (or optional).")
else:
    print("\n⚠ Missing files (ok for optional macros; TRI is required for Phase 1):")
    for absent in missing:
        print("  -", absent)

if len(funds) == 0:
    print("\n⚠ No fund CSVs found — Phase 1 will fail. Add *_LargeCap.csv files to RAW.")

In [None]:
# Peek first two fund CSVs to ensure 'date' + NAV-like columns exist.

import pandas as pd

def sniff_schema(path):
    """Summarise the header of the CSV at `path`.

    Reads at most the first five data rows, then strips and lower-cases the
    column names before inspecting them.

    Returns a dict with:
      - "file": base name of `path`
      - "has_date": whether a column named "date" exists after normalisation
      - "nav_cols": the NAV-like names found, in the fixed candidate order
      - "columns": the full list of normalised column names
    """
    header = pd.read_csv(path, nrows=5).columns
    cols = [name.strip().lower() for name in header]
    nav_names = ("nav", "net_asset_value", "nav_value", "price", "close")
    return {
        "file": os.path.basename(path),
        "has_date": "date" in cols,
        "nav_cols": [name for name in nav_names if name in cols],
        "columns": cols,
    }

# Inspect at most the first two fund files found in RAW.
sample = funds[:2]
if sample:
    for fund_csv in sample:
        print(sniff_schema(fund_csv))
else:
    print("No sample available — add *_LargeCap.csv to RAW.")

In [None]:
# Check TRI timeline continuity and duplicates.

import pandas as pd, numpy as np, os

# Benchmark TRI file; when it is absent the cell only reports it and moves on.
tri_path = os.path.join(RAW, "index_tri_nifty100.csv")
if not os.path.exists(tri_path):
    print("⚠ TRI file missing — required for Phase 1.")
else:
    tri = pd.read_csv(tri_path)
    # Normalise headers so the 'date' lookup ignores case and stray whitespace.
    tri.columns = tri.columns.str.strip().str.lower()
    assert "date" in tri.columns, "TRI must have a 'date' column"
    # Unparseable dates become NaT; those rows are dropped before sorting.
    tri["date"] = pd.to_datetime(tri["date"], errors="coerce")
    tri = tri.dropna(subset=["date"]).sort_values("date")

    # First non-date column, or None when the file holds only dates
    # (indexing [0] on an empty list would raise IndexError).
    value_cols = [c for c in tri.columns if c != "date"]
    val_col = value_cols[0] if value_cols else None
    if val_col is None:
        print("⚠ TRI file has no value column besides 'date'.")

    # Flags every repeat of a date after its first occurrence.
    tri["dupe"] = tri.duplicated(subset=["date"])
    print("TRI range:", tri["date"].min().date(), "→", tri["date"].max().date(),
          "| rows:", len(tri), "| duplicates:", int(tri["dupe"].sum()))

    # Continuity: report the widest calendar gap between consecutive rows.
    # Needs at least two rows, otherwise there is no difference to measure.
    if len(tri) > 1:
        gap_days = tri["date"].diff().dt.days
        print("Largest gap between consecutive dates:", int(gap_days.max()), "day(s)")

    if tri["dupe"].any():
        display(tri.loc[tri["dupe"]].head())

In [None]:
# Create placeholders so downstream phases don’t crash if run before Phase 1/2.
# These are overwritten by actual outputs later.

import pandas as pd, numpy as np, os, json

place_master = os.path.join(PRO, "clean_master_union.csv")
place_feat   = os.path.join(PRO, "clean_with_features.csv")

# Header-only CSVs, written only when the target file does not exist yet, so
# an existing file at either path is never overwritten by this cell.
for placeholder, columns in (
    (place_master, ["date", "fund_id", "nav", "tri"]),
    (place_feat, ["date", "fund_id"]),
):
    if not os.path.exists(placeholder):
        pd.DataFrame(columns=columns).to_csv(placeholder, index=False)
        print("Scaffolded:", placeholder)

# The context manager closes the handle as soon as the text is written; a bare
# open(...).write(...) leaves closing to the garbage collector.
with open(os.path.join(PRO, "README.txt"), "w") as readme:
    readme.write("Processed artifacts are generated by notebooks.\n")
print("Processed scaffolding ready.")

In [None]:
# Optional: create a data_dictionary.json you can edit with column descriptions.

# Editable description of the expected raw inputs: top-level keys are the fund
# file pattern/schema and one entry per non-fund CSV.
dd = {
    "fund_files_pattern": "*_LargeCap.csv",
    "fund_schema": {
        "date": "Trading date (YYYY-MM-DD)",
        "nav|net_asset_value|nav_value|price|close": "Mutual fund NAV / price column (one of these)",
    },
    "index_tri_nifty100.csv": {
        "date": "Trading date",
        "tri": "Total Return Index level (benchmark)",
    },
    "india_vix.csv": {
        "date": "Trading date",
        "india_vix": "Volatility index",
    },
    "usd_inr.csv": {
        "date": "Trading date",
        "usd_inr": "USD/INR exchange rate",
    },
    "gsec_10y.csv": {
        "date": "Trading date",
        "gsec_10y_yield": "10Y government bond yield",
    },
    "gold_inr.csv": {
        "date": "Trading date",
        "gold_inr": "Gold (INR) index/price",
    },
    "brent_crude.csv": {
        "date": "Trading date",
        "brent_usd": "Brent crude price (USD)",
    },
}

# The dictionary is stored alongside the raw files it describes.
out = os.path.join(RAW, "data_dictionary.json")
with open(out, "w") as f:
    f.write(json.dumps(dd, indent=2))

print("Wrote:", out)

In [None]:
# Reusable integrity helpers (import these via %run if you want in later phases)

import pandas as pd, numpy as np

def assert_no_duplicate_keys(df, keys):
    """Raise AssertionError if any row of `df` repeats an earlier row on `keys`.

    The count in the message excludes the first occurrence of each key
    combination. Returns None when there are no repeats.
    """
    repeat_count = df.duplicated(subset=keys).sum()
    if not repeat_count:
        return
    raise AssertionError(f"Found {repeat_count} duplicate rows on keys={keys}")

def assert_monotonic_by_group(df, group_col, date_col):
    """Raise AssertionError if `date_col` ever decreases within a `group_col` group.

    Equal consecutive values are accepted (the check is non-strict). The error
    message lists at most the first five offending group ids, followed by
    "..." when there are more. Returns None when every group passes.
    """
    offenders = [
        group_id
        for group_id, rows in df.groupby(group_col)
        if not rows[date_col].is_monotonic_increasing
    ]
    if not offenders:
        return
    raise AssertionError(
        f"Non-monotonic dates for groups: {offenders[:5]}{'...' if len(offenders)>5 else ''}"
    )

def pct_missing(s):
    """Return the share of missing values in `s` as a percentage, rounded to 2 decimals."""
    missing_fraction = float(s.isna().mean())
    return round(missing_fraction * 100, 2)

# Names of the helpers defined in this cell, echoed as a quick confirmation.
helper_names = ["assert_no_duplicate_keys", "assert_monotonic_by_group", "pct_missing"]
print("Helpers ready:", helper_names)

In [None]:
# Mark Phase 0 completed (downstream notebooks can optionally check this)

# Imported here so the cell does not rely on names bound by earlier cells.
from datetime import datetime, timezone

# Completion marker that downstream notebooks can optionally check.
stamp = {
    "phase": 0,
    "message": "Environment initialized & raw checks executed",
    # Timezone-aware "now" (datetime.utcnow() is deprecated since Python 3.12).
    # The "+00:00" offset is rewritten to "Z" so the string format is unchanged.
    "time_utc": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
}

stamp_path = f"{PRO}/phase0_stamp.json"
with open(stamp_path, "w") as f:
    json.dump(stamp, f, indent=2)

print("Saved:", stamp_path)