From c2fa37415ac937668e9ca6273ea502de2c594bec Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Mon, 6 Oct 2025 21:44:09 -0600 Subject: [PATCH] fix(loader): allow unbounded ingestion queue --- docker/pyproject.deps.toml | 2 +- mcp_plex/loader/__init__.py | 2 +- pyproject.toml | 2 +- tests/test_ingestion_stage.py | 35 ++++++++++++++++++++++------------- uv.lock | 2 +- 5 files changed, 26 insertions(+), 17 deletions(-) diff --git a/docker/pyproject.deps.toml b/docker/pyproject.deps.toml index c7c1846..2e96cd2 100644 --- a/docker/pyproject.deps.toml +++ b/docker/pyproject.deps.toml @@ -1,6 +1,6 @@ [project] name = "mcp-plex" -version = "1.0.16" +version = "1.0.17" requires-python = ">=3.11,<3.13" dependencies = [ "fastmcp>=2.11.2", diff --git a/mcp_plex/loader/__init__.py b/mcp_plex/loader/__init__.py index c9fd4b2..1f3a1bb 100644 --- a/mcp_plex/loader/__init__.py +++ b/mcp_plex/loader/__init__.py @@ -616,7 +616,7 @@ def _build_loader_orchestrator( from .pipeline.ingestion import IngestionStage from .pipeline.enrichment import EnrichmentStage - ingest_queue: IngestQueue = IngestQueue(maxsize=enrichment_workers * 2) + ingest_queue: IngestQueue = IngestQueue() persistence_queue: PersistenceQueue = PersistenceQueue() imdb_queue = imdb_config.retry_queue diff --git a/pyproject.toml b/pyproject.toml index f462eb4..be4dcac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "1.0.16" +version = "1.0.17" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_ingestion_stage.py b/tests/test_ingestion_stage.py index a101a2b..2d4fb9a 100644 --- a/tests/test_ingestion_stage.py +++ b/tests/test_ingestion_stage.py @@ -158,15 +158,16 @@ async def scenario() -> ValueError: assert str(error) == f"{expected_name} must be positive" -def test_ingestion_stage_backpressure_handling() -> None: +def test_ingestion_stage_queues_sample_batches_with_completion_tokens() -> None: async def scenario() -> tuple[ list[SampleBatch], None | SampleBatch, None | IngestSentinel, + bool, int, int, ]: - queue: asyncio.Queue = asyncio.Queue(maxsize=1) + queue: asyncio.Queue = asyncio.Queue() sample_items = [ _make_aggregated_item("1"), _make_aggregated_item("2"), @@ -181,28 +182,36 @@ async def scenario() -> tuple[ completion_sentinel=INGEST_DONE, ) - run_task = asyncio.create_task(stage.run()) - await asyncio.sleep(0) - assert run_task.done() is False + await stage.run() first_batch = await queue.get() - assert isinstance(first_batch, SampleBatch) - await asyncio.sleep(0) - second_batch = await queue.get() first_token = await queue.get() second_token = await queue.get() - await run_task - return [first_batch, second_batch], first_token, second_token, stage.items_ingested, stage.batches_ingested + return ( + [first_batch, second_batch], + first_token, + second_token, + queue.empty(), + stage.items_ingested, + stage.batches_ingested, + ) - batches, first_token, second_token, items_ingested, batches_ingested = asyncio.run( - scenario() - ) + ( + batches, + first_token, + second_token, + queue_empty, + items_ingested, + batches_ingested, + ) = asyncio.run(scenario()) + assert all(isinstance(batch, SampleBatch) for batch in batches) assert [len(batch.items) for batch in batches] == [1, 1] assert first_token is None assert second_token is INGEST_DONE + assert queue_empty is True assert items_ingested == 2 assert batches_ingested == 2 diff --git a/uv.lock b/uv.lock index af656ca..a988d14 100644 --- a/uv.lock +++ b/uv.lock @@ -730,7 +730,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "1.0.16" +version = "1.0.17" source = { editable = "." } dependencies = [ { name = "fastapi" },