feat: add loader orchestrator builder #103
Conversation
Coverage Report
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you:
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review"
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting …
```python
async def _upsert_aggregated(
    batch: Sequence[AggregatedItem],
) -> None:
    if not batch:
        return
    items.extend(batch)
    points = [
        build_point(item, dense_model_name, sparse_model_name)
        for item in batch
    ]
    await _upsert_in_batches(
        client,
        collection_name,
        points,
        retry_queue=qdrant_retry_queue,
    )
```
```python
def _record_upsert(worker_id: int, batch_size: int, queue_size: int) -> None:
    nonlocal upserted, upsert_start
    if upserted == 0:
        upsert_start = time.perf_counter()
    upserted += batch_size
    elapsed = time.perf_counter() - upsert_start
    rate = upserted / elapsed if elapsed > 0 else 0.0
    logger.info(
        "Upsert worker %d processed %d items (%.2f items/sec, queue size=%d)",
        worker_id,
        upserted,
        rate,
        queue_size,
    )
```
```python
ingestion_stage = IngestionStage(
    plex_server=plex_server,
    sample_items=sample_items,
    movie_batch_size=plex_chunk_size,
    episode_batch_size=plex_chunk_size,
    sample_batch_size=enrichment_batch_size,
    output_queue=ingest_queue,
    completion_sentinel=INGEST_DONE,
)
```
```python
enrichment_stage = EnrichmentStage(
    http_client_factory=lambda: httpx.AsyncClient(timeout=30),
    tmdb_api_key=tmdb_api_key or "",
    ingest_queue=ingest_queue,
    persistence_queue=persistence_queue,
    imdb_retry_queue=_imdb_retry_queue,
    movie_batch_size=enrichment_batch_size,
    episode_batch_size=enrichment_batch_size,
    imdb_cache=_imdb_cache,
    imdb_max_retries=_imdb_max_retries,
    imdb_backoff=_imdb_backoff,
    imdb_batch_limit=_imdb_batch_limit,
    imdb_requests_per_window=_imdb_requests_per_window,
    imdb_window_seconds=_imdb_window_seconds,
)
```
```python
persistence_stage = _PersistenceStage(
    client=client,
    collection_name=collection_name,
    dense_vector_name=dense_model_name,
    sparse_vector_name=sparse_model_name,
    persistence_queue=persistence_queue,
    retry_queue=qdrant_retry_queue,
    upsert_semaphore=upsert_capacity,
    upsert_buffer_size=upsert_buffer_size,
    upsert_fn=_upsert_aggregated,
    on_batch_complete=_record_upsert,
)
```
Treat retry payloads as `AggregatedItem` batches
The new orchestrator wiring feeds `EnrichmentStage`'s aggregated items directly into `PersistenceStage` and converts them to Qdrant points inside `_upsert_aggregated`. That works for the first attempt, but when `_upsert_in_batches` fails it enqueues the raw `PointStruct` batch onto the shared retry queue. On shutdown, `PersistenceStage._flush_retry_queue()` re-enqueues those point batches via `enqueue_points`, after which the persistence workers call `_upsert_aggregated` again. `_upsert_aggregated` always assumes `batch` contains `AggregatedItem` instances and passes each element to `build_point`, so any retried payload will raise attribute errors and the failed points can never be persisted. Any upsert failure therefore permanently breaks the retry path. The upsert helper needs to detect already-built `PointStruct`s or reuse `PersistenceStage`'s native point-based enqueueing instead.
Useful? React with 👍 / 👎.
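One possible shape for the first option, shown as a sketch rather than the PR's actual change: let the upsert helper accept either aggregated items or already-built points, so retried `PointStruct` batches skip `build_point`. The closure variables (`items`, `client`, `collection_name`, `build_point`, and so on) are the ones from the diff above; the `isinstance` check against `PointStruct` is an assumption about how retry payloads arrive.

```python
from typing import Sequence, Union

from qdrant_client.models import PointStruct

async def _upsert_aggregated(
    batch: Sequence[Union[AggregatedItem, PointStruct]],
) -> None:
    if not batch:
        return
    if isinstance(batch[0], PointStruct):
        # Retried payloads are already points; re-upsert them directly
        # instead of passing them through build_point again.
        points = list(batch)
    else:
        items.extend(batch)
        points = [
            build_point(item, dense_model_name, sparse_model_name)
            for item in batch
        ]
    await _upsert_in_batches(
        client,
        collection_name,
        points,
        retry_queue=qdrant_retry_queue,
    )
```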
What
- `_build_loader_orchestrator` helper that wires the staged ingestion, enrichment, and persistence components

Why

Affects

Testing

Documentation
https://chatgpt.com/codex/tasks/task_e_68e32f8f3fd88328a4e030e88d7a9c95
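The excerpt does not show `_build_loader_orchestrator` itself, only the stages it assembles. As a purely hypothetical sketch of what such a builder could look like: the stage objects come from the diff above, but the `run()` methods, the returned coroutine, and the builder signature are assumptions, not the PR's actual API.

```python
import asyncio

def _build_loader_orchestrator(ingestion_stage, enrichment_stage, persistence_stage):
    """Hypothetical shape only: bundle the three stages behind one coroutine."""

    async def run() -> None:
        # Assumes each stage exposes an async run() that consumes its input
        # queue and feeds its output queue until the completion sentinel
        # arrives; the real PR may structure this differently.
        await asyncio.gather(
            ingestion_stage.run(),
            enrichment_stage.run(),
            persistence_stage.run(),
        )

    return run
```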