# Comprehensive Portfolio Analysis (Schwab CSV Compatible)

This notebook analyzes a portfolio from Schwab CSV exports, with optional enrichment from market price data. It tolerates variation in column names and common CSV formatting quirks such as currency symbols, thousands separators, and parenthesized negatives.

**You will get:**
- Robust CSV ingestion for Schwab holdings and transactions
- Portfolio summary, allocation, concentration, and quality checks
- Performance and risk metrics (if price data is available)
- Optional enrichment with sector/industry metadata
- Rebalancing and risk contribution analysis
- Exportable outputs and figures

**Data you can use:**
- Schwab holdings CSV (Positions or Holdings export)
- Schwab transaction history CSV (if available)
- Local price history CSV (recommended if no API access)
- Optional API via `yfinance` (requires network access)

If you have different export formats, update the column mapping in the normalization utilities below.


## 0) Environment setup

This notebook uses common analytics libraries. If a library is missing, install it in your environment.

Recommended: `pandas`, `numpy`, `matplotlib`, `seaborn`, `scipy`, `statsmodels`, `yfinance` (optional).


In [None]:
# Optional install commands (uncomment if needed)
# !pip install pandas numpy matplotlib seaborn scipy statsmodels yfinance

import os
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

sns.set_theme(style="whitegrid")
pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 200)

BASE_DIR = Path.cwd()
DATA_DIR = BASE_DIR / "data"
OUTPUT_DIR = BASE_DIR / "outputs"
CACHE_DIR = BASE_DIR / "cache"

DATA_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
CACHE_DIR.mkdir(exist_ok=True)


## 1) Configuration

Update paths and options as needed.
- `holdings_csv`: Schwab positions/holdings export
- `transactions_csv`: Schwab transaction history export
- `prices_csv`: local price history with columns `date`, `symbol`, `adj_close` (or `close`)
- `benchmark_symbol`: used for beta and relative performance

Tip: Keep data in the `data/` folder for convenience.


In [None]:
CONFIG = {
    "holdings_csv": DATA_DIR / "schwab_holdings.csv",
    "transactions_csv": DATA_DIR / "schwab_transactions.csv",
    "prices_csv": DATA_DIR / "prices.csv",
    "benchmark_symbol": "SPY",
    "analysis_currency": "USD",
    "risk_free_rate_annual": 0.03,
    "use_yfinance": False,
    "yfinance_period": "5y",
    "yfinance_interval": "1d",
}

CONFIG


## 2) Utilities: normalization and parsing

Schwab CSV exports can vary in column headers and formatting. The utilities below aim to normalize them into a consistent schema.

If your CSV uses different column names, extend `COLUMN_ALIASES`.
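
As a quick illustration of the value formats these utilities have to handle, the sketch below parses a few strings of the kind that appear in brokerage exports. The helper name `parse_money` and the sample values are made up for this example; the notebook's own `to_float` follows the same idea.

```python
import pandas as pd

def parse_money(series: pd.Series) -> pd.Series:
    """Hypothetical helper: strip $ , % and read (x) as -x; unparseable -> NaN."""
    s = series.astype(str).str.strip()
    s = s.str.replace(r"[$,%]", "", regex=True)
    # Accounting-style negatives: "(45.00)" becomes "-45.00"
    s = s.str.replace(r"^\((.*)\)$", r"-\1", regex=True)
    return pd.to_numeric(s, errors="coerce")

sample = pd.Series(["$1,234.56", "(45.00)", "12.5%", "--"])
parsed = parse_money(sample)
print(parsed.tolist())
```

Note that a percentage string such as `12.5%` parses to `12.5`, not `0.125`, so percentage columns read this way are in percent units.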


In [None]:
import re

COLUMN_ALIASES = {
    # Holdings / Positions
    "symbol": ["symbol", "ticker", "sym"],
    "description": ["description", "security description", "name", "security name"],
    "quantity": ["quantity", "qty", "shares", "units"],
    "price": ["price", "last price", "mark", "market price"],
    "market_value": ["market value", "marketvalue", "value"],
    "cost_basis": ["cost basis", "costbasis", "total cost"],
    "unrealized_gain": ["unrealized gain", "unrealized", "unrealized gain/loss", "unrealized pl"],
    "unrealized_gain_pct": ["unrealized gain %", "unrealized %", "unrealized gain/loss %"],
    "day_change": ["day change", "change $", "day change $"],
    "day_change_pct": ["day change %", "change %"],
    "asset_class": ["asset class", "assetclass", "asset type"],
    # Transactions
    "trade_date": ["date", "trade date", "posted date"],
    "action": ["action", "transaction", "type"],
    "amount": ["amount", "net amount", "total amount"],
    "fees": ["fees", "commission"],
    "cash": ["cash", "cash balance"],
}

def _norm_col(col):
    col = str(col).strip().lower()
    col = re.sub(r"\s+", " ", col)
    col = col.replace("/", " " ).replace("-", " " )
    col = re.sub(r"[^a-z0-9 %]", "", col)
    col = re.sub(r"\s+", " ", col).strip()
    return col

def normalize_columns(df):
    raw_cols = list(df.columns)
    norm_cols = {_norm_col(c): c for c in raw_cols}

    mapping = {}
    for canonical, aliases in COLUMN_ALIASES.items():
        for alias in aliases:
            alias_norm = _norm_col(alias)
            if alias_norm in norm_cols:
                mapping[norm_cols[alias_norm]] = canonical
                break

    df = df.rename(columns=mapping)
    return df

def to_float(series):
    if series is None:
        return None
    # Remove currency, commas, and parentheses for negatives
    s = series.astype(str)
    s = s.str.replace("$", "", regex=False)
    s = s.str.replace(",", "", regex=False)
    s = s.str.replace("%", "", regex=False)
    s = s.str.replace("(", "-", regex=False)
    s = s.str.replace(")", "", regex=False)
    s = s.str.strip()
    return pd.to_numeric(s, errors="coerce")

def parse_date(series):
    return pd.to_datetime(series, errors="coerce")


## 3) Load Schwab holdings CSV

This function loads a holdings/positions export and normalizes columns into a consistent schema. It also calculates derived fields if missing.


In [None]:
def load_schwab_holdings(path):
    if not Path(path).exists():
        print(f"Holdings CSV not found: {path}")
        return None
    df = pd.read_csv(path)
    df = normalize_columns(df)

    # Standardize key numeric columns
    for col in ["quantity", "price", "market_value", "cost_basis", "unrealized_gain", "unrealized_gain_pct", "day_change", "day_change_pct"]:
        if col in df.columns:
            df[col] = to_float(df[col])

    # Derive market value if missing
    if "market_value" not in df.columns and {"quantity", "price"}.issubset(df.columns):
        df["market_value"] = df["quantity"] * df["price"]

    # Derive cost basis if missing
    if "cost_basis" not in df.columns and "unrealized_gain" in df.columns and "market_value" in df.columns:
        df["cost_basis"] = df["market_value"] - df["unrealized_gain"]

    # Derive unrealized gain if missing
    if "unrealized_gain" not in df.columns and {"market_value", "cost_basis"}.issubset(df.columns):
        df["unrealized_gain"] = df["market_value"] - df["cost_basis"]

    # Derive unrealized gain % if missing (percent units, matching parsed "x%" values)
    if "unrealized_gain_pct" not in df.columns and {"unrealized_gain", "cost_basis"}.issubset(df.columns):
        df["unrealized_gain_pct"] = 100 * df["unrealized_gain"] / df["cost_basis"].replace(0, np.nan)

    # Drop rows without a symbol, then clean the remaining symbols.
    # (Casting to str first would turn missing values into the string "nan".)
    if "symbol" in df.columns:
        df = df[df["symbol"].notna()].copy()
        df["symbol"] = df["symbol"].astype(str).str.strip()
        df = df[df["symbol"] != ""]

    return df

holdings = load_schwab_holdings(CONFIG["holdings_csv"])
holdings.head() if holdings is not None else None


## 4) Load Schwab transactions CSV (optional but recommended)

Transactions provide the data needed for true performance metrics (TWR/MWR), realized gains, and cash flows.


In [None]:
def load_schwab_transactions(path):
    if not Path(path).exists():
        print(f"Transactions CSV not found: {path}")
        return None
    df = pd.read_csv(path)
    df = normalize_columns(df)

    if "trade_date" in df.columns:
        df["trade_date"] = parse_date(df["trade_date"])

    for col in ["quantity", "price", "amount", "fees"]:
        if col in df.columns:
            df[col] = to_float(df[col])

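    if "symbol" in df.columns:
        # Strip symbols but keep missing values missing (rows such as deposits may have no symbol)
        has_symbol = df["symbol"].notna()
        df.loc[has_symbol, "symbol"] = df.loc[has_symbol, "symbol"].astype(str).str.strip()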

    # Normalize action text
    if "action" in df.columns:
        df["action"] = df["action"].astype(str).str.strip().str.lower()

    return df

transactions = load_schwab_transactions(CONFIG["transactions_csv"])
transactions.head() if transactions is not None else None


## 5) Data quality checks

Before analysis, confirm the inputs are complete and consistent.


In [None]:
def holdings_quality_report(df):
    if df is None or df.empty:
        return "Holdings dataset is empty or missing."
    report = {}
    report["rows"] = len(df)
    for col in ["symbol", "quantity", "price", "market_value", "cost_basis"]:
        report[f"missing_{col}"] = df[col].isna().mean() if col in df.columns else 1.0
    report["zero_quantity"] = (df.get("quantity", pd.Series(dtype=float)) == 0).mean()
    report["negative_market_value"] = (df.get("market_value", pd.Series(dtype=float)) < 0).mean()
    return pd.Series(report)

holdings_quality_report(holdings) if holdings is not None else None


## 6) Core portfolio summary

Compute total market value, cost basis, unrealized gain, and exposures.


In [None]:
def portfolio_summary(df):
    if df is None or df.empty:
        return None
    total_mv = df["market_value"].sum() if "market_value" in df.columns else np.nan
    total_cb = df["cost_basis"].sum() if "cost_basis" in df.columns else np.nan
    total_ug = df["unrealized_gain"].sum() if "unrealized_gain" in df.columns else np.nan
    total_ug_pct = total_ug / total_cb if pd.notna(total_cb) and total_cb != 0 else np.nan

    summary = {
        "total_market_value": total_mv,
        "total_cost_basis": total_cb,
        "total_unrealized_gain": total_ug,
        "total_unrealized_gain_pct": total_ug_pct,
        "num_positions": len(df),
    }
    return pd.Series(summary)

summary = portfolio_summary(holdings)
summary


In [None]:
def add_weights(df):
    if df is None or df.empty:
        return df
    total_mv = df["market_value"].sum() if "market_value" in df.columns else np.nan
    if pd.notna(total_mv) and total_mv != 0:
        df = df.copy()
        df["weight"] = df["market_value"] / total_mv
    return df

holdings = add_weights(holdings)
holdings.head() if holdings is not None else None


## 7) Allocation and concentration analysis

Compute allocation by symbol, asset class, and concentration metrics (HHI, top N).
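
To make the concentration metrics concrete, here is a small worked example with made-up weights. HHI is the sum of squared weights, so with weights expressed as fractions it ranges from 1/N (equal weights) to 1 (a single position). The "effective number of positions", 1/HHI, is an extra diagnostic that this notebook does not compute.

```python
import numpy as np

weights = np.array([0.5, 0.3, 0.2])   # illustrative weights that sum to 1
hhi = float(np.sum(weights ** 2))      # 0.25 + 0.09 + 0.04
effective_n = 1.0 / hhi                # equivalent count of equal-sized positions
top2 = float(np.sort(weights)[::-1][:2].sum())
print(round(hhi, 4), round(effective_n, 2), round(top2, 2))
```

Three positions with these weights behave, in concentration terms, like roughly 2.6 equal-sized positions.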


In [None]:
def concentration_metrics(df):
    if df is None or df.empty or "weight" not in df.columns:
        return None
    weights = df["weight"].dropna().values
    hhi = np.sum(weights ** 2)
    top5 = df.nlargest(5, "weight")["weight"].sum()
    top10 = df.nlargest(10, "weight")["weight"].sum()
    return pd.Series({
        "hhi": hhi,
        "top5_weight": top5,
        "top10_weight": top10,
    })

concentration_metrics(holdings) if holdings is not None else None


In [None]:
def plot_top_positions(df, n=10):
    if df is None or df.empty or "weight" not in df.columns:
        return
    top = df.nlargest(n, "weight").copy()
    # Use symbols as labels when available; otherwise fall back to the index values
    labels = top["symbol"].astype(str) if "symbol" in top.columns else top.index.astype(str)
    plt.figure(figsize=(10, 6))
    plt.barh(labels, top["weight"], color="#4C72B0")
    plt.gca().invert_yaxis()
    plt.title(f"Top {n} Positions by Weight")
    plt.xlabel("Weight")
    plt.tight_layout()
    plt.show()

plot_top_positions(holdings, n=10)


## 8) Price data loading

Price data is required for return, risk, and performance analytics. You can either:
- Load a local CSV with columns: `date`, `symbol`, `adj_close` (or `close`)
- Use `yfinance` to fetch prices (requires network access)

Both are supported; a local CSV is the more reliable option because it does not depend on network access or on changes to the `yfinance` API.
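
For reference, the expected local price file is in long format, one row per date and symbol. The sketch below uses made-up tickers and prices to show that layout and how it pivots into the wide date-by-symbol matrix used for return calculations.

```python
import io
import pandas as pd

# Illustrative file contents; AAA and BBB are placeholder tickers
csv_text = """date,symbol,adj_close
2024-01-02,AAA,100.0
2024-01-02,BBB,50.0
2024-01-03,AAA,101.0
2024-01-03,BBB,49.5
"""
long_prices = pd.read_csv(io.StringIO(csv_text), parse_dates=["date"])

# One row per date, one column per symbol
wide = long_prices.pivot(index="date", columns="symbol", values="adj_close")
print(wide)
```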


In [None]:
def load_prices_local(path):
    if not Path(path).exists():
        print(f"Prices CSV not found: {path}")
        return None
    df = pd.read_csv(path)
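    df = normalize_columns(df)
    # COLUMN_ALIASES maps "date" to "trade_date"; the price schema expects "date"
    df = df.rename(columns={"trade_date": "date"})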
    if "date" in df.columns:
        df["date"] = parse_date(df["date"])
    if "adj_close" not in df.columns and "close" in df.columns:
        df["adj_close"] = df["close"]
    return df

def fetch_prices_yfinance(symbols, period="5y", interval="1d"):
    try:
        import yfinance as yf
    except Exception as e:
        print(f"yfinance not available: {e}")
        return None
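    if isinstance(symbols, str):
        symbols = [symbols]
    data = yf.download(symbols, period=period, interval=interval, group_by="ticker", auto_adjust=True, progress=False)
    if data is None or data.empty:
        return None
    # Normalize to long format
    frames = []
    for sym in symbols:
        if isinstance(data.columns, pd.MultiIndex):
            if sym not in data.columns.get_level_values(0):
                continue
            df = data[sym].reset_index()
        else:
            # Some yfinance versions return flat columns for a single ticker
            df = data.reset_index()
        df["symbol"] = sym
        # With auto_adjust=True, "Close" is already adjusted and there is no "Adj Close"
        df = df.rename(columns={"Date": "date", "Datetime": "date", "Close": "adj_close"})
        frames.append(df[["date", "symbol", "adj_close"]])
    return pd.concat(frames, ignore_index=True) if frames else None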

prices = load_prices_local(CONFIG["prices_csv"])
if prices is None and CONFIG["use_yfinance"] and holdings is not None:
    symbols = sorted(set(holdings["symbol"].dropna().astype(str)))
    symbols = [s for s in symbols if s not in ["CASH", "USD"]]
    prices = fetch_prices_yfinance(symbols + [CONFIG["benchmark_symbol"]], CONFIG["yfinance_period"], CONFIG["yfinance_interval"])
prices.head() if prices is not None else None


## 9) Returns and risk analytics

With price history, compute returns, volatility, beta, drawdown, and risk metrics.
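
As a small worked example of the drawdown calculation, the sketch below uses a made-up four-point price path. Compounding with `(1 + r).cumprod()` presumes simple returns; log returns would instead be compounded with `np.exp(r.cumsum())`.

```python
import pandas as pd

demo_prices = pd.Series([100.0, 120.0, 90.0, 108.0])   # made-up price path
simple_rets = demo_prices.pct_change().dropna()

wealth = (1 + simple_rets).cumprod()      # growth of 1 unit invested
running_peak = wealth.cummax()            # highest value seen so far
drawdown = wealth / running_peak - 1      # decline from the running peak
max_drawdown = drawdown.min()
print(round(max_drawdown, 4))
```

The largest decline here is the move from 120 to 90, a 25% drop from the peak.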


In [None]:
def build_price_matrix(prices_df):
    if prices_df is None or prices_df.empty:
        return None
    df = prices_df.copy()
    df = df.dropna(subset=["date", "symbol", "adj_close"])
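    # Keep the last row per (date, symbol) so pivot does not fail on duplicates
    df = df.sort_values(["symbol", "date"]).drop_duplicates(["date", "symbol"], keep="last")
    pivot = df.pivot(index="date", columns="symbol", values="adj_close")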
    return pivot

price_matrix = build_price_matrix(prices)
price_matrix.tail() if price_matrix is not None else None


In [None]:
def compute_returns(price_matrix, method="log"):
    if price_matrix is None or price_matrix.empty:
        return None
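    if method == "log":
        returns = np.log(price_matrix / price_matrix.shift(1))
    else:
        # fill_method=None leaves gaps as NaN instead of forward-filling prices
        returns = price_matrix.pct_change(fill_method=None)
    return returns.dropna(how="all")

# Simple returns are used downstream: they aggregate across holdings by weight
# and compound via (1 + r).cumprod() in the drawdown calculation.
returns = compute_returns(price_matrix, method="simple")
returns.tail() if returns is not None else None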


In [None]:
def portfolio_returns(returns_df, weights):
    if returns_df is None or returns_df.empty:
        return None
    # Align weights to columns
    w = weights.reindex(returns_df.columns).fillna(0.0)
    port = returns_df.mul(w, axis=1).sum(axis=1)
    return port

if holdings is not None and "weight" in holdings.columns and returns is not None:
    # Sum weights per symbol so a symbol held in several lots or accounts appears once
    weights = holdings.groupby("symbol")["weight"].sum()
    port_rets = portfolio_returns(returns, weights)
else:
    port_rets = None

port_rets.tail() if port_rets is not None else None


In [None]:
def risk_metrics(port_rets, benchmark_rets=None, rf_annual=0.03):
    if port_rets is None or port_rets.empty:
        return None
    rf_daily = (1 + rf_annual) ** (1/252) - 1
    excess = port_rets - rf_daily

    ann_return = port_rets.mean() * 252
    ann_vol = port_rets.std() * np.sqrt(252)
    sharpe = excess.mean() / port_rets.std() * np.sqrt(252) if port_rets.std() != 0 else np.nan

    # Drawdown
    cum = (1 + port_rets).cumprod()
    peak = cum.cummax()
    drawdown = (cum - peak) / peak
    max_dd = drawdown.min()

    results = {
        "annualized_return": ann_return,
        "annualized_volatility": ann_vol,
        "sharpe_ratio": sharpe,
        "max_drawdown": max_dd,
    }

    if benchmark_rets is not None and not benchmark_rets.empty:
        # Align on common dates and use sample statistics for both covariance and variance
        pair = pd.concat([port_rets, benchmark_rets], axis=1, join="inner").dropna()
        bench_var = pair.iloc[:, 1].var()
        beta = pair.iloc[:, 0].cov(pair.iloc[:, 1]) / bench_var if bench_var != 0 else np.nan
        results["beta_vs_benchmark"] = beta

    return pd.Series(results)

if returns is not None and CONFIG["benchmark_symbol"] in returns.columns and port_rets is not None:
    bench = returns[CONFIG["benchmark_symbol"]].dropna()
else:
    bench = None

risk_metrics(port_rets, bench, CONFIG["risk_free_rate_annual"]) if port_rets is not None else None


## 10) Correlations and diversification

Compute correlation matrix and visualize it. Use this to identify clustering and diversification gaps.


In [None]:
def plot_correlation_matrix(returns_df, max_symbols=25):
    if returns_df is None or returns_df.empty:
        return
    # Limit to top positions by weight (if available)
    cols = list(returns_df.columns)
    if holdings is not None and "weight" in holdings.columns:
        top_syms = holdings.nlargest(max_symbols, "weight")["symbol"].tolist()
        cols = [c for c in cols if c in top_syms]
    corr = returns_df[cols].corr()
    plt.figure(figsize=(10, 8))
    sns.heatmap(corr, cmap="coolwarm", center=0, square=True)
    plt.title("Correlation Matrix")
    plt.tight_layout()
    plt.show()

plot_correlation_matrix(returns)


## 11) Risk contribution (marginal and total)

Quantify how much each position contributes to total portfolio risk.
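
The decomposition can be checked on a two-asset example. The covariance matrix and weights below are hypothetical; the point is that each position's total contribution (weight times marginal contribution) sums to portfolio volatility, so the percentage shares sum to one.

```python
import numpy as np

cov = np.array([[0.04, 0.006],
                [0.006, 0.01]])    # hypothetical annualized covariance matrix
w = np.array([0.6, 0.4])           # hypothetical portfolio weights

port_vol = np.sqrt(w @ cov @ w)    # portfolio volatility
marginal = cov @ w / port_vol      # sensitivity of volatility to each weight
total = w * marginal               # each asset's contribution to volatility
share = total / total.sum()        # contribution as a fraction of the total
print(round(float(port_vol), 4), share.round(3))
```

In this example the first asset holds 60% of the capital but accounts for roughly 84% of the risk, which is the kind of gap between weight and risk share that this section is meant to surface.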


In [None]:
def risk_contribution(returns_df, weights):
    if returns_df is None or returns_df.empty:
        return None
    cov = returns_df.cov() * 252
    w = weights.reindex(cov.index).fillna(0.0)
    port_var = np.dot(w, np.dot(cov, w))
    if port_var == 0:
        return None
    mrc = np.dot(cov, w) / np.sqrt(port_var)
    trc = w * mrc
    out = pd.DataFrame({
        "weight": w,
        "marginal_risk_contrib": mrc,
        "total_risk_contrib": trc,
        "total_risk_contrib_pct": trc / trc.sum()
    })
    return out.sort_values("total_risk_contrib_pct", ascending=False)

if returns is not None and holdings is not None and "weight" in holdings.columns:
    # Sum weights per symbol so a symbol held in several lots or accounts appears once
    weights = holdings.groupby("symbol")["weight"].sum()
    risk_contrib = risk_contribution(returns, weights)
else:
    risk_contrib = None

risk_contrib.head(10) if risk_contrib is not None else None


## 12) Transaction-based performance (TWR and MWR)

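Transactions make it possible to compute a money-weighted return (MWR/IRR), which the code below does. A time-weighted return (TWR) additionally needs a portfolio valuation at each external cash-flow date, which this notebook does not reconstruct.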

This section is more advanced and depends on clean transaction data. If your Schwab CSV uses different action labels, update the classifier.
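
For comparison with MWR, a time-weighted return chains the growth of each sub-period between external cash flows, which removes the effect of deposit and withdrawal timing. The sketch below uses hypothetical valuations; producing such valuations from real data would require pricing the portfolio on each flow date.

```python
# Hypothetical valuations: value at the start of each sub-period (after any
# external flow) and value at its end (before the next flow).
sub_periods = [
    {"start_value": 1000.0, "end_value": 1100.0},  # +10%, followed by a 500 deposit
    {"start_value": 1600.0, "end_value": 1760.0},  # +10% on the larger base
]

growth = 1.0
for period in sub_periods:
    # Chain-link the growth factor of each sub-period
    growth *= period["end_value"] / period["start_value"]

twr = growth - 1.0
print(round(twr, 4))
```

Two consecutive 10% sub-periods compound to 21%, regardless of how large the deposit between them was.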


In [None]:
def classify_cash_flow(df):
    if df is None or df.empty or "action" not in df.columns:
        return None
    df = df.copy()
    # Cash-ledger convention: buys and withdrawals are negative; sells, dividends,
    # and contributions are positive. Matching is by substring, so review the
    # action labels in your export (a dividend reinvestment also contains "div").
    buys = df["action"].str.contains("buy", na=False)
    sells = df["action"].str.contains("sell", na=False)
    divs = df["action"].str.contains("div", na=False)
    contrib = df["action"].str.contains("contribution|deposit", na=False)
    withdraw = df["action"].str.contains("withdrawal", na=False)

    df["cash_flow"] = 0.0
    if "amount" in df.columns:
        # abs() applies the convention whether or not the export already signs amounts
        amt = df["amount"].abs()
        df.loc[buys, "cash_flow"] = -amt[buys]
        df.loc[sells | divs | contrib, "cash_flow"] = amt[sells | divs | contrib]
        df.loc[withdraw, "cash_flow"] = -amt[withdraw]
    return df

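def compute_mwr(cash_flows):
    # Money-weighted return: annualized IRR of dated cash flows (XIRR).
    # np.irr was removed in NumPy 1.20 and assumed equally spaced periods, so
    # the rate is solved directly from the dated flows with scipy.
    if cash_flows is None or cash_flows.empty:
        return None
    cf = cash_flows.dropna(subset=["trade_date", "cash_flow"])
    # Aggregate by date
    cf = cf.groupby("trade_date")["cash_flow"].sum().sort_index()
    # Add ending market value as a final positive flow (if available)
    if holdings is not None and "market_value" in holdings.columns:
        today = pd.Timestamp.today().normalize()
        cf.loc[today] = cf.get(today, 0.0) + holdings["market_value"].sum()
        cf = cf.sort_index()
    # A root exists only if the flows change sign. Deposits recorded alongside
    # the buys they fund count twice, so filter cash_flows to one kind first.
    if cf.empty or not ((cf > 0).any() and (cf < 0).any()):
        return np.nan
    years = np.asarray((cf.index - cf.index[0]).days, dtype=float) / 365.0

    def npv(rate):
        return float(np.sum(cf.values / (1.0 + rate) ** years))

    try:
        from scipy.optimize import brentq
        return brentq(npv, -0.99, 10.0)
    except (ImportError, ValueError):
        # scipy is missing, or there is no root inside the bracket
        return np.nan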

if transactions is not None:
    cf = classify_cash_flow(transactions)
    mwr = compute_mwr(cf)
else:
    cf = None
    mwr = None

mwr


## 13) Stress tests and scenarios

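Apply uniform percentage shocks to total market value. Despite the `equity_down_*` names, the default scenarios shock every holding, including cash and bonds. Adjust the scenarios based on your risk tolerance.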
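
If the holdings carry an `asset_class` column, a scenario can shock each class differently. The sketch below is one way to do that; the class labels, shock sizes, and sample holdings are assumptions for illustration, and real exports may label asset classes differently.

```python
import pandas as pd

demo_holdings = pd.DataFrame({
    "symbol": ["AAA", "BBB", "CASH"],
    "asset_class": ["Equity", "Fixed Income", "Cash"],   # assumed labels
    "market_value": [60_000.0, 30_000.0, 10_000.0],
})
shock_by_class = {"Equity": -0.20, "Fixed Income": -0.05}  # hypothetical scenario

# Classes without an entry in the scenario are left unshocked
shock = demo_holdings["asset_class"].map(shock_by_class).fillna(0.0)
stressed_value = (demo_holdings["market_value"] * (1 + shock)).sum()
loss = stressed_value - demo_holdings["market_value"].sum()
print(stressed_value, loss)
```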


In [None]:
def stress_test(df, shocks=None):
    if df is None or df.empty or "market_value" not in df.columns:
        return None
    if shocks is None:
        shocks = {
            "equity_down_10": -0.10,
            "equity_down_20": -0.20,
            "equity_down_30": -0.30,
        }
    total = df["market_value"].sum()
    out = {}
    for name, shock in shocks.items():
        out[name] = total * (1 + shock)
    return pd.Series(out)

stress_test(holdings) if holdings is not None else None


## 14) Rebalancing diagnostics

Define target weights and compute drift. This is optional but useful for disciplined portfolio management.
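
Drift translates directly into trade sizes: the amount to trade in each position is the negative of its drift times total portfolio value. The sketch below uses made-up holdings and targets and ignores transaction costs, taxes, and lot sizes.

```python
import pandas as pd

demo_positions = pd.DataFrame({
    "symbol": ["AAA", "BBB"],
    "market_value": [70_000.0, 30_000.0],
})
targets = {"AAA": 0.60, "BBB": 0.40}   # hypothetical target weights

total = demo_positions["market_value"].sum()
demo_positions["weight"] = demo_positions["market_value"] / total
demo_positions["target_weight"] = demo_positions["symbol"].map(targets).fillna(0.0)
demo_positions["drift"] = demo_positions["weight"] - demo_positions["target_weight"]
# Positive trade_value means buy, negative means sell
demo_positions["trade_value"] = -demo_positions["drift"] * total
print(demo_positions[["symbol", "drift", "trade_value"]])
```

When the targets sum to one, the buys and sells net to zero, so the rebalance is self-funding.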


In [None]:
# Example target weights (customize to your strategy)
TARGET_WEIGHTS = {
    # "AAPL": 0.05,
    # "SPY": 0.30,
}

def rebalance_table(df, targets):
    if df is None or df.empty or "weight" not in df.columns:
        return None
    out = df.copy()
    out["target_weight"] = out["symbol"].map(targets).fillna(0.0)
    out["drift"] = out["weight"] - out["target_weight"]
    out = out.sort_values("drift", ascending=False)
    return out[["symbol", "weight", "target_weight", "drift", "market_value"]]

rebalance = rebalance_table(holdings, TARGET_WEIGHTS)
rebalance.head(10) if rebalance is not None else None


## 15) Exports

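Export the normalized tables to `outputs/` for reporting. The cell below writes CSV tables only; to keep figures as well, call `plt.savefig` before `plt.show()` in the plotting cells.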


In [None]:
def export_tables():
    if holdings is not None:
        holdings.to_csv(OUTPUT_DIR / "holdings_normalized.csv", index=False)
    if transactions is not None:
        transactions.to_csv(OUTPUT_DIR / "transactions_normalized.csv", index=False)
    if risk_contrib is not None:
        risk_contrib.to_csv(OUTPUT_DIR / "risk_contribution.csv", index=True)
    if rebalance is not None:
        rebalance.to_csv(OUTPUT_DIR / "rebalance.csv", index=False)
    print(f"Exports written to: {OUTPUT_DIR}")

# export_tables()


## 16) Next steps and customization

Ideas to make this even more comprehensive:
- Add tax lot analysis if you export lots from Schwab
- Incorporate dividends by mapping transactions to dividends
- Add factor exposures (value, momentum, quality)
- Integrate custom benchmarks or multi-asset indexes
- Build a rolling risk dashboard (30/90/252 day windows)
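
As a starting point for the rolling risk idea, the sketch below computes annualized rolling volatility over the three windows on a synthetic daily return series. The random data and column names are placeholders; in practice the input would be the portfolio's daily simple returns.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
dates = pd.bdate_range("2023-01-02", periods=300)
# Synthetic daily returns, used only to exercise the calculation
demo_rets = pd.Series(rng.normal(0.0005, 0.01, len(dates)), index=dates)

windows = (30, 90, 252)
rolling_vol = pd.DataFrame({
    f"vol_{w}d": demo_rets.rolling(w).std() * np.sqrt(252) for w in windows
})
print(rolling_vol.tail(3))
```

Each column is undefined until its window has filled, so the 252-day series needs about a year of data before its first value.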

To tailor the notebook to a specific export format, start from a sample CSV and extend `COLUMN_ALIASES` and the action patterns in `classify_cash_flow` to match its headers and labels.
