From 5ef6ae98a9623651cd9cd9b3d4b2f820cba1e946 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Sun, 5 Oct 2025 21:00:05 -0600 Subject: [PATCH] feat(loader): add orchestrator builder --- mcp_plex/loader/__init__.py | 116 ++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/mcp_plex/loader/__init__.py b/mcp_plex/loader/__init__.py index 01dc003..bb21520 100644 --- a/mcp_plex/loader/__init__.py +++ b/mcp_plex/loader/__init__.py @@ -1351,6 +1351,122 @@ def _handle_upsert_batch( del LoaderPipeline +def _build_loader_orchestrator( + *, + client: AsyncQdrantClient, + collection_name: str, + dense_model_name: str, + sparse_model_name: str, + tmdb_api_key: str | None, + sample_items: Sequence[AggregatedItem] | None, + plex_server: PlexServer | None, + plex_chunk_size: int, + enrichment_batch_size: int, + enrichment_workers: int, + upsert_buffer_size: int, + max_concurrent_upserts: int, +) -> tuple[LoaderOrchestrator, list[AggregatedItem], asyncio.Queue[list[models.PointStruct]]]: + """Wire the staged loader pipeline and return the orchestrator helpers.""" + + from .pipeline.ingestion import IngestionStage + from .pipeline.enrichment import EnrichmentStage + + ingest_queue: IngestQueue = IngestQueue(maxsize=enrichment_workers * 2) + persistence_queue: PersistenceQueue = PersistenceQueue() + + global _imdb_retry_queue + if _imdb_retry_queue is None: + _imdb_retry_queue = _IMDbRetryQueue() + + upsert_capacity = asyncio.Semaphore(max_concurrent_upserts) + qdrant_retry_queue: asyncio.Queue[list[models.PointStruct]] = asyncio.Queue() + items: list[AggregatedItem] = [] + upserted = 0 + upsert_start = time.perf_counter() + + async def _upsert_aggregated( + batch: Sequence[AggregatedItem], + ) -> None: + if not batch: + return + items.extend(batch) + points = [ + build_point(item, dense_model_name, sparse_model_name) + for item in batch + ] + await _upsert_in_batches( + client, + collection_name, + points, + retry_queue=qdrant_retry_queue, + ) + + def _record_upsert(worker_id: int, batch_size: int, queue_size: int) -> None: + nonlocal upserted, upsert_start + if upserted == 0: + upsert_start = time.perf_counter() + upserted += batch_size + elapsed = time.perf_counter() - upsert_start + rate = upserted / elapsed if elapsed > 0 else 0.0 + logger.info( + "Upsert worker %d processed %d items (%.2f items/sec, queue size=%d)", + worker_id, + upserted, + rate, + queue_size, + ) + + ingestion_stage = IngestionStage( + plex_server=plex_server, + sample_items=sample_items, + movie_batch_size=plex_chunk_size, + episode_batch_size=plex_chunk_size, + sample_batch_size=enrichment_batch_size, + output_queue=ingest_queue, + completion_sentinel=INGEST_DONE, + ) + + enrichment_stage = EnrichmentStage( + http_client_factory=lambda: httpx.AsyncClient(timeout=30), + tmdb_api_key=tmdb_api_key or "", + ingest_queue=ingest_queue, + persistence_queue=persistence_queue, + imdb_retry_queue=_imdb_retry_queue, + movie_batch_size=enrichment_batch_size, + episode_batch_size=enrichment_batch_size, + imdb_cache=_imdb_cache, + imdb_max_retries=_imdb_max_retries, + imdb_backoff=_imdb_backoff, + imdb_batch_limit=_imdb_batch_limit, + imdb_requests_per_window=_imdb_requests_per_window, + imdb_window_seconds=_imdb_window_seconds, + ) + + persistence_stage = _PersistenceStage( + client=client, + collection_name=collection_name, + dense_vector_name=dense_model_name, + sparse_vector_name=sparse_model_name, + persistence_queue=persistence_queue, + retry_queue=qdrant_retry_queue, + upsert_semaphore=upsert_capacity, + upsert_buffer_size=upsert_buffer_size, + upsert_fn=_upsert_aggregated, + on_batch_complete=_record_upsert, + ) + + orchestrator = LoaderOrchestrator( + ingestion_stage=ingestion_stage, + enrichment_stage=enrichment_stage, + persistence_stage=persistence_stage, + ingest_queue=ingest_queue, + persistence_queue=persistence_queue, + persistence_worker_count=max_concurrent_upserts, + ) + + return orchestrator, items, persistence_stage.retry_queue + + def __getattr__(name: str) -> Any: """Provide deprecated access to :class:`LoaderOrchestrator`."""