async def _ingest_plex(
    self,
    *,
    plex_server: object,
    movie_batch_size: int,
    episode_batch_size: int,
    output_queue: IngestQueue,
    logger: logging.Logger,
) -> None:
    """Queue every Plex movie and episode on *output_queue* in batches.

    Movies are read from ``plex_server.movies`` and shows from
    ``plex_server.shows``; episodes come from each show's ``episodes``.
    Each of those may be a plain iterable or a zero-argument callable that
    returns one, and a missing attribute is treated as an empty collection.

    All movie batches are queued first, followed by the episode batches of
    each show in turn. After every ``put`` the stage's
    ``_items_ingested`` and ``_batches_ingested`` counters are advanced and
    one INFO record is written to *logger*.

    Args:
        plex_server: Object exposing ``movies`` and ``shows``.
        movie_batch_size: Chunk size handed to ``_chunk_sequence`` for movies.
        episode_batch_size: Chunk size handed to ``_chunk_sequence`` for the
            episodes of a single show.
        output_queue: Destination for ``MovieBatch`` / ``EpisodeBatch`` items.
        logger: Receives one progress message per queued batch.
    """

    def _listing(owner: object, attribute: str) -> list:
        # Accept both ``owner.attr`` and ``owner.attr()``; absent means empty.
        found = getattr(owner, attribute, [])
        if callable(found):
            found = found()
        return list(found)

    # --- movies -----------------------------------------------------------
    library_movies = _listing(plex_server, "movies")
    movie_chunks = _chunk_sequence(library_movies, movie_batch_size)
    for number, piece in enumerate(movie_chunks, 1):
        queued_movies = list(piece)
        if queued_movies:
            await output_queue.put(MovieBatch(movies=queued_movies))
            self._items_ingested += len(queued_movies)
            self._batches_ingested += 1
            logger.info(
                "Queued Plex movie batch %d with %d movies (total items=%d).",
                number,
                len(queued_movies),
                self._items_ingested,
            )

    # --- episodes, one show at a time --------------------------------------
    for series in _listing(plex_server, "shows"):
        # Fall back to the show's string form when it carries no title.
        series_title = getattr(series, "title", str(series))
        series_episodes = _listing(series, "episodes")
        episode_chunks = _chunk_sequence(series_episodes, episode_batch_size)
        # Batch numbering restarts at 1 for every show.
        for number, piece in enumerate(episode_chunks, 1):
            queued_episodes = list(piece)
            if queued_episodes:
                await output_queue.put(
                    EpisodeBatch(show=series, episodes=queued_episodes)
                )
                self._items_ingested += len(queued_episodes)
                self._batches_ingested += 1
                logger.info(
                    "Queued Plex episode batch %d for %s with %d episodes (total items=%d).",
                    number,
                    series_title,
                    len(queued_episodes),
                    self._items_ingested,
                )
def test_ingestion_stage_ingest_plex_batches_movies_and_episodes(caplog) -> None:
    """``_ingest_plex`` queues movie batches first, then episode batches per show.

    A fake server with three movies and two shows (three and two episodes)
    is ingested with a batch size of two. The test checks the stage
    counters, the type and contents of each queued batch, and the exact
    INFO messages captured by ``caplog``.
    """
    caplog.set_level(logging.INFO)

    class FakeMovie:
        def __init__(self, title: str) -> None:
            self.title = title

    class FakeEpisode:
        def __init__(self, title: str) -> None:
            self.title = title

    class FakeShow:
        def __init__(self, title: str, episode_titles: list[str]) -> None:
            self.title = title
            self._episodes = [FakeEpisode(name) for name in episode_titles]

        def episodes(self) -> list[FakeEpisode]:
            # Hand out a copy so callers cannot mutate the fixture.
            return list(self._episodes)

    class FakePlex:
        def __init__(self) -> None:
            self._movies = [
                FakeMovie(name) for name in ("Movie 1", "Movie 2", "Movie 3")
            ]
            self._shows = [
                FakeShow("Show A", ["S01E01", "S01E02", "S01E03"]),
                FakeShow("Show B", ["S01E01", "S01E02"]),
            ]

        def movies(self) -> list[FakeMovie]:
            return list(self._movies)

        def shows(self) -> list[FakeShow]:
            return list(self._shows)

    sentinel = object()

    async def scenario() -> tuple[list[object], int, int]:
        queue: asyncio.Queue = asyncio.Queue()
        server = FakePlex()
        stage = IngestionStage(
            plex_server=server,
            sample_items=None,
            movie_batch_size=2,
            episode_batch_size=2,
            sample_batch_size=10,
            output_queue=queue,
            completion_sentinel=sentinel,
        )

        await stage._ingest_plex(
            plex_server=server,
            movie_batch_size=2,
            episode_batch_size=2,
            output_queue=queue,
            logger=stage.logger,
        )

        # Drain everything the stage queued, preserving order.
        drained: list[object] = []
        while not queue.empty():
            drained.append(await queue.get())

        return drained, stage.items_ingested, stage.batches_ingested

    batches, items_ingested, batches_ingested = asyncio.run(scenario())

    assert items_ingested == 8
    assert batches_ingested == 5

    # (expected batch type, attribute holding the media, expected titles)
    expected_batches = [
        (MovieBatch, "movies", ["Movie 1", "Movie 2"]),
        (MovieBatch, "movies", ["Movie 3"]),
        (EpisodeBatch, "episodes", ["S01E01", "S01E02"]),
        (EpisodeBatch, "episodes", ["S01E03"]),
        (EpisodeBatch, "episodes", ["S01E01", "S01E02"]),
    ]
    assert len(batches) == 5
    for batch, (batch_type, field, titles) in zip(batches, expected_batches):
        assert isinstance(batch, batch_type)
        assert [entry.title for entry in getattr(batch, field)] == titles

    assert caplog.messages == [
        "Queued Plex movie batch 1 with 2 movies (total items=2).",
        "Queued Plex movie batch 2 with 1 movies (total items=3).",
        "Queued Plex episode batch 1 for Show A with 2 episodes (total items=5).",
        "Queued Plex episode batch 2 for Show A with 1 episodes (total items=6).",
        "Queued Plex episode batch 1 for Show B with 2 episodes (total items=8).",
    ]