Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/pyproject.deps.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "mcp-plex"
version = "0.26.62"
version = "0.26.63"
requires-python = ">=3.11,<3.13"
dependencies = [
"fastmcp>=2.11.2",
Expand Down
31 changes: 30 additions & 1 deletion mcp_plex/loader/pipeline/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from typing import Sequence

from ...common.types import AggregatedItem
from .channels import IngestQueue
from .channels import IngestQueue, SampleBatch, chunk_sequence


class IngestionStage:
Expand All @@ -37,6 +37,8 @@ def __init__(
self._output_queue = output_queue
self._completion_sentinel = completion_sentinel
self._logger = logging.getLogger("mcp_plex.loader.ingestion")
self._items_ingested = 0
self._batches_ingested = 0

@property
def logger(self) -> logging.Logger:
Expand All @@ -60,6 +62,18 @@ async def run(self) -> None:
await self._output_queue.put(None)
await self._output_queue.put(self._completion_sentinel)

@property
def items_ingested(self) -> int:
    """Total number of items placed onto the ingest queue.

    Incremented by ``_enqueue_sample_batches`` once per item as each
    non-empty batch is put onto the output queue.
    """

    return self._items_ingested

@property
def batches_ingested(self) -> int:
    """Total number of batches placed onto the ingest queue.

    Incremented by ``_enqueue_sample_batches`` once per non-empty batch
    put onto the output queue.
    """

    return self._batches_ingested

async def _run_sample_ingestion(self, items: Sequence[AggregatedItem]) -> None:
"""Placeholder hook for the sample ingestion flow."""

Expand All @@ -68,6 +82,7 @@ async def _run_sample_ingestion(self, items: Sequence[AggregatedItem]) -> None:
"Sample ingestion has not been ported yet; %d items queued for later.",
item_count,
)
await self._enqueue_sample_batches(items)
await asyncio.sleep(0)

async def _run_plex_ingestion(self) -> None:
Expand All @@ -78,3 +93,17 @@ async def _run_plex_ingestion(self) -> None:
else:
self._logger.info("Plex ingestion pending implementation.")
await asyncio.sleep(0)

async def _enqueue_sample_batches(
    self, items: Sequence[AggregatedItem]
) -> None:
    """Place sample items onto the ingest queue in configured batch sizes.

    Splits *items* into chunks of ``self._sample_batch_size``, wraps each
    non-empty chunk in a :class:`SampleBatch`, awaits the queue put (so a
    bounded queue applies backpressure), and updates the ingestion counters.
    """

    for chunk in chunk_sequence(items, self._sample_batch_size):
        batch = list(chunk)
        # Skip empty chunks so the counters only reflect real work.
        if batch:
            await self._output_queue.put(SampleBatch(items=batch))
            self._items_ingested += len(batch)
            self._batches_ingested += 1
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mcp-plex"
version = "0.26.62"
version = "0.26.63"

description = "Plex-Oriented Model Context Protocol Server"
requires-python = ">=3.11,<3.13"
Expand Down
101 changes: 93 additions & 8 deletions tests/test_ingestion_stage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio

from mcp_plex.common.types import AggregatedItem, PlexItem
from mcp_plex.loader.pipeline.channels import INGEST_DONE
from mcp_plex.loader.pipeline.channels import INGEST_DONE, SampleBatch
from mcp_plex.loader.pipeline.ingestion import IngestionStage


Expand Down Expand Up @@ -34,18 +34,17 @@ async def scenario() -> str:
assert logger_name == "mcp_plex.loader.ingestion"


def test_ingestion_stage_emits_completion_sentinels() -> None:
def test_ingestion_stage_sample_empty_batches() -> None:
sentinel = object()

async def scenario() -> tuple[object | None, object | None, bool]:
async def scenario() -> tuple[object | None, object | None, bool, int, int]:
queue: asyncio.Queue = asyncio.Queue()
sample_items = [_make_aggregated_item("1"), _make_aggregated_item("2")]
stage = IngestionStage(
plex_server=None,
sample_items=sample_items,
sample_items=[],
movie_batch_size=1,
episode_batch_size=1,
sample_batch_size=1,
sample_batch_size=2,
output_queue=queue,
completion_sentinel=sentinel,
)
Expand All @@ -54,10 +53,96 @@ async def scenario() -> tuple[object | None, object | None, bool]:

first = await queue.get()
second = await queue.get()
return first, second, queue.empty()
return first, second, queue.empty(), stage.items_ingested, stage.batches_ingested

first, second, empty = asyncio.run(scenario())
first, second, empty, items_ingested, batches_ingested = asyncio.run(scenario())

assert first is None
assert second is sentinel
assert empty is True
assert items_ingested == 0
assert batches_ingested == 0


def test_ingestion_stage_sample_partial_batches() -> None:
    """Three sample items with batch size 2 must yield a full and a partial batch."""
    sentinel = object()

    async def scenario() -> tuple[list[SampleBatch], object | None, object | None, int, int]:
        queue: asyncio.Queue = asyncio.Queue()
        stage = IngestionStage(
            plex_server=None,
            sample_items=[_make_aggregated_item(str(i)) for i in (1, 2, 3)],
            movie_batch_size=1,
            episode_batch_size=1,
            sample_batch_size=2,
            output_queue=queue,
            completion_sentinel=sentinel,
        )

        await stage.run()

        emitted = [await queue.get() for _ in range(2)]
        done_token = await queue.get()
        final_token = await queue.get()
        return emitted, done_token, final_token, stage.items_ingested, stage.batches_ingested

    emitted, done_token, final_token, item_total, batch_total = asyncio.run(scenario())

    assert all(isinstance(batch, SampleBatch) for batch in emitted)
    assert [len(batch.items) for batch in emitted] == [2, 1]
    assert done_token is None
    assert final_token is sentinel
    assert item_total == 3
    assert batch_total == 2


def test_ingestion_stage_backpressure_handling() -> None:
    """A bounded (maxsize=1) queue must make the stage block until consumed."""
    sentinel = object()

    async def scenario() -> tuple[list[SampleBatch], object | None, object | None, int, int]:
        queue: asyncio.Queue = asyncio.Queue(maxsize=1)
        stage = IngestionStage(
            plex_server=None,
            sample_items=[_make_aggregated_item("1"), _make_aggregated_item("2")],
            movie_batch_size=1,
            episode_batch_size=1,
            sample_batch_size=1,
            output_queue=queue,
            completion_sentinel=sentinel,
        )

        runner = asyncio.create_task(stage.run())
        await asyncio.sleep(0)
        # The run task must still be pending: the full queue blocks it.
        assert not runner.done()

        first = await queue.get()
        assert isinstance(first, SampleBatch)
        await asyncio.sleep(0)

        second = await queue.get()
        done_token = await queue.get()
        final_token = await queue.get()

        await runner
        return [first, second], done_token, final_token, stage.items_ingested, stage.batches_ingested

    batches, done_token, final_token, item_total, batch_total = asyncio.run(scenario())

    assert [len(batch.items) for batch in batches] == [1, 1]
    assert done_token is None
    assert final_token is sentinel
    assert item_total == 2
    assert batch_total == 2
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.