4. Empirical Analysis of ETFs
Pick a sector ETF (for example, a US-listed one such as XLRE) and work through the following steps:

a. Find the 30 largest holdings.

b. Get at least 6 months of data (~ 120 data points).

c. Compute the daily returns.

d. Compute the covariance matrix.

e. Compute the PCA.

f. Compute the SVD.

In [None]:
# -*- coding: utf-8 -*-
# XLU empirical workflow with robust holdings parsing (English-only comments)
# Steps:
# (a) Top-30 holdings (robust to header changes; fallback if no tickers)
# (b) >= ~6 months of daily prices
# (c) Daily returns
# (d) Covariance matrix
# (e) PCA on standardized returns
# (f) SVD of standardized returns matrix

import io
import re
import warnings
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import requests
import yfinance as yf
from sklearn.decomposition import PCA

# Fund under study and the two places its holdings are read from.
FUND = "XLU"
SSGA_XLSX = "https://www.ssga.com/library-content/products/fund-data/etfs/us/holdings-daily-us-en-xlu.xlsx"
ALT_HOLDINGS_HTML = "https://stockanalysis.com/etf/xlu/holdings/"  # used only when SSGA has no ticker column

# Number of most-recent trading sessions kept for the analysis (~6 months).
TARGET_TRADING_DAYS = 126

# Download window: 40 weeks (280 calendar days) ending today, which leaves
# headroom over TARGET_TRADING_DAYS once weekends and holidays are removed.
END = datetime.now().date()
START = END - timedelta(weeks=40)

# -----------------------------
# Utilities
# -----------------------------
def _first_matching_col(columns, patterns, case_insensitive=True):
    """
    Return the first column whose name matches any of the regex patterns.
    """
    flags = re.IGNORECASE if case_insensitive else 0
    for pat in patterns:
        for c in columns:
            if re.search(pat, str(c), flags):
                return c
    return None

def _numeric_percent_series(s):
    """
    Convert a column that may contain strings like '3.45%' into float 3.45.
    """
    return (
        s.astype(str)
         .str.replace(",", "", regex=False)
         .str.extract(r"([-+]?\d*\.?\d+)")
         .iloc[:, 0]
         .astype(float)
    )

def _normalize_ticker(s):
    """
    Normalize tickers for Yahoo Finance compatibility (common cases).
    """
    return (s.astype(str)
            .str.strip()
            .str.replace("/", "-", regex=False)   # e.g., BRK/B -> BRK-B
            .str.replace(" ", "", regex=False)
           )

# -----------------------------
# (a) Robust holdings fetcher
# -----------------------------
def _promote_header_row(raw, name_patterns, weight_patterns, max_scan=25):
    """
    Turn a sheet read with ``header=None`` into a table with proper columns.

    The first of the top `max_scan` rows that holds both a name-like and a
    weight-like cell is used as the header; everything above it is discarded.
    If no such row exists, row 0 is used, which is what a plain
    ``pd.read_excel`` call would have assumed.
    """
    if raw.empty:
        return raw

    header_pos = 0
    for pos in range(min(max_scan, len(raw))):
        cells = [cell for cell in raw.iloc[pos].tolist() if pd.notna(cell)]
        if (_first_matching_col(cells, name_patterns) is not None
                and _first_matching_col(cells, weight_patterns) is not None):
            header_pos = pos
            break

    table = raw.iloc[header_pos + 1:].copy()
    table.columns = [str(cell).strip() for cell in raw.iloc[header_pos].tolist()]
    # Repeated labels would make table[label] return a DataFrame; keep the first.
    table = table.loc[:, ~table.columns.duplicated()]
    return table.reset_index(drop=True)


def _attach_fallback_tickers(out):
    """
    Add a 'Ticker' column to `out` by matching company names (case-insensitive)
    against the holdings table at ALT_HOLDINGS_HTML. Rows without a match are
    dropped, with a warning stating how many.
    """
    alt = None
    for candidate in pd.read_html(ALT_HOLDINGS_HTML):
        # Column labels are not guaranteed to be strings (ints, tuples, ...).
        cols_lower = [str(c).lower() for c in candidate.columns]
        has_name = any(("name" in c or "company" in c) for c in cols_lower)
        has_symbol = any(("symbol" in c or "ticker" in c) for c in cols_lower)
        if has_name and has_symbol:
            alt = candidate
            break
    if alt is None:
        raise ValueError("Fallback holdings table not found on StockAnalysis.")

    alt_name_col = _first_matching_col(list(alt.columns), [r"name", r"company"])
    alt_ticker_col = _first_matching_col(list(alt.columns), [r"symbol", r"ticker"])
    raw_symbols = alt[alt_ticker_col]

    lookup = pd.DataFrame({
        "_name_key": alt[alt_name_col].astype(str).str.strip().str.lower(),
        # Mask with the raw column so missing symbols stay missing regardless
        # of how the normalizer stringifies them.
        "Ticker": _normalize_ticker(raw_symbols).where(raw_symbols.notna()),
    })
    # One ticker per name: duplicated names would multiply rows in the merge.
    lookup = lookup.dropna(subset=["Ticker"]).drop_duplicates(subset="_name_key")

    merged = (
        out.assign(_name_key=out["Name"].str.lower())
           .merge(lookup, on="_name_key", how="left")
           .drop(columns="_name_key")
    )

    # If still missing tickers for a few rows, drop them
    missing_before = merged["Ticker"].isna().sum()
    if missing_before > 0:
        warnings.warn(f"{missing_before} rows had no ticker match; they will be dropped.")
    return merged.dropna(subset=["Ticker"])


def get_xlu_holdings_robust():
    """
    Download the fund's holdings and return them as a DataFrame with columns
    ['Name', 'Ticker', 'Weight'] (Weight as a percentage number, not a fraction).

    The SSGA XLSX is fetched and its Name/Ticker/Weight columns are located by
    fuzzy regex matching. The header row is searched for rather than assumed
    to be the first sheet row, so leading metadata rows do not break the
    detection (NOTE(review): the exact SSGA layout is not visible from this
    file -- confirm against a live download).

    If the workbook has no ticker column, tickers are taken from the
    StockAnalysis HTML table and merged in by company name.

    Raises:
        ValueError: if the name or weight column cannot be detected, or the
            fallback table cannot be found.
        Exception: any download/parsing error is logged with an [ERROR] line
            and re-raised unchanged.
    """
    # Heuristics for column names that often change
    name_patterns = [r"^name$", r"security.*name", r"company", r"issuer", r"holding.*name"]
    ticker_patterns = [r"ticker", r"symbol", r"trading.*symbol", r"local.*ticker", r"bbg.*ticker"]
    weight_patterns = [r"weight", r"% of", r"percent"]

    try:
        r = requests.get(SSGA_XLSX, timeout=30)
        r.raise_for_status()
        raw = pd.read_excel(io.BytesIO(r.content), engine="openpyxl", header=None)
        df = _promote_header_row(raw, name_patterns, weight_patterns)
        raw_cols = list(df.columns)

        name_col = _first_matching_col(raw_cols, patterns=name_patterns)
        ticker_col = _first_matching_col(raw_cols, patterns=ticker_patterns)
        weight_col = _first_matching_col(raw_cols, patterns=weight_patterns)

        print(f"[INFO] SSGA columns detected: name={name_col}, ticker={ticker_col}, weight={weight_col}")
        if name_col is None or weight_col is None:
            raise ValueError("Could not detect required columns from SSGA XLSX.")

        out = pd.DataFrame()
        out["Name"] = df[name_col].astype(str).str.strip()

        # Parse weight into numeric percentage (not fraction)
        out["Weight"] = _numeric_percent_series(df[weight_col])

        if ticker_col is not None:
            raw_tickers = df[ticker_col]
            # Keep missing tickers as NaN so the dropna below really removes
            # them (footer / disclaimer rows have no ticker).
            out["Ticker"] = _normalize_ticker(raw_tickers).where(raw_tickers.notna())
            out = out.dropna(subset=["Ticker"])
            out = out[out["Ticker"].str.len() > 0]
            print("[INFO] Using SSGA tickers.")
            return out[["Name", "Ticker", "Weight"]].dropna(subset=["Weight"])

        print("[WARN] No ticker column in SSGA XLSX. Falling back to StockAnalysis for tickers...")
        merged = _attach_fallback_tickers(out)
        print("[INFO] Using fallback tickers from StockAnalysis.")
        return merged[["Name", "Ticker", "Weight"]].dropna(subset=["Weight"])

    except Exception as e:
        # Boundary handler: log for the notebook reader, then let it propagate.
        print(f"[ERROR] Primary holdings fetch failed: {e}")
        raise

def get_top30_xlu():
    """
    Return the 30 heaviest holdings, ordered by descending 'Weight', with a
    fresh 0-based index. Fewer rows are returned if fewer holdings exist.
    """
    ranked = get_xlu_holdings_robust().sort_values("Weight", ascending=False)
    leaders = ranked.head(30)
    return leaders.reset_index(drop=True)

# -----------------------------
# (a) Fetch the holdings and show them
# -----------------------------
top30 = get_top30_xlu()
print("\n=== (a) XLU Top 30 Holdings ===", top30, sep="\n")

# -----------------------------
# (b) >= 6 months of daily prices for XLU + top-30
# -----------------------------
tickers = [FUND] + top30["Ticker"].dropna().astype(str).unique().tolist()
prices = yf.download(tickers, start=START, end=END, auto_adjust=True, progress=False)["Close"]

# Remove sessions with no quote for any symbol BEFORE taking the window:
# a single all-NaN row inside the window would otherwise make the
# column-wise dropna below discard every symbol.
prices = prices.dropna(axis=0, how="all")

# Keep last ~126 trading days and drop assets with any missing values in this window
window = prices.tail(TARGET_TRADING_DAYS)
incomplete = window.columns[window.isna().any()].tolist()
if incomplete:
    print(f"[WARN] Dropping symbols with missing prices in the window: {incomplete}")
prices = window.dropna(axis=1, how="any")

# Fail here with a clear message rather than later inside PCA/SVD.
if prices.shape[1] == 0 or len(prices) < 2:
    raise ValueError(
        f"Not enough price data after cleaning: shape={prices.shape}. "
        "Check the ticker list and the download window."
    )

print(f"\n=== (b) Prices shape: {prices.shape} (rows ~ trading days, cols ~ symbols) ===")
print(prices.tail(3))

# -----------------------------
# (c) Daily simple returns
# -----------------------------
# The first row has no previous price, so its return is NaN and is removed.
pct_moves = prices.pct_change()
rets = pct_moves.dropna()
print(f"\n=== (c) Daily returns shape: {rets.shape} ===", rets.head(), sep="\n")

# -----------------------------
# (d) Sample covariance matrix of the daily returns
# -----------------------------
cov = rets.cov()
cov_corner = cov.iloc[:10, :10]  # top-left 10 x 10 corner, for display only
print("\n=== (d) Covariance matrix (head) ===", cov_corner, sep="\n")

# -----------------------------
# (e) PCA on standardized returns (correlation structure)
# -----------------------------
# z-score every column (sample std, ddof=1) so each symbol has unit variance.
col_means = rets.mean()
col_stds = rets.std(ddof=1)
rets_std = rets.sub(col_means).div(col_stds)

# At most 10 components, fewer when there are fewer symbols.
ncomp = min(10, rets_std.shape[1])
pca = PCA(n_components=ncomp)
pca.fit(rets_std)

pc_labels = [f"PC{k+1}" for k in range(ncomp)]
evr = pd.Series(pca.explained_variance_ratio_, index=pc_labels)
loadings = pd.DataFrame(pca.components_, columns=rets_std.columns, index=evr.index)

print("\n=== (e) PCA explained variance ratio ===", evr.round(4), sep="\n")

top_loadings = loadings.iloc[:3, :12]
print("\n=== PCA loadings (first 3 PCs, first 12 tickers) ===", top_loadings.round(3), sep="\n")

# -----------------------------
# (f) SVD of standardized returns matrix
# -----------------------------
# X = U @ diag(s) @ Vt (thin SVD). Because rets_std is column-centred and
# scaled with ddof=1, s**2 / (T - 1) are the eigenvalues of the sample
# correlation matrix and the rows of Vt are the principal directions
# (up to sign), which ties this step back to the PCA in (e).
X = rets_std.to_numpy()  # shape T x N
U, s, Vt = np.linalg.svd(X, full_matrices=False)

print("\n=== (f) SVD singular values (first 10) ===")
print(np.round(s[:10], 6))

# Size the display from the actual matrix: hard-coding three row labels
# raises a shape mismatch when fewer than 3 symbols survived the cleaning.
n_show_rows = min(3, Vt.shape[0])
n_show_cols = min(12, Vt.shape[1])
top_right_vectors = pd.DataFrame(
    Vt[:n_show_rows, :n_show_cols],
    columns=rets_std.columns[:n_show_cols],
    index=[f"SV{i + 1}" for i in range(n_show_rows)],
)

print("\n=== Right singular vectors V^T (first 3 rows, first 12 tickers) ===")
print(top_right_vectors.round(3))
