From 359f2afd97627b1778df469a1f1077ba7be37835 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Mon, 6 Oct 2025 18:33:00 -0600 Subject: [PATCH 1/2] refactor: paginate plex ingestion batches --- docker/pyproject.deps.toml | 2 +- mcp_plex/loader/pipeline/ingestion.py | 190 +++++++++++++++----- pyproject.toml | 2 +- tests/test_ingestion_stage.py | 241 +++++++++++++++++++++++++- uv.lock | 2 +- 5 files changed, 385 insertions(+), 52 deletions(-) diff --git a/docker/pyproject.deps.toml b/docker/pyproject.deps.toml index 9e7a579..a6bc6d4 100644 --- a/docker/pyproject.deps.toml +++ b/docker/pyproject.deps.toml @@ -1,6 +1,6 @@ [project] name = "mcp-plex" -version = "1.0.12" +version = "1.0.13" requires-python = ">=3.11,<3.13" dependencies = [ "fastmcp>=2.11.2", diff --git a/mcp_plex/loader/pipeline/ingestion.py b/mcp_plex/loader/pipeline/ingestion.py index 83460a8..57a1ce2 100644 --- a/mcp_plex/loader/pipeline/ingestion.py +++ b/mcp_plex/loader/pipeline/ingestion.py @@ -9,7 +9,7 @@ import asyncio import logging -from typing import Sequence +from typing import Callable, Iterable, Sequence from ...common.types import AggregatedItem from .channels import ( @@ -159,25 +159,65 @@ async def _ingest_plex( library = plex_server.library + def _log_discovered_count( + *, section: object, descriptor: str + ) -> int | None: + try: + total = getattr(section, "totalSize") # type: ignore[assignment] + except Exception: # pragma: no cover - defensive guard + total = None + if isinstance(total, int): + logger.info( + "Discovered %d Plex %s(s) for ingestion.", + total, + descriptor, + ) + return total + return None + + def _iter_section_items( + *, + fetch_page: Callable[[int], Sequence[Movie | Show]], + batch_size: int, + ) -> Iterable[Sequence[Movie | Show]]: + start = 0 + while True: + page = list(fetch_page(start)) + if not page: + break + yield page + if len(page) < batch_size: + break + start += len(page) + movies_section = library.section("Movies") - movies: list[Movie] = list(movies_section.all()) - logger.info( - "Discovered %d Plex movie(s) for ingestion.", - len(movies), + discovered_movies = _log_discovered_count( + section=movies_section, + descriptor="movie", ) + movie_batches = 0 - for batch_index, chunk in enumerate( - chunk_sequence(movies, movie_batch_size), start=1 + movie_total = 0 + + def _fetch_movies(start: int) -> Sequence[Movie]: + return movies_section.search( + container_start=start, + container_size=movie_batch_size, + ) + + for batch_index, batch_movies in enumerate( + _iter_section_items(fetch_page=_fetch_movies, batch_size=movie_batch_size), + start=1, ): - batch_movies = list(chunk) if not batch_movies: continue - batch = MovieBatch(movies=batch_movies) + batch = MovieBatch(movies=list(batch_movies)) await output_queue.put(batch) self._items_ingested += len(batch_movies) self._batches_ingested += 1 movie_batches += 1 + movie_total += len(batch_movies) logger.info( "Queued Plex movie batch %d with %d movies (total items=%d).", batch_index, @@ -185,42 +225,110 @@ async def _ingest_plex( self._items_ingested, ) + if discovered_movies is None: + logger.info( + "Discovered %d Plex movie(s) for ingestion.", + movie_total, + ) + shows_section = library.section("TV Shows") - shows: list[Show] = list(shows_section.all()) - logger.info( - "Discovered %d Plex show(s) for ingestion.", - len(shows), + discovered_shows = _log_discovered_count( + section=shows_section, + descriptor="show", ) + + def _fetch_shows(start: int) -> Sequence[Show]: + return shows_section.search( + container_start=start, + container_size=max(1, episode_batch_size), + ) + + show_total = 0 episode_batches = 0 episode_total = 0 - for show in shows: - show_title = getattr(show, "title", str(show)) - seasons: list[Season] = list(show.seasons()) - episodes: list[Episode] = [] - for season in seasons: - episodes.extend(season.episodes()) - if not episodes: - logger.debug("Show %s yielded no episodes for ingestion.", show_title) - for batch_index, chunk in enumerate( - chunk_sequence(episodes, episode_batch_size), start=1 - ): - batch_episodes = list(chunk) - if not batch_episodes: - continue - - batch = EpisodeBatch(show=show, episodes=batch_episodes) - await output_queue.put(batch) - self._items_ingested += len(batch_episodes) - self._batches_ingested += 1 - episode_batches += 1 - episode_total += len(batch_episodes) - logger.info( - "Queued Plex episode batch %d for %s with %d episodes (total items=%d).", - batch_index, - show_title, - len(batch_episodes), - self._items_ingested, - ) + + for show_batch in _iter_section_items( + fetch_page=_fetch_shows, + batch_size=max(1, episode_batch_size), + ): + for show in show_batch: + show_total += 1 + show_title = getattr(show, "title", str(show)) + show_episode_count = 0 + pending_episodes: list[Episode] = [] + show_batch_index = 0 + + seasons: Sequence[Season] = show.seasons() + for season in seasons: + start = 0 + while True: + season_page = list( + season.episodes( + container_start=start, + container_size=episode_batch_size, + ) + ) + if not season_page: + break + + show_episode_count += len(season_page) + pending_episodes.extend(season_page) + + while len(pending_episodes) >= episode_batch_size: + batch_episodes = pending_episodes[:episode_batch_size] + pending_episodes = pending_episodes[episode_batch_size:] + show_batch_index += 1 + batch = EpisodeBatch( + show=show, + episodes=list(batch_episodes), + ) + await output_queue.put(batch) + self._items_ingested += len(batch_episodes) + self._batches_ingested += 1 + episode_batches += 1 + episode_total += len(batch_episodes) + logger.info( + "Queued Plex episode batch %d for %s with %d episodes (total items=%d).", + show_batch_index, + show_title, + len(batch_episodes), + self._items_ingested, + ) + + if len(season_page) < episode_batch_size: + break + start += len(season_page) + + if pending_episodes: + show_batch_index += 1 + batch = EpisodeBatch( + show=show, + episodes=list(pending_episodes), + ) + await output_queue.put(batch) + self._items_ingested += len(pending_episodes) + self._batches_ingested += 1 + episode_batches += 1 + episode_total += len(pending_episodes) + logger.info( + "Queued Plex episode batch %d for %s with %d episodes (total items=%d).", + show_batch_index, + show_title, + len(pending_episodes), + self._items_ingested, + ) + + if show_episode_count == 0: + logger.debug( + "Show %s yielded no episodes for ingestion.", + show_title, + ) + + if discovered_shows is None: + logger.info( + "Discovered %d Plex show(s) for ingestion.", + show_total, + ) logger.debug( "Plex ingestion summary: %d movie batch(es), %d episode batch(es), %d episode(s).", diff --git a/pyproject.toml b/pyproject.toml index 875ccc3..81d27e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "1.0.12" +version = "1.0.13" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_ingestion_stage.py b/tests/test_ingestion_stage.py index d4a9ba0..71cd3f2 100644 --- a/tests/test_ingestion_stage.py +++ b/tests/test_ingestion_stage.py @@ -1,6 +1,6 @@ import asyncio import logging -from unittest.mock import Mock, create_autospec +from unittest.mock import Mock, call, create_autospec import pytest from plexapi.server import PlexServer @@ -178,28 +178,64 @@ async def scenario() -> tuple[list[MovieBatch | EpisodeBatch], int, int, Mock]: create_autospec(Movie, instance=True, title="Movie 2"), create_autospec(Movie, instance=True, title="Movie 3"), ] - movie_section.all.return_value = movies + movie_section.totalSize = len(movies) + + def movie_search(*, container_start=None, container_size=None, **_kwargs): + start = container_start or 0 + size = container_size or len(movies) + return movies[start : start + size] + + movie_section.search.side_effect = movie_search def _episodes(titles: list[str]) -> list[Episode]: return [create_autospec(Episode, instance=True, title=title) for title in titles] show_a_season_1 = create_autospec(Season, instance=True) - show_a_season_1.episodes.return_value = _episodes(["S01E01", "S01E02"]) + show_a_s1_eps = _episodes(["S01E01", "S01E02"]) + + def show_a_s1_side_effect(*, container_start=None, container_size=None, **_kwargs): + start = container_start or 0 + size = container_size or len(show_a_s1_eps) + return show_a_s1_eps[start : start + size] + + show_a_season_1.episodes.side_effect = show_a_s1_side_effect + show_a_season_2 = create_autospec(Season, instance=True) - show_a_season_2.episodes.return_value = _episodes(["S01E03"]) + show_a_s2_eps = _episodes(["S01E03"]) + + def show_a_s2_side_effect(*, container_start=None, container_size=None, **_kwargs): + start = container_start or 0 + size = container_size or len(show_a_s2_eps) + return show_a_s2_eps[start : start + size] + + show_a_season_2.episodes.side_effect = show_a_s2_side_effect show_a = create_autospec(Show, instance=True, title="Show A") show_a.seasons.return_value = [show_a_season_1, show_a_season_2] show_b_season_1 = create_autospec(Season, instance=True) - show_b_season_1.episodes.return_value = _episodes(["S01E01", "S01E02"]) + show_b_s1_eps = _episodes(["S01E01", "S01E02"]) + + def show_b_s1_side_effect(*, container_start=None, container_size=None, **_kwargs): + start = container_start or 0 + size = container_size or len(show_b_s1_eps) + return show_b_s1_eps[start : start + size] + + show_b_season_1.episodes.side_effect = show_b_s1_side_effect show_b = create_autospec(Show, instance=True, title="Show B") show_b.seasons.return_value = [show_b_season_1] shows = [show_a, show_b] show_section = Mock() - show_section.all.return_value = shows + show_section.totalSize = len(shows) + + def show_search(*, container_start=None, container_size=None, **_kwargs): + start = container_start or 0 + size = container_size or len(shows) + return shows[start : start + size] + + show_section.search.side_effect = show_search library = Mock() library.section.side_effect = lambda name: { @@ -232,14 +268,53 @@ def _episodes(titles: list[str]) -> list[Episode]: while not queue.empty(): batches.append(await queue.get()) - return batches, stage.items_ingested, stage.batches_ingested, library + return ( + batches, + stage.items_ingested, + stage.batches_ingested, + library, + movie_section, + show_section, + show_a_season_1, + show_a_season_2, + show_b_season_1, + ) - batches, items_ingested, batches_ingested, library = asyncio.run(scenario()) + ( + batches, + items_ingested, + batches_ingested, + library, + movie_section, + show_section, + show_a_season_1, + show_a_season_2, + show_b_season_1, + ) = asyncio.run(scenario()) assert library.section.call_args_list == [ (("Movies",),), (("TV Shows",),), ] + assert movie_section.search.call_args_list == [ + call(container_start=0, container_size=2), + call(container_start=2, container_size=2), + ] + assert show_section.search.call_args_list == [ + call(container_start=0, container_size=2), + call(container_start=2, container_size=2), + ] + assert show_a_season_1.episodes.call_args_list == [ + call(container_start=0, container_size=2), + call(container_start=2, container_size=2), + ] + assert show_a_season_2.episodes.call_args_list == [ + call(container_start=0, container_size=2), + ] + assert show_b_season_1.episodes.call_args_list == [ + call(container_start=0, container_size=2), + call(container_start=2, container_size=2), + ] assert items_ingested == 8 assert batches_ingested == 5 @@ -271,3 +346,153 @@ def _episodes(titles: list[str]) -> list[Episode]: pytest.fail(f"Expected log message not found: {expected}") assert "Discovered 3 Plex movie(s) for ingestion." in caplog.messages assert "Discovered 2 Plex show(s) for ingestion." in caplog.messages + + +def test_ingestion_stage_ingest_plex_large_library_batches(caplog) -> None: + caplog.set_level(logging.INFO) + + async def scenario() -> tuple[ + list[MovieBatch | EpisodeBatch], + int, + int, + list, + list, + list[list], + ]: + queue: asyncio.Queue = asyncio.Queue() + + movie_batch_size = 50 + episode_batch_size = 25 + + movie_section = Mock() + movie_count = 105 + movies = [ + create_autospec(Movie, instance=True, title=f"Movie {index + 1}") + for index in range(movie_count) + ] + movie_section.totalSize = movie_count + + def movie_search(*, container_start=None, container_size=None, **_kwargs): + start = container_start or 0 + size = container_size or movie_count + return movies[start : start + size] + + movie_section.search.side_effect = movie_search + + episode_counts = [30, 25, 10] + shows: list[Show] = [] + season_mocks: list[Season] = [] + show_section = Mock() + show_section.totalSize = len(episode_counts) + + for show_index, episode_count in enumerate(episode_counts, start=1): + show = create_autospec(Show, instance=True, title=f"Show {show_index}") + season = create_autospec(Season, instance=True) + episodes = [ + create_autospec( + Episode, + instance=True, + title=f"S{show_index:02d}E{episode_index + 1:02d}", + ) + for episode_index in range(episode_count) + ] + + def _make_side_effect(eps: list[Episode]): + def _side_effect(*, container_start=None, container_size=None, **_kwargs): + start = container_start or 0 + size = container_size or len(eps) + return eps[start : start + size] + + return _side_effect + + season.episodes.side_effect = _make_side_effect(episodes) + show.seasons.return_value = [season] + shows.append(show) + season_mocks.append(season) + + def show_search(*, container_start=None, container_size=None, **_kwargs): + start = container_start or 0 + size = container_size or len(shows) + return shows[start : start + size] + + show_section.search.side_effect = show_search + + library = Mock() + library.section.side_effect = lambda name: { + "Movies": movie_section, + "TV Shows": show_section, + }[name] + + plex = create_autospec(PlexServer, instance=True) + plex.library = library + + stage = IngestionStage( + plex_server=plex, + sample_items=None, + movie_batch_size=movie_batch_size, + episode_batch_size=episode_batch_size, + sample_batch_size=10, + output_queue=queue, + completion_sentinel=INGEST_DONE, + ) + + await stage._ingest_plex( + plex_server=plex, + movie_batch_size=movie_batch_size, + episode_batch_size=episode_batch_size, + output_queue=queue, + logger=stage.logger, + ) + + batches: list[MovieBatch | EpisodeBatch] = [] + while not queue.empty(): + batches.append(await queue.get()) + + return ( + batches, + stage.items_ingested, + stage.batches_ingested, + movie_section.search.call_args_list, + show_section.search.call_args_list, + [season.episodes.call_args_list for season in season_mocks], + ) + + ( + batches, + items_ingested, + batches_ingested, + movie_search_calls, + show_search_calls, + season_episode_calls, + ) = asyncio.run(scenario()) + + movie_batches = [batch for batch in batches if isinstance(batch, MovieBatch)] + episode_batches = [batch for batch in batches if isinstance(batch, EpisodeBatch)] + + assert [len(batch.movies) for batch in movie_batches] == [50, 50, 5] + assert [len(batch.episodes) for batch in episode_batches] == [25, 5, 25, 10] + assert items_ingested == 105 + 65 + assert batches_ingested == 7 + + assert movie_search_calls == [ + call(container_start=0, container_size=50), + call(container_start=50, container_size=50), + call(container_start=100, container_size=50), + ] + assert show_search_calls == [ + call(container_start=0, container_size=25), + ] + assert season_episode_calls[0] == [ + call(container_start=0, container_size=25), + call(container_start=25, container_size=25), + ] + assert season_episode_calls[1] == [ + call(container_start=0, container_size=25), + call(container_start=25, container_size=25), + ] + assert season_episode_calls[2] == [ + call(container_start=0, container_size=25), + ] + + assert "Discovered 105 Plex movie(s) for ingestion." in caplog.messages + assert "Discovered 3 Plex show(s) for ingestion." in caplog.messages diff --git a/uv.lock b/uv.lock index bca5f78..e4c8bb2 100644 --- a/uv.lock +++ b/uv.lock @@ -730,7 +730,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "1.0.12" +version = "1.0.13" source = { editable = "." } dependencies = [ { name = "fastapi" }, From 6d479cbf7ddf3d5dac2a712f5c9a2f94aa84aa27 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Mon, 6 Oct 2025 19:05:23 -0600 Subject: [PATCH 2/2] fix: validate plex pagination batch sizes --- docker/pyproject.deps.toml | 2 +- mcp_plex/loader/pipeline/ingestion.py | 10 +++++++ pyproject.toml | 2 +- tests/test_ingestion_stage.py | 41 +++++++++++++++++++++++++++ uv.lock | 2 +- 5 files changed, 54 insertions(+), 3 deletions(-) diff --git a/docker/pyproject.deps.toml b/docker/pyproject.deps.toml index a6bc6d4..edda86e 100644 --- a/docker/pyproject.deps.toml +++ b/docker/pyproject.deps.toml @@ -1,6 +1,6 @@ [project] name = "mcp-plex" -version = "1.0.13" +version = "1.0.14" requires-python = ">=3.11,<3.13" dependencies = [ "fastmcp>=2.11.2", diff --git a/mcp_plex/loader/pipeline/ingestion.py b/mcp_plex/loader/pipeline/ingestion.py index 57a1ce2..f611379 100644 --- a/mcp_plex/loader/pipeline/ingestion.py +++ b/mcp_plex/loader/pipeline/ingestion.py @@ -12,6 +12,7 @@ from typing import Callable, Iterable, Sequence from ...common.types import AggregatedItem +from ...common.validation import require_positive from .channels import ( EpisodeBatch, IngestQueue, @@ -157,6 +158,15 @@ async def _ingest_plex( ) -> None: """Retrieve Plex media and place batches onto *output_queue*.""" + movie_batch_size = require_positive( + int(movie_batch_size), + name="movie_batch_size", + ) + episode_batch_size = require_positive( + int(episode_batch_size), + name="episode_batch_size", + ) + library = plex_server.library def _log_discovered_count( diff --git a/pyproject.toml b/pyproject.toml index 81d27e7..006272d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "1.0.13" +version = "1.0.14" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_ingestion_stage.py b/tests/test_ingestion_stage.py index 71cd3f2..a101a2b 100644 --- a/tests/test_ingestion_stage.py +++ b/tests/test_ingestion_stage.py @@ -117,6 +117,47 @@ async def scenario() -> tuple[ assert batches_ingested == 2 +@pytest.mark.parametrize( + ("movie_batch_size", "episode_batch_size", "expected_name"), + [ + (0, 1, "movie_batch_size"), + (1, 0, "episode_batch_size"), + (-3, 1, "movie_batch_size"), + (1, -7, "episode_batch_size"), + ], +) +def test_ingestion_stage_ingest_plex_requires_positive_batch_sizes( + movie_batch_size: int, episode_batch_size: int, expected_name: str +) -> None: + async def scenario() -> ValueError: + queue: asyncio.Queue = asyncio.Queue() + stage = IngestionStage( + plex_server=create_autospec(PlexServer, instance=True), + sample_items=None, + movie_batch_size=1, + episode_batch_size=1, + sample_batch_size=1, + output_queue=queue, + completion_sentinel=INGEST_DONE, + ) + + plex_server = create_autospec(PlexServer, instance=True) + + with pytest.raises(ValueError) as excinfo: + await stage._ingest_plex( + plex_server=plex_server, + movie_batch_size=movie_batch_size, + episode_batch_size=episode_batch_size, + output_queue=queue, + logger=stage.logger, + ) + + return excinfo.value + + error = asyncio.run(scenario()) + assert str(error) == f"{expected_name} must be positive" + + def test_ingestion_stage_backpressure_handling() -> None: async def scenario() -> tuple[ list[SampleBatch], diff --git a/uv.lock b/uv.lock index e4c8bb2..0854bc5 100644 --- a/uv.lock +++ b/uv.lock @@ -730,7 +730,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "1.0.13" +version = "1.0.14" source = { editable = "." } dependencies = [ { name = "fastapi" },