From 6305b712db919a9e033fc4afe4b79e32565868f4 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Sun, 5 Oct 2025 11:44:58 -0600 Subject: [PATCH] feat(loader): batch sample ingestion stage --- docker/pyproject.deps.toml | 2 +- mcp_plex/loader/pipeline/ingestion.py | 31 +++++++- pyproject.toml | 2 +- tests/test_ingestion_stage.py | 101 ++++++++++++++++++++++++-- uv.lock | 2 +- 5 files changed, 126 insertions(+), 12 deletions(-) diff --git a/docker/pyproject.deps.toml b/docker/pyproject.deps.toml index 91b67bd..0215513 100644 --- a/docker/pyproject.deps.toml +++ b/docker/pyproject.deps.toml @@ -1,6 +1,6 @@ [project] name = "mcp-plex" -version = "0.26.62" +version = "0.26.63" requires-python = ">=3.11,<3.13" dependencies = [ "fastmcp>=2.11.2", diff --git a/mcp_plex/loader/pipeline/ingestion.py b/mcp_plex/loader/pipeline/ingestion.py index c232934..a7b13fe 100644 --- a/mcp_plex/loader/pipeline/ingestion.py +++ b/mcp_plex/loader/pipeline/ingestion.py @@ -12,7 +12,7 @@ from typing import Sequence from ...common.types import AggregatedItem -from .channels import IngestQueue +from .channels import IngestQueue, SampleBatch, chunk_sequence class IngestionStage: @@ -37,6 +37,8 @@ def __init__( self._output_queue = output_queue self._completion_sentinel = completion_sentinel self._logger = logging.getLogger("mcp_plex.loader.ingestion") + self._items_ingested = 0 + self._batches_ingested = 0 @property def logger(self) -> logging.Logger: @@ -60,6 +62,18 @@ async def run(self) -> None: await self._output_queue.put(None) await self._output_queue.put(self._completion_sentinel) + @property + def items_ingested(self) -> int: + """Total number of items placed onto the ingest queue.""" + + return self._items_ingested + + @property + def batches_ingested(self) -> int: + """Total number of batches placed onto the ingest queue.""" + + return self._batches_ingested + async def _run_sample_ingestion(self, items: Sequence[AggregatedItem]) -> None: """Placeholder hook for the sample ingestion flow.""" @@ -68,6 +82,7 @@ async def _run_sample_ingestion(self, items: Sequence[AggregatedItem]) -> None: "Sample ingestion has not been ported yet; %d items queued for later.", item_count, ) + await self._enqueue_sample_batches(items) await asyncio.sleep(0) async def _run_plex_ingestion(self) -> None: @@ -78,3 +93,17 @@ async def _run_plex_ingestion(self) -> None: else: self._logger.info("Plex ingestion pending implementation.") await asyncio.sleep(0) + + async def _enqueue_sample_batches( + self, items: Sequence[AggregatedItem] + ) -> None: + """Place sample items onto the ingest queue in configured batch sizes.""" + + for chunk in chunk_sequence(items, self._sample_batch_size): + batch_items = list(chunk) + if not batch_items: + continue + + await self._output_queue.put(SampleBatch(items=batch_items)) + self._items_ingested += len(batch_items) + self._batches_ingested += 1 diff --git a/pyproject.toml b/pyproject.toml index bf582cb..441ab21 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "0.26.62" +version = "0.26.63" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_ingestion_stage.py b/tests/test_ingestion_stage.py index b795fbe..b306ac5 100644 --- a/tests/test_ingestion_stage.py +++ b/tests/test_ingestion_stage.py @@ -1,7 +1,7 @@ import asyncio from mcp_plex.common.types import AggregatedItem, PlexItem -from mcp_plex.loader.pipeline.channels import INGEST_DONE +from mcp_plex.loader.pipeline.channels import INGEST_DONE, SampleBatch from mcp_plex.loader.pipeline.ingestion import IngestionStage @@ -34,18 +34,17 @@ async def scenario() -> str: assert logger_name == "mcp_plex.loader.ingestion" -def test_ingestion_stage_emits_completion_sentinels() -> None: +def test_ingestion_stage_sample_empty_batches() -> None: sentinel = object() - async def scenario() -> tuple[object | None, object | None, bool]: + async def scenario() -> tuple[object | None, object | None, bool, int, int]: queue: asyncio.Queue = asyncio.Queue() - sample_items = [_make_aggregated_item("1"), _make_aggregated_item("2")] stage = IngestionStage( plex_server=None, - sample_items=sample_items, + sample_items=[], movie_batch_size=1, episode_batch_size=1, - sample_batch_size=1, + sample_batch_size=2, output_queue=queue, completion_sentinel=sentinel, ) @@ -54,10 +53,96 @@ async def scenario() -> tuple[object | None, object | None, bool]: first = await queue.get() second = await queue.get() - return first, second, queue.empty() + return first, second, queue.empty(), stage.items_ingested, stage.batches_ingested - first, second, empty = asyncio.run(scenario()) + first, second, empty, items_ingested, batches_ingested = asyncio.run(scenario()) assert first is None assert second is sentinel assert empty is True + assert items_ingested == 0 + assert batches_ingested == 0 + + +def test_ingestion_stage_sample_partial_batches() -> None: + sentinel = object() + + async def scenario() -> tuple[list[SampleBatch], object | None, object | None, int, int]: + queue: asyncio.Queue = asyncio.Queue() + sample_items = [ + _make_aggregated_item("1"), + _make_aggregated_item("2"), + _make_aggregated_item("3"), + ] + stage = IngestionStage( + plex_server=None, + sample_items=sample_items, + movie_batch_size=1, + episode_batch_size=1, + sample_batch_size=2, + output_queue=queue, + completion_sentinel=sentinel, + ) + + await stage.run() + + batches = [await queue.get(), await queue.get()] + first_token = await queue.get() + second_token = await queue.get() + return batches, first_token, second_token, stage.items_ingested, stage.batches_ingested + + batches, first_token, second_token, items_ingested, batches_ingested = asyncio.run( + scenario() + ) + + assert all(isinstance(batch, SampleBatch) for batch in batches) + assert [len(batch.items) for batch in batches] == [2, 1] + assert first_token is None + assert second_token is sentinel + assert items_ingested == 3 + assert batches_ingested == 2 + + +def test_ingestion_stage_backpressure_handling() -> None: + sentinel = object() + + async def scenario() -> tuple[list[SampleBatch], object | None, object | None, int, int]: + queue: asyncio.Queue = asyncio.Queue(maxsize=1) + sample_items = [ + _make_aggregated_item("1"), + _make_aggregated_item("2"), + ] + stage = IngestionStage( + plex_server=None, + sample_items=sample_items, + movie_batch_size=1, + episode_batch_size=1, + sample_batch_size=1, + output_queue=queue, + completion_sentinel=sentinel, + ) + + run_task = asyncio.create_task(stage.run()) + await asyncio.sleep(0) + assert run_task.done() is False + + first_batch = await queue.get() + assert isinstance(first_batch, SampleBatch) + await asyncio.sleep(0) + + second_batch = await queue.get() + first_token = await queue.get() + second_token = await queue.get() + + await run_task + return [first_batch, second_batch], first_token, second_token, stage.items_ingested, stage.batches_ingested + + batches, first_token, second_token, items_ingested, batches_ingested = asyncio.run( + scenario() + ) + + assert [len(batch.items) for batch in batches] == [1, 1] + assert first_token is None + assert second_token is sentinel + assert items_ingested == 2 + assert batches_ingested == 2 diff --git a/uv.lock b/uv.lock index 69641bc..876026c 100644 --- a/uv.lock +++ b/uv.lock @@ -730,7 +730,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "0.26.62" +version = "0.26.63" source = { editable = "." } dependencies = [ { name = "fastapi" },