# Sparse Index Tracking — Starter Notebook

_Generated 2025-09-02_

This notebook provides a **clean, modular baseline** for building a sparse portfolio that tracks a benchmark (default **S&P 500, ^GSPC**). It includes:
- Data ingestion (tickers + prices via `pandas`/`yfinance`)
- Benchmark returns (^GSPC)
- Two methods:
  1. **Greedy forward selection + NNLS refit** (long-only, sum-to-one)
  2. **L1-regularized convex regression (cvxpy)** with long-only and budget constraint
- Train/validation split and basic metrics: in-sample/out-of-sample tracking error, number of names

You can later add:
- **Cardinality-constrained MIQP**, **factor-based tracking**, **sector caps**, and **turnover penalties**.

**Tip:** If Wikipedia scrape of S&P 500 tickers fails (network/policy), the code falls back to a small demonstration universe.


In [ ]:
# If running in a fresh Colab session, uncomment the next line to install packages:
# !pip -q install yfinance cvxpy pulp ortools scikit-learn pandas numpy matplotlib > /dev/null


In [ ]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

import yfinance as yf
import cvxpy as cp

np.set_printoptions(suppress=True, linewidth=120)


In [ ]:
# ===== Config =====
BENCHMARK = '^GSPC'   # S&P 500 index ticker in Yahoo Finance
TRAIN_YEARS = 3        # years in train window
TEST_YEARS = 1         # years in test window following train
FREQ = '1d'            # '1d' daily; can change to '1wk'
MAX_TICKERS = 500      # cap for universe size (S&P 500)
SEED = 42
np.random.seed(SEED)

# Dates
end_date = datetime.today().date()
start_date = end_date - timedelta(days=int((TRAIN_YEARS+TEST_YEARS)*365.25))
split_date = end_date - timedelta(days=int(TEST_YEARS*365.25))
start_date, split_date, end_date


In [ ]:
def get_sp500_tickers(max_n=500):
    """Try to fetch S&P 500 tickers from Wikipedia; fall back to a demo list if it fails."""
    try:
        tables = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')
        df = tables[0]
        tix = df['Symbol'].astype(str).str.replace('.', '-', regex=False).tolist()
        # Some tickers are weird (e.g., BRK.B -> BRK-B). Keep only alnum + '-'
        tix = [t for t in tix if len(t) > 0 and set(t) <= set('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-')]
        return tix[:max_n]
    except Exception as e:
        print('Wiki tickers fetch failed, using a small demo universe:', e)
        return ['AAPL','MSFT','NVDA','AMZN','GOOGL','META','BRK-B','JPM','XOM','UNH','JNJ','PG','AVGO','CVX','HD','LLY','ADBE','NFLX','MRK','PEP'][:max_n]

tickers = get_sp500_tickers(MAX_TICKERS)
len(tickers), tickers[:10]


In [ ]:
def download_prices(tickers, start, end, interval='1d'):
    data = yf.download(tickers + [BENCHMARK], start=start, end=end, interval=interval, auto_adjust=True, progress=False)
    px = data['Close']
    # Drop tickers with all-NaN
    px = px.dropna(axis=1, how='all')
    # Forward-fill occasional gaps
    px = px.fillna(method='ffill')
    return px

prices = download_prices(tickers, start_date, end_date, FREQ)
prices.tail(3)


In [ ]:
# Compute simple returns
rets = prices.pct_change().dropna(how='all')
rets = rets.loc[rets.index >= pd.to_datetime(start_date)]
rets = rets.dropna(axis=1, how='any')  # keep only full-history names for simplicity

# Separate benchmark and stock matrix
r_b = rets[BENCHMARK]
R = rets.drop(columns=[BENCHMARK])

print('Shapes -> R:', R.shape, 'r_b:', r_b.shape)
R.head(3)


In [ ]:
# Train/validation split (time-based)
train_mask = R.index < pd.to_datetime(split_date)
R_tr, R_te = R.loc[train_mask], R.loc[~train_mask]
rb_tr, rb_te = r_b.loc[train_mask], r_b.loc[~train_mask]
R_tr.shape, R_te.shape, rb_tr.shape, rb_te.shape


In [ ]:
def nnls_sum_to_one(R, r, idx=None):
    """Solve min ||R w - r||^2 s.t. w>=0, 1'w=1. If idx provided, restrict to those columns."""
    if idx is not None:
        R_ = R.iloc[:, idx].values
    else:
        R_ = R.values
    r_ = r.values.reshape(-1,1)
    n = R_.shape[1]
    w = cp.Variable((n,1))
    obj = cp.Minimize(cp.sum_squares(R_ @ w - r_))
    cons = [w >= 0, cp.sum(w) == 1]
    prob = cp.Problem(obj, cons)
    prob.solve(solver=cp.OSQP, verbose=False)
    if w.value is None:
        return None, np.inf
    wv = w.value.flatten()
    resid = (R_ @ w.value - r_).flatten()
    te = np.std(resid) * np.sqrt(252)  # daily -> annualized TE
    return wv, te

def greedy_forward_nnls(R, r, k=15):
    """Greedy selection by correlation, with NNLS refit (sum-to-one) at each step."""
    remaining = list(range(R.shape[1]))
    chosen = []
    w = None
    resid = r.values.copy()
    for step in range(k):
        # Pick the column with highest |corr| to current residual
        corr = np.abs(np.corrcoef(R.values.T, resid)[R.shape[1]:, :R.shape[1]].diagonal()) if False else None
        # Simpler & robust: brute-force best next feature by NNLS fit improvement
        best_i, best_te = None, np.inf
        for i in remaining:
            idx = chosen + [i]
            w_try, te_try = nnls_sum_to_one(R, r, idx)
            if te_try < best_te:
                best_te = te_try
                best_i = i
        chosen.append(best_i)
        remaining.remove(best_i)
        w, te = nnls_sum_to_one(R, r, chosen)
        # Update residual for logging (not used for selection to keep it simple)
        resid = r.values - R.iloc[:, chosen].values @ w.reshape(-1,1)
        # print(f"Step {step+1}: added {R.columns[best_i]}, TE~{te:.2f} bp")
    return chosen, w, te

def l1_cvx(R, r, lam=1.0):
    """Solve min ||R w - r||^2 + lam * ||w||_1 s.t. w>=0, 1'w=1."""
    R_ = R.values
    r_ = r.values.reshape(-1,1)
    n = R_.shape[1]
    w = cp.Variable((n,1))
    obj = cp.Minimize(cp.sum_squares(R_ @ w - r_) + lam * cp.norm1(w))
    cons = [w >= 0, cp.sum(w) == 1]
    prob = cp.Problem(obj, cons)
    prob.solve(solver=cp.OSQP, verbose=False)
    if w.value is None:
        return None, np.inf
    wv = w.value.flatten()
    resid = (R_ @ w.value - r_).flatten()
    te = np.std(resid) * np.sqrt(252)
    return wv, te

def evaluate_te(R, r, w):
    resid = r.values - R.values @ w
    return float(np.std(resid) * np.sqrt(252))

def portfolio_returns(R, w):
    return pd.Series(R.values @ w, index=R.index)


In [ ]:
# ===== Approach A: Greedy forward + NNLS on TRAIN, evaluate on TEST =====
K = 15
chosen, w_tr, te_tr = greedy_forward_nnls(R_tr, rb_tr, k=K)
names = R_tr.columns[chosen]
print(f"Chosen ({len(chosen)}):", list(names))
print(f"In-sample TE (annualized, bp): {te_tr:.2f}")

# Evaluate on test
w_full = np.zeros(R_tr.shape[1])
w_full[chosen] = w_tr
te_te = evaluate_te(R_te.iloc[:, :], rb_te, w_full)
print(f"Out-of-sample TE (annualized, bp): {te_te:.2f}")

# Build test period cumulative returns vs benchmark
port_te = portfolio_returns(R_te.iloc[:, :], w_full)
cum_port = (1 + port_te).cumprod()
cum_bench = (1 + rb_te).cumprod()
ax = (cum_port.rename('Portfolio') / cum_port.iloc[0]).plot(figsize=(9,4))
((cum_bench.rename('Benchmark') / cum_bench.iloc[0])).plot(ax=ax)
plt.title('Out-of-sample cumulative performance (Greedy+NNLS)')
plt.legend(); plt.grid(True); plt.show()


In [ ]:
# ===== Approach B: L1-regularized convex regression (with constraints) =====
lams = np.geomspace(10.0, 0.01, 10)
best = (None, None, np.inf)
for lam in lams:
    w_l1, te_in = l1_cvx(R_tr, rb_tr, lam=float(lam))
    if w_l1 is None:
        continue
    te_out = evaluate_te(R_te, rb_te, w_l1)
    nnz = int((w_l1 > 1e-6).sum())
    # Simple score: prioritize low out-of-sample TE, break ties with sparsity
    score = te_out + 0.01 * nnz
    print(f"lam={lam:.3f} -> OOS TE={te_out:.2f} bp, nnz={nnz}")
    if score < best[2]:
        best = (w_l1, lam, score)

w_best, lam_best, _ = best
if w_best is not None:
    print(f"\nBest lam={lam_best:.4f}; nnz={int((w_best>1e-6).sum())}; OOS TE={evaluate_te(R_te, rb_te, w_best):.2f} bp")
    port_te2 = portfolio_returns(R_te, w_best)
    cum_port2 = (1 + port_te2).cumprod()
    cum_bench2 = (1 + rb_te).cumprod()
    ax = (cum_port2.rename('Portfolio L1') / cum_port2.iloc[0]).plot(figsize=(9,4))
    ((cum_bench2.rename('Benchmark') / cum_bench2.iloc[0])).plot(ax=ax)
    plt.title('Out-of-sample cumulative performance (L1 constrained)')
    plt.legend(); plt.grid(True); plt.show()
else:
    print('L1 solver failed to find a solution. Try adjusting the lambda grid or OSQP install.')


## Next steps
- Add **sector caps**: restrict weights by sector buckets.
- Add **turnover penalty**: fit sequentially over time with \(\|w_t - w_{t-1}\|_1\) penalty.
- Try **MIQP** with cardinality (\(\sum z_i \le k\)). Use `pulp` or `ortools`.
- Build **factor exposures** (PCA or Fama–French-style) and solve a smaller problem matching exposures.
- Replace benchmark returns with an **equal-weight** or **custom synthetic** benchmark to avoid mega-cap dominance.
