# Portfolio Look-Through Proof of Concept
Using Financial Modeling Prep (FMP) API

This notebook demonstrates a minimal but realistic core for a portfolio
look-through application. It:
1. Accepts a user portfolio with flexible inputs
2. Normalizes holdings to dollar exposure
3. Explodes ETFs into underlying constituents
4. Enriches companies with sector, industry, and country
5. Produces a unified exposure table suitable for slicing & dicing

In [12]:
import requests
import pandas as pd
from typing import Dict, List

In [13]:
# --- CODE CELL 1: Imports + Config + Functions (put functions first) ---

import requests
import pandas as pd
from typing import Dict, Any

# =========================
# Config
# =========================
API_KEY = "yy9SAIQnhX9wB2ABVxXaRA9MxNxeLXPO"
BASE_URL = "https://financialmodelingprep.com/stable"

def fmp_get(endpoint: str, params: Dict[str, Any] | None = None):
    """Simple GET wrapper. Adds apikey automatically."""
    params = {} if params is None else dict(params)
    params["apikey"] = API_KEY
    r = requests.get(f"{BASE_URL}/{endpoint}", params=params, timeout=30)
    r.raise_for_status()
    return r.json()

def get_quote_short_prices(tickers: pd.Series) -> Dict[str, float]:
    """Fetches latest price per ticker using /quote-short."""
    prices = {}
    for t in tickers.dropna().unique():
        q = fmp_get("quote-short", {"symbol": t})
        # FMP returns a list; empty list can happen for invalid symbols
        prices[t] = q[0]["price"] if q else None
    return prices

def get_profiles(tickers: pd.Series) -> Dict[str, Dict[str, Any]]:
    """Fetches /profile for each ticker. Returns dict keyed by ticker."""
    out = {}
    for t in tickers.dropna().unique():
        try:
            p = fmp_get("profile", {"symbol": t})
            out[t] = p[0] if p else {}
        except Exception:
            out[t] = {}
    return out

def normalize_portfolio_inputs(df: pd.DataFrame, total_portfolio_value: float | None = None) -> pd.DataFrame:
    """
    Inputs supported (per row):
      - ticker (required)
      - percent (optional; 0.0 to 1.0)
      - shares (optional)
      - price_per_share (optional)
      - dollars (optional)
    Normalizes to:
      - price (market price if needed)
      - position_value (dollars exposure)
    """
    df = df.copy()
    df.columns = [c.strip().lower() for c in df.columns]
    if "ticker" not in df.columns:
        raise ValueError("Input must include a 'ticker' column.")
    df["ticker"] = df["ticker"].astype(str).str.upper().str.strip()

    # Market price only needed if shares are provided but price_per_share isn't
    needs_market_price = df["shares"].notna() & df.get("price_per_share", pd.Series([None]*len(df))).isna()
    tickers_needing_price = df.loc[needs_market_price, "ticker"]

    market_prices = get_quote_short_prices(tickers_needing_price) if len(tickers_needing_price) else {}
    df["market_price"] = df["ticker"].map(market_prices)

    # Resolve row price
    if "price_per_share" not in df.columns:
        df["price_per_share"] = pd.NA
    df["price"] = df["price_per_share"].fillna(df["market_price"])

    # Compute position_value
    def compute_value(row):
        if pd.notna(row.get("dollars")):
            return float(row["dollars"])
        if pd.notna(row.get("shares")):
            if pd.isna(row.get("price")):
                return None
            return float(row["shares"]) * float(row["price"])
        if pd.notna(row.get("percent")):
            if total_portfolio_value is None:
                raise ValueError("total_portfolio_value must be provided if any row uses 'percent'.")
            return float(row["percent"]) * float(total_portfolio_value)
        return None

    df["position_value"] = df.apply(compute_value, axis=1)

    # Basic validation
    if df["position_value"].isna().any():
        bad = df[df["position_value"].isna()][["ticker", "percent", "shares", "price_per_share", "dollars"]]
        raise ValueError(f"Could not compute position_value for some rows:\n{bad.to_string(index=False)}")

    return df[["ticker", "percent", "shares", "price", "dollars", "position_value"]]

def classify_is_etf(df: pd.DataFrame) -> pd.DataFrame:
    """Adds is_etf using /profile (field: isEtf)."""
    df = df.copy()
    profiles = get_profiles(df["ticker"])
    df["is_etf"] = df["ticker"].map(lambda t: bool(profiles.get(t, {}).get("isEtf", False)))
    df["asset_type"] = df["is_etf"].map(lambda x: "ETF" if x else "Stock")
    return df

def explode_lookthrough(portfolio_df: pd.DataFrame) -> pd.DataFrame:
    """
    Returns a unified exposure table with:
      source_ticker: original holding in portfolio
      underlying_ticker: underlying exposure (same as source if not ETF)
      exposure_value: $ exposure
      source_type: ETF / Stock
    For ETFs: uses /etf/holdings (asset, weightPercentage).
    """
    rows = []
    for _, r in portfolio_df.iterrows():
        src = r["ticker"]
        src_type = r["asset_type"]
        pv = float(r["position_value"])

        if not r["is_etf"]:
            rows.append({
                "source_ticker": src,
                "source_type": src_type,
                "underlying_ticker": src,
                "exposure_value": pv
            })
        else:
            holdings = fmp_get("etf/holdings", {"symbol": src})
            # If holdings empty, keep as unresolved ETF exposure (still useful for debugging)
            if not holdings:
                rows.append({
                    "source_ticker": src,
                    "source_type": src_type,
                    "underlying_ticker": None,
                    "exposure_value": pv
                })
                continue

            for h in holdings:
                asset = h.get("asset")
                wt = h.get("weightPercentage")
                if asset is None or wt is None:
                    continue
                rows.append({
                    "source_ticker": src,
                    "source_type": src_type,
                    "underlying_ticker": str(asset).upper().strip(),
                    "exposure_value": pv * float(wt) / 100.0
                })

    return pd.DataFrame(rows)

def enrich_underlyings(exposure_df: pd.DataFrame) -> pd.DataFrame:
    """Adds company_name, sector, industry, country for underlying_ticker using /profile."""
    df = exposure_df.copy()
    tickers = df["underlying_ticker"].dropna()
    profiles = get_profiles(tickers)

    def pick(t, key):
        return profiles.get(t, {}).get(key)

    df["company_name"] = df["underlying_ticker"].map(lambda t: pick(t, "companyName"))
    df["sector"] = df["underlying_ticker"].map(lambda t: pick(t, "sector"))
    df["industry"] = df["underlying_ticker"].map(lambda t: pick(t, "industry"))
    df["country"] = df["underlying_ticker"].map(lambda t: pick(t, "country"))
    return df

def build_slices(exposures: pd.DataFrame):
    """Convenience: returns (by_company, by_sector, by_country, by_source_vehicle)."""
    by_company = (
        exposures.dropna(subset=["underlying_ticker"])
        .groupby(["underlying_ticker", "company_name"], dropna=False)
        .agg(total_exposure=("exposure_value", "sum"))
        .sort_values("total_exposure", ascending=False)
    )

    by_sector = (
        exposures.groupby("sector", dropna=False)
        .agg(total_exposure=("exposure_value", "sum"))
        .sort_values("total_exposure", ascending=False)
    )

    by_country = (
        exposures.groupby("country", dropna=False)
        .agg(total_exposure=("exposure_value", "sum"))
        .sort_values("total_exposure", ascending=False)
    )

    by_source_vehicle = (
        exposures.groupby(["source_ticker", "source_type"], dropna=False)
        .agg(total_exposure=("exposure_value", "sum"))
        .sort_values("total_exposure", ascending=False)
    )

    return by_company, by_sector, by_country, by_source_vehicle

In [14]:
# --- CODE CELL 2: Input + Run Pipeline + Outputs ---

# Example portfolio input (replace with CSV later)
portfolio_input = pd.DataFrame([
    {"ticker": "AAPL", "shares": 20},
    {"ticker": "MSFT", "dollars": 3000},
    {"ticker": "SPY", "percent": 0.40},
])

TOTAL_PORTFOLIO_VALUE = 20000  # required if using percent weights

# 1) Normalize
portfolio = normalize_portfolio_inputs(portfolio_input, total_portfolio_value=TOTAL_PORTFOLIO_VALUE)

# 2) Classify ETF vs stock
portfolio = classify_is_etf(portfolio)

# 3) Look-through explosion
lookthrough = explode_lookthrough(portfolio)

# 4) Enrich underlyings
exposures = enrich_underlyings(lookthrough)

# 5) Slices
by_company, by_sector, by_country, by_source_vehicle = build_slices(exposures)

print("Portfolio (normalized):")
display(portfolio)

print("\nUnified Look-Through Exposures (sample):")
display(exposures.sort_values("exposure_value", ascending=False).head(25))

print("\nExposure by Company:")
display(by_company.head(25))

print("\nExposure by Sector:")
display(by_sector)

print("\nExposure by Country:")
display(by_country)

print("\nExposure by Source Vehicle (ETF vs Stock):")
display(by_source_vehicle)

  df["price"] = df["price_per_share"].fillna(df["market_price"])


HTTPError: 402 Client Error: Payment Required for url: https://financialmodelingprep.com/stable/etf/holdings?symbol=SPY&apikey=yy9SAIQnhX9wB2ABVxXaRA9MxNxeLXPO