feat(infra): Wave 1 PR D — async aggregator + S3 TTL cache + per-vendor rate limiters#232
Merged
Merged
Conversation
…or rate limiters
Wave 1 PR D of the institutional data-revamp arc (plan doc:
~/Development/alpha-engine-docs/private/data-revamp-260513.md).
Wraps the sync NewsSource adapters (PR β) in an async fan-in pattern
using anyio.to_thread.run_sync — adapters stay synchronous (existing
tests + Lambda dispatch unchanged); aggregation becomes concurrent.
Adds a generic S3-backed TTL cache used by the async aggregator and
available to any future producer-side fetcher.
New modules:
data/cache.py
S3TtlCache(s3_client, bucket, prefix, default_ttl_seconds) class.
- get(key) -> bytes | None (returns None on missing OR expired)
- set(key, value, *, ttl_seconds) (idempotent overwrite)
- cached_call(key, *, compute_fn, ttl_seconds) (sync fetch-or-compute)
- cached_acall(key, *, async_compute_fn, ttl_seconds) (async variant)
- get_json / set_json convenience wrappers
Storage: s3://{bucket}/{prefix}/{sha1(key)}.bin with S3 metadata:
cache-key : original cache key (debug)
cache-cached-at : ISO-8601 UTC write timestamp
cache-ttl-seconds : applied TTL
cache-expires-at : ISO-8601 UTC expiry timestamp
Lazy eviction (expired entries NOT deleted on read — overwritten
on next .set). Entries without our metadata stamp are treated as
expired so external writes don't return stale or garbled content.
collectors/news_aggregator_async.py
AsyncNewsAggregator(sources, *, trust_weights,
per_source_concurrency, cache,
per_source_ttl_seconds, max_retry_attempts,
retry_initial_wait)
Architecture:
- anyio.create_task_group for parallel adapter fan-in
- anyio.Semaphore per vendor (DEFAULT_PER_SOURCE_CONCURRENCY:
polygon=2 / gdelt=1 / yahoo_rss=4 / edgar_press=2 / paid=4)
- tenacity AsyncRetrying with exp backoff (default 3 attempts,
2s/4s/8s wait, max 30s)
- Optional S3TtlCache; per-source TTLs default 30m-4h by vendor
cadence (polygon=30m, gdelt=1h, yahoo=1h, edgar=4h, paid=30m)
- Reuses sync NewsAggregator._dedup + DEFAULT_TRUST_WEIGHTS
verbatim so the canonical AggregatedNewsArticle output shape
is identical to the sync path
Adapter failure modes:
- Transient (timeout, 5xx): tenacity exp-backoff retry; if final
attempt fails returns empty list for that adapter
- Final-attempt failure: caught at task-group level, logs +
continues with remaining adapters (defense in depth matching
sync path)
- Cache deserialization failure: logs + treats as cache miss
(re-fetches from adapter)
+23 unit tests:
- Cache helpers: hash deterministic / handles special chars
- S3TtlCache: set→get round-trip / missing returns None / expired
returns None / entry-without-metadata treated as expired /
overwrite extends TTL / default TTL applied / get_json round-trip
/ get_json malformed returns None
- cached_call: miss calls compute + caches / hit skips compute
- cached_acall: same shape (async)
- Cache key (async): stable under ticker reorder / changes with
hours / changes with vendor
- Async aggregator: parallel fan-in combines sources / per-source
failure isolated / retry recovers from transient failure / retry
exhausted returns empty
- Caching: hit skips adapter / expiry re-runs adapter / isolated
per-tickers
- Semaphore concurrency: with concurrency=1, 2 same-named-vendor
adapters serialize (max in-flight = 1 verified via lock-tracked
counter)
Suite: 1003 passing (1 skipped).
Composes with:
- PR β (#226) — NewsSource Protocol + sync NewsAggregator
- PR A.1 / A.2 / A.3 — same NewsSource adapters
- PR α (lib v0.15.0) — NewsArticle shape
- alpha_engine_data already has anyio + tenacity in venv
What's deferred:
- Sources are still sync; if a vendor SDK becomes natively async,
add an isinstance check in _invoke_adapter to dispatch awaitably
- Adapters for analyst (AnalystSource) + filings (FilingSource)
don't have async aggregators yet — same pattern lifts trivially
when they're wired into Saturday SF fan-in
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Wave 1 PR D of the institutional data-revamp arc. Wraps the sync
NewsSourceadapters (PR β) in an async fan-in pattern usinganyio.to_thread.run_sync— adapters stay synchronous (existing tests + Lambda dispatch unchanged); aggregation becomes concurrent. Adds a generic S3-backed TTL cache available to all producer-side fetchers.What's in
Generic S3-backed TTL cache (
data/cache.py)S3TtlCache(s3_client, bucket, prefix, default_ttl_seconds)classget(key)— returns None on missing OR expiredset(key, value, *, ttl_seconds)— idempotent overwritecached_call(key, *, compute_fn, ttl_seconds)— sync fetch-or-computecached_acall(key, *, async_compute_fn, ttl_seconds)— async variantget_json/set_jsonconvenience wrapperss3://{bucket}/{prefix}/{sha1(key)}.bin+ S3 metadata (cache-key/cached-at/ttl-seconds/expires-at)Async news aggregator (
collectors/news_aggregator_async.py)AsyncNewsAggregator(sources, *, trust_weights, per_source_concurrency, cache, per_source_ttl_seconds, max_retry_attempts, retry_initial_wait)anyio.create_task_groupfor parallel adapter fan-inanyio.Semaphore(defaults: polygon=2, gdelt=1, yahoo_rss=4, edgar_press=2, paid=4)AsyncRetryingwith exp backoff (3 attempts, 2s/4s/8s, max 30s)NewsAggregator._dedup+DEFAULT_TRUST_WEIGHTSverbatim → identical canonical output shapeFailure modes
Test plan
tests/test_async_and_cache.py):cached_call+cached_acall: miss-computes-caches, hit-skips-compute🤖 Generated with Claude Code