Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Config (contains bucket names, API keys)
config.yaml

# Constituents cache — runtime-rebuilt by collectors/constituents.py;
# never committed (would race the live Wikipedia fetch + test pollution).
data/constituents_cache.csv

# flow-doctor.yaml was previously gitignored because it held inline credentials.
# As of flow-doctor 0.2.0 + FLOW_DOCTOR_* env var contract, all secrets come from
# the environment and the YAML contains only ${VAR} references — safe to commit.
Expand Down
47 changes: 41 additions & 6 deletions collectors/constituents.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,20 +229,55 @@ def _fetch_constituents() -> tuple[list[str], dict[str, str], dict[str, str], in

tickers = list(dict.fromkeys(tickers)) # deduplicate, preserve order

# Update local cache
# Update local cache with full sector mapping so a future Wikipedia
# outage doesn't dead-end on the empty-sector-map raise in collect().
# Prior cache stored only ticker symbols; the 2026-05-11 partial
# outage exposed that gap (S&P 500 fetch succeeded, S&P 400 failed,
# fallback returned 903 symbols with zero sector data → raise).
_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
pd.DataFrame({"ticker": tickers}).to_csv(_CACHE_PATH, index=False)
pd.DataFrame({
"ticker": tickers,
"gics_sector": [sector_map.get(t, "") for t in tickers],
"sector_etf": [sector_etf_map.get(t, "") for t in tickers],
}).to_csv(_CACHE_PATH, index=False)

return tickers, sector_map, sector_etf_map, sp500_count, sp400_count

except Exception as e:
logger.warning("Wikipedia fetch failed (%s); trying local cache...", e)
if _CACHE_PATH.exists():
cached = pd.read_csv(_CACHE_PATH)["ticker"].tolist()
logger.info("Loaded %d tickers from cache", len(cached))
return cached, {}, {}, 0, 0
return _load_from_cache()


def _load_from_cache() -> tuple[list[str], dict[str, str], dict[str, str], int, int]:
"""Read the local cache and reconstruct ticker list + sector maps.

Backwards-compatible with the legacy ticker-only cache schema: missing
gics_sector / sector_etf columns return empty dicts (which then trip
collect()'s `Sector mapping incomplete` raise — failing loud rather
than writing constituents.json with missing data).
"""
if not _CACHE_PATH.exists():
logger.error("No cache found — cannot build universe")
return [], {}, {}, 0, 0
df = pd.read_csv(_CACHE_PATH)
tickers = df["ticker"].astype(str).tolist()
sector_map: dict[str, str] = {}
sector_etf_map: dict[str, str] = {}
if "gics_sector" in df.columns:
for ticker, sector in zip(tickers, df["gics_sector"].astype(str).tolist()):
sector = sector.strip()
if sector and sector.lower() != "nan":
sector_map[ticker] = sector
if "sector_etf" in df.columns:
for ticker, etf in zip(tickers, df["sector_etf"].astype(str).tolist()):
etf = etf.strip()
if etf and etf.lower() != "nan":
sector_etf_map[ticker] = etf
logger.info(
"Loaded %d tickers from cache (sector_map=%d, sector_etf_map=%d)",
len(tickers), len(sector_map), len(sector_etf_map),
)
return tickers, sector_map, sector_etf_map, 0, 0


def load_from_s3(bucket: str, s3_prefix: str = "market_data/") -> dict | None:
Expand Down
225 changes: 214 additions & 11 deletions tests/test_constituents_sector_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,26 +77,229 @@ def fake_fetch():
constituents.collect(bucket="any", dry_run=True)


def test_fetch_raises_when_sector_column_missing() -> None:
"""If Wikipedia table column header changes, _fetch_constituents must raise."""
def test_fetch_raises_when_sector_column_missing(tmp_path, monkeypatch) -> None:
"""If Wikipedia table column header changes, _fetch_constituents must
fall through to the cache (or empty result) rather than silently
selecting a junk table."""
# Isolate the cache to a tmp path so prior tests' cache pollution
# doesn't disguise the missing-column signature.
monkeypatch.setattr(
constituents, "_CACHE_PATH", tmp_path / "constituents_cache.csv"
)
df = pd.DataFrame({"Symbol": ["AAPL"], "Industry": ["Tech"]}) # no GICS column
sp500_html = df.to_html(index=False)

def fake_get(url, **kwargs):
return _FakeResp(sp500_html)

with patch("collectors.constituents.requests.get", side_effect=fake_get):
# _fetch_constituents catches Exception and falls back to cache; we want
# to verify the raise happens BEFORE the broad except — the symptom is
# that no sector_map gets populated, which is exactly the cache-fallback
# signature. The downstream collect() check catches this case.
tickers, sector_map, _, _, _ = constituents._fetch_constituents()

# Either the cache fallback returned tickers with empty sector_map, or
# the cache was empty too. Either way: the post-loop validation in
# collect() will hard-fail on this state. That's the contract.
if tickers:
assert not sector_map or len(sector_map) < len(tickers)
# No tables matched the schema → empty result, which collect() then
# short-circuits with status=error before any S3 write.
assert tickers == []
assert sector_map == {}


def test_cache_persists_sector_map_and_etf(tmp_path, monkeypatch) -> None:
"""On a successful Wikipedia fetch the local cache must persist
ticker + GICS sector + sector ETF, so a future Wikipedia outage's
fallback returns a fully-populated sector_map (instead of empty,
which makes collect() raise 'Sector mapping incomplete')."""
cache_path = tmp_path / "constituents_cache.csv"
monkeypatch.setattr(constituents, "_CACHE_PATH", cache_path)

sp500_html = _fake_html(["AAPL", "MSFT"], ["Information Technology", "Information Technology"])
sp400_html = _fake_html(["JHG", "WSO"], ["Financials", "Industrials"])

def fake_get(url, **kwargs):
return _FakeResp(sp500_html if "500" in url else sp400_html)

with patch("collectors.constituents.requests.get", side_effect=fake_get):
constituents._fetch_constituents()

assert cache_path.exists()
cached = pd.read_csv(cache_path)
assert set(cached.columns) >= {"ticker", "gics_sector", "sector_etf"}
row_by_ticker = {r["ticker"]: r for _, r in cached.iterrows()}
assert row_by_ticker["AAPL"]["gics_sector"] == "Information Technology"
assert row_by_ticker["AAPL"]["sector_etf"] == "XLK"
assert row_by_ticker["JHG"]["gics_sector"] == "Financials"
assert row_by_ticker["JHG"]["sector_etf"] == "XLF"


def test_cache_fallback_returns_full_sector_map(tmp_path, monkeypatch) -> None:
"""When Wikipedia is unreachable, the cache fallback must return
populated sector_map + sector_etf_map so collect()'s coverage check
passes. Regression for the 2026-05-11 cascade: partial Wikipedia
outage → empty-cache fallback → collect() raise → SF cascade silent
MorningEnrich failure."""
cache_path = tmp_path / "constituents_cache.csv"
pd.DataFrame({
"ticker": ["AAPL", "MSFT", "JHG"],
"gics_sector": ["Information Technology", "Information Technology", "Financials"],
"sector_etf": ["XLK", "XLK", "XLF"],
}).to_csv(cache_path, index=False)
monkeypatch.setattr(constituents, "_CACHE_PATH", cache_path)

def fake_get(url, **kwargs):
raise RuntimeError("simulated Wikipedia outage")

with patch("collectors.constituents.requests.get", side_effect=fake_get):
tickers, sector_map, sector_etf_map, sp500_count, sp400_count = (
constituents._fetch_constituents()
)

assert tickers == ["AAPL", "MSFT", "JHG"]
assert sector_map == {
"AAPL": "Information Technology",
"MSFT": "Information Technology",
"JHG": "Financials",
}
assert sector_etf_map == {"AAPL": "XLK", "MSFT": "XLK", "JHG": "XLF"}
assert sp500_count == 0 and sp400_count == 0


def test_cache_fallback_handles_legacy_ticker_only_schema(tmp_path, monkeypatch) -> None:
"""Pre-existing caches on EC2 have only the `ticker` column. Reader
must tolerate that schema and return empty sector dicts (failing
loud in collect() rather than crashing inside _fetch_constituents)."""
cache_path = tmp_path / "constituents_cache.csv"
pd.DataFrame({"ticker": ["AAPL", "MSFT"]}).to_csv(cache_path, index=False)
monkeypatch.setattr(constituents, "_CACHE_PATH", cache_path)

def fake_get(url, **kwargs):
raise RuntimeError("simulated Wikipedia outage")

with patch("collectors.constituents.requests.get", side_effect=fake_get):
tickers, sector_map, sector_etf_map, _, _ = constituents._fetch_constituents()

assert tickers == ["AAPL", "MSFT"]
assert sector_map == {}
assert sector_etf_map == {}


def test_cache_fallback_missing_cache_returns_empty(tmp_path, monkeypatch) -> None:
"""No Wikipedia AND no cache → empty lists/dicts, not a crash. The
eventual `collect()` short-circuit ('No tickers fetched') handles
this state."""
cache_path = tmp_path / "constituents_cache.csv" # does not exist
monkeypatch.setattr(constituents, "_CACHE_PATH", cache_path)

def fake_get(url, **kwargs):
raise RuntimeError("simulated Wikipedia outage")

with patch("collectors.constituents.requests.get", side_effect=fake_get):
tickers, sector_map, sector_etf_map, _, _ = constituents._fetch_constituents()

assert tickers == []
assert sector_map == {} and sector_etf_map == {}


def test_select_constituents_table_skips_banner_table() -> None:
"""Wikipedia adds banner/disambiguation tables ahead of the constituents
table without notice. 2026-05-11 incident: the S&P 400 page inserted a
1-row, 2-column disambiguation-warning table at index 0, making
`tables[0]` return columns ``[0, 1]`` instead of the constituents
table at index 1. _select_constituents_table must scan for the right
one by columns, not position.
"""
banner_df = pd.DataFrame({0: [float("nan")], 1: ["This article currently links to a large number of disambiguation pages."]})
constituents_df = pd.DataFrame({
"Symbol": [f"T{i:03d}" for i in range(100)],
"Security": [f"Co {i}" for i in range(100)],
"GICS Sector": ["Industrials"] * 100,
"GICS Sub-Industry": ["Misc"] * 100,
})
sub_industry_only_df = pd.DataFrame({
"Symbol": ["X"], "GICS Sub-Industry": ["Subindustry-Only"],
})

picked = constituents._select_constituents_table(
[banner_df, constituents_df, sub_industry_only_df], "S&P 400"
)
assert list(picked.columns) == [
"Symbol", "Security", "GICS Sector", "GICS Sub-Industry"
]
assert len(picked) == 100


def test_select_constituents_table_raises_when_no_match() -> None:
"""If no table on the page has the expected ticker + GICS sector shape,
raise loudly rather than silently picking a junk table."""
banner_df = pd.DataFrame({0: [1], 1: ["banner"]})
nav_df = pd.DataFrame({"vteFoo": ["a"], "vteBar": ["b"]})

with pytest.raises(RuntimeError, match="No constituents table found"):
constituents._select_constituents_table([banner_df, nav_df], "S&P 400")


def test_select_constituents_table_picks_largest_candidate() -> None:
"""When multiple tables have Symbol + GICS Sector columns (e.g. a small
example/docs table alongside the live roster), pick the largest. On the
real S&P 500/400 pages the roster is the only such table; the largest-
wins rule is a defense-in-depth tiebreaker if Wikipedia ever adds an
additional schema-matching table."""
small_df = pd.DataFrame({
"Symbol": ["X", "Y"],
"GICS Sector": ["Energy", "Energy"],
})
real_df = pd.DataFrame({
"Symbol": [f"T{i}" for i in range(100)],
"GICS Sector": ["Industrials"] * 100,
})
picked = constituents._select_constituents_table([small_df, real_df], "S&P 500")
assert len(picked) == 100


def test_select_constituents_table_flattens_multiindex() -> None:
"""Wikipedia occasionally returns multi-level column headers. The
selector must flatten them before matching."""
df = pd.DataFrame({
("Stock", "Symbol"): [f"T{i}" for i in range(100)],
("Classification", "GICS Sector"): ["Industrials"] * 100,
})
df.columns = pd.MultiIndex.from_tuples(df.columns)
picked = constituents._select_constituents_table([df], "S&P 500")
assert any("symbol" in str(c).lower() for c in picked.columns)
assert any(
"gics" in str(c).lower() and "sector" in str(c).lower()
for c in picked.columns
)


def test_fetch_constituents_handles_banner_table_at_index_0() -> None:
"""End-to-end: Wikipedia page with a banner table at index 0 followed by
the constituents table at index 1 must still produce a populated
sector_map. Regression for the 2026-05-11 silent-MorningEnrich incident."""
banner_df = pd.DataFrame({0: [float("nan")], 1: ["disambiguation banner"]})
sp500_df = pd.DataFrame({
"Symbol": [f"S5{i:02d}" for i in range(60)],
"GICS Sector": ["Information Technology"] * 60,
})
sp400_df = pd.DataFrame({
"Symbol": [f"S4{i:02d}" for i in range(60)],
"GICS Sector": ["Industrials"] * 60,
})

sp500_html = banner_df.to_html(index=False) + sp500_df.to_html(index=False)
sp400_html = banner_df.to_html(index=False) + sp400_df.to_html(index=False)

def fake_get(url, **kwargs):
return _FakeResp(sp500_html if "500" in url else sp400_html)

with patch("collectors.constituents.requests.get", side_effect=fake_get):
tickers, sector_map, sector_etf_map, sp500_count, sp400_count = (
constituents._fetch_constituents()
)

assert sp500_count == 60
assert sp400_count == 60
assert len(sector_map) == 120
assert sector_map["S500"] == "Information Technology"
assert sector_map["S400"] == "Industrials"
assert sector_etf_map["S500"] == "XLK"
assert sector_etf_map["S400"] == "XLI"


def test_select_constituents_table_skips_banner_table() -> None:
Expand Down
Loading