diff --git a/docker/pyproject.deps.toml b/docker/pyproject.deps.toml
index 9601223..3568a83 100644
--- a/docker/pyproject.deps.toml
+++ b/docker/pyproject.deps.toml
@@ -1,6 +1,6 @@
 [project]
 name = "mcp-plex"
-version = "1.0.1"
+version = "1.0.2"
 requires-python = ">=3.11,<3.13"
 dependencies = [
     "fastmcp>=2.11.2",
diff --git a/mcp_plex/loader/pipeline/enrichment.py b/mcp_plex/loader/pipeline/enrichment.py
index db3cee8..ad504d6 100644
--- a/mcp_plex/loader/pipeline/enrichment.py
+++ b/mcp_plex/loader/pipeline/enrichment.py
@@ -325,6 +325,11 @@ def imdb_retry_queue(self) -> IMDbRetryQueue:
 
     async def run(self) -> None:
         """Execute the enrichment stage."""
+        self._logger.info(
+            "Starting enrichment stage with movie batch size=%d and episode batch size=%d.",
+            self._movie_batch_size,
+            self._episode_batch_size,
+        )
         while True:
             got_item = False
             try:
@@ -349,10 +354,26 @@ async def run(self) -> None:
                     break
 
             if isinstance(batch, MovieBatch):
+                self._logger.info(
+                    "Enriching movie batch with %d item(s) (ingest queue=%d).",
+                    len(batch.movies),
+                    self._ingest_queue.qsize(),
+                )
                 await self._handle_movie_batch(batch)
             elif isinstance(batch, EpisodeBatch):
+                self._logger.info(
+                    "Enriching episode batch for %s with %d item(s) (ingest queue=%d).",
+                    getattr(batch.show, "title", str(batch.show)),
+                    len(batch.episodes),
+                    self._ingest_queue.qsize(),
+                )
                 await self._handle_episode_batch(batch)
             elif isinstance(batch, SampleBatch):
+                self._logger.info(
+                    "Forwarding sample batch with %d item(s) (ingest queue=%d).",
+                    len(batch.items),
+                    self._ingest_queue.qsize(),
+                )
                 await self._handle_sample_batch(batch)
             else:  # pragma: no cover - defensive logging for future types
                 self._logger.warning(
@@ -363,6 +384,10 @@ async def run(self) -> None:
                 self._ingest_queue.task_done()
 
         await self._persistence_queue.put(PERSIST_DONE)
+        self._logger.info(
+            "Enrichment stage completed; persistence sentinel emitted (retry queue=%d).",
+            self._imdb_retry_queue.qsize(),
+        )
 
     async def _handle_movie_batch(self, batch: MovieBatch) -> None:
         """Enrich and forward Plex movie batches to the persistence stage."""
@@ -448,7 +473,13 @@ async def _emit_persistence_batch(
         if not aggregated:
             return
 
-        await self._persistence_queue.put(list(aggregated))
+        payload = list(aggregated)
+        await self._persistence_queue.put(payload)
+        self._logger.debug(
+            "Enqueued %d aggregated item(s) for persistence (queue size=%d).",
+            len(payload),
+            self._persistence_queue.qsize(),
+        )
 
     async def _enrich_movies(
         self, client: Any, movies: Sequence[Any]
@@ -604,6 +635,10 @@ async def _retry_imdb_batches(self) -> bool:
         if self._imdb_retry_queue.empty():
             return False
 
+        self._logger.debug(
+            "Processing IMDb retry queue with %d pending id(s).",
+            self._imdb_retry_queue.qsize(),
+        )
         imdb_ids: list[str] = []
         while (
             len(imdb_ids) < self._imdb_batch_limit
diff --git a/mcp_plex/loader/pipeline/ingestion.py b/mcp_plex/loader/pipeline/ingestion.py
index 54eccee..b5b9bcf 100644
--- a/mcp_plex/loader/pipeline/ingestion.py
+++ b/mcp_plex/loader/pipeline/ingestion.py
@@ -60,13 +60,27 @@ async def run(self) -> None:
         ported from the legacy loader.
""" + mode = "sample" if self._sample_items is not None else "plex" + self._logger.info( + "Starting ingestion stage (%s mode) with movie batch size=%d, episode batch size=%d, sample batch size=%d.", + mode, + self._movie_batch_size, + self._episode_batch_size, + self._sample_batch_size, + ) if self._sample_items is not None: await self._run_sample_ingestion(self._sample_items) else: await self._run_plex_ingestion() + self._logger.debug("Publishing ingestion completion sentinels to downstream stages.") await self._output_queue.put(None) await self._output_queue.put(self._completion_sentinel) + self._logger.info( + "Ingestion stage finished after queuing %d batch(es) covering %d item(s).", + self._batches_ingested, + self._items_ingested, + ) @property def items_ingested(self) -> int: @@ -84,11 +98,23 @@ async def _run_sample_ingestion(self, items: Sequence[AggregatedItem]) -> None: """Placeholder hook for the sample ingestion flow.""" item_count = len(items) + start_batches = self._batches_ingested + start_items = self._items_ingested + self._logger.info( + "Beginning sample ingestion for %d item(s) with batch size=%d.", + item_count, + self._sample_batch_size, + ) self._logger.info( "Sample ingestion has not been ported yet; %d items queued for later.", item_count, ) await self._enqueue_sample_batches(items) + self._logger.info( + "Queued %d sample batch(es) covering %d item(s).", + self._batches_ingested - start_batches, + self._items_ingested - start_items, + ) await asyncio.sleep(0) async def _run_plex_ingestion(self) -> None: @@ -97,6 +123,11 @@ async def _run_plex_ingestion(self) -> None: if self._plex_server is None: self._logger.warning("Plex server unavailable; skipping ingestion.") else: + self._logger.info( + "Beginning Plex ingestion with movie batch size=%d and episode batch size=%d.", + self._movie_batch_size, + self._episode_batch_size, + ) await self._ingest_plex( plex_server=self._plex_server, movie_batch_size=self._movie_batch_size, @@ -104,6 +135,11 @@ async def _run_plex_ingestion(self) -> None: output_queue=self._output_queue, logger=self._logger, ) + self._logger.info( + "Completed Plex ingestion; emitted %d batch(es) covering %d item(s).", + self._batches_ingested, + self._items_ingested, + ) await asyncio.sleep(0) async def _ingest_plex( @@ -120,6 +156,11 @@ async def _ingest_plex( movies_attr = getattr(plex_server, "movies", []) movies_source = movies_attr() if callable(movies_attr) else movies_attr movies = list(movies_source) + logger.info( + "Discovered %d Plex movie(s) for ingestion.", + len(movies), + ) + movie_batches = 0 for batch_index, chunk in enumerate( chunk_sequence(movies, movie_batch_size), start=1 ): @@ -131,6 +172,7 @@ async def _ingest_plex( await output_queue.put(batch) self._items_ingested += len(batch_movies) self._batches_ingested += 1 + movie_batches += 1 logger.info( "Queued Plex movie batch %d with %d movies (total items=%d).", batch_index, @@ -141,6 +183,12 @@ async def _ingest_plex( shows_attr = getattr(plex_server, "shows", []) shows_source = shows_attr() if callable(shows_attr) else shows_attr shows = list(shows_source) + logger.info( + "Discovered %d Plex show(s) for ingestion.", + len(shows), + ) + episode_batches = 0 + episode_total = 0 for show in shows: show_title = getattr(show, "title", str(show)) episodes_attr = getattr(show, "episodes", []) @@ -148,6 +196,8 @@ async def _ingest_plex( episodes_attr() if callable(episodes_attr) else episodes_attr ) episodes = list(episodes_source) + if not episodes: + logger.debug("Show %s 
yielded no episodes for ingestion.", show_title) for batch_index, chunk in enumerate( chunk_sequence(episodes, episode_batch_size), start=1 ): @@ -159,6 +209,8 @@ async def _ingest_plex( await output_queue.put(batch) self._items_ingested += len(batch_episodes) self._batches_ingested += 1 + episode_batches += 1 + episode_total += len(batch_episodes) logger.info( "Queued Plex episode batch %d for %s with %d episodes (total items=%d).", batch_index, @@ -167,6 +219,13 @@ async def _ingest_plex( self._items_ingested, ) + logger.debug( + "Plex ingestion summary: %d movie batch(es), %d episode batch(es), %d episode(s).", + movie_batches, + episode_batches, + episode_total, + ) + async def _enqueue_sample_batches( self, items: Sequence[AggregatedItem] ) -> None: @@ -180,3 +239,8 @@ async def _enqueue_sample_batches( await self._output_queue.put(SampleBatch(items=batch_items)) self._items_ingested += len(batch_items) self._batches_ingested += 1 + self._logger.debug( + "Queued sample batch with %d item(s) (total items=%d).", + len(batch_items), + self._items_ingested, + ) diff --git a/mcp_plex/loader/pipeline/orchestrator.py b/mcp_plex/loader/pipeline/orchestrator.py index 8a51753..38ab417 100644 --- a/mcp_plex/loader/pipeline/orchestrator.py +++ b/mcp_plex/loader/pipeline/orchestrator.py @@ -58,6 +58,10 @@ def __init__( async def run(self) -> None: """Execute the configured pipeline stages concurrently.""" + self._logger.info( + "Launching loader orchestrator with %d persistence worker(s).", + self._persistence_worker_count, + ) try: async with asyncio.TaskGroup() as group: group.create_task( @@ -87,6 +91,8 @@ async def run(self) -> None: # Re-raise the first underlying error after cleanup so callers see the # original exception rather than the wrapper. raise failures[0].error + else: + self._logger.info("Loader orchestrator run completed successfully.") async def _run_stage( self, @@ -96,14 +102,22 @@ async def _run_stage( ) -> None: """Execute *runner* and wrap unexpected exceptions with stage metadata.""" + stage_name = self._describe_stage(spec) + self._logger.info("Starting %s.", stage_name) try: result = runner(*args) if inspect.isawaitable(result): await result except asyncio.CancelledError: + self._logger.debug("%s cancelled.", stage_name) raise except BaseException as exc: + self._logger.debug( + "%s raised %s", stage_name, exc, exc_info=exc + ) raise _StageFailure(spec, exc) from exc + else: + self._logger.info("%s completed successfully.", stage_name) async def _handle_failures(self, failures: list[_StageFailure]) -> None: """Log stage-specific failures and drain queues during cancellation.""" diff --git a/mcp_plex/loader/pipeline/persistence.py b/mcp_plex/loader/pipeline/persistence.py index f03b184..83f030f 100644 --- a/mcp_plex/loader/pipeline/persistence.py +++ b/mcp_plex/loader/pipeline/persistence.py @@ -164,6 +164,12 @@ async def enqueue_points( if not points: return + total_points = len(points) + self._logger.info( + "Received %d point(s) for persistence; chunking with buffer size=%d.", + total_points, + self._upsert_buffer_size, + ) for chunk in chunk_sequence(list(points), self._upsert_buffer_size): batch = list(chunk) if not batch: @@ -174,10 +180,21 @@ async def enqueue_points( except BaseException: self._upsert_semaphore.release() raise + else: + self._logger.debug( + "Queued persistence batch with %d point(s) (queue size=%d).", + len(batch), + self._persistence_queue.qsize(), + ) async def run(self, worker_id: int) -> None: """Drain the persistence queue until a sentinel is 
received.""" + self._logger.info( + "Starting persistence worker %d (buffer size=%d).", + worker_id, + self._upsert_buffer_size, + ) while True: payload = await self._persistence_queue.get() try: diff --git a/pyproject.toml b/pyproject.toml index 2b39c9c..2329904 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "1.0.1" +version = "1.0.2" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_ingestion_stage.py b/tests/test_ingestion_stage.py index eb23d3a..764ea68 100644 --- a/tests/test_ingestion_stage.py +++ b/tests/test_ingestion_stage.py @@ -1,6 +1,8 @@ import asyncio import logging +import pytest + from mcp_plex.common.types import AggregatedItem, PlexItem from mcp_plex.loader.pipeline.channels import ( INGEST_DONE, @@ -237,10 +239,19 @@ async def scenario() -> tuple[list[object], int, int]: assert isinstance(batches[4], EpisodeBatch) assert [episode.title for episode in batches[4].episodes] == ["S01E01", "S01E02"] - assert caplog.messages == [ + expected_tail = [ "Queued Plex movie batch 1 with 2 movies (total items=2).", "Queued Plex movie batch 2 with 1 movies (total items=3).", "Queued Plex episode batch 1 for Show A with 2 episodes (total items=5).", "Queued Plex episode batch 2 for Show A with 1 episodes (total items=6).", "Queued Plex episode batch 1 for Show B with 2 episodes (total items=8).", ] + observed_iter = iter(caplog.messages) + for expected in expected_tail: + for message in observed_iter: + if message == expected: + break + else: # pragma: no cover - pytest fail helper + pytest.fail(f"Expected log message not found: {expected}") + assert "Discovered 3 Plex movie(s) for ingestion." in caplog.messages + assert "Discovered 2 Plex show(s) for ingestion." in caplog.messages diff --git a/tests/test_loader_logging.py b/tests/test_loader_logging.py index a0a8262..9d96316 100644 --- a/tests/test_loader_logging.py +++ b/tests/test_loader_logging.py @@ -34,9 +34,13 @@ def test_run_logs_upsert(monkeypatch, caplog): with caplog.at_level(logging.INFO): asyncio.run(loader.run(None, None, None, sample_dir, None, None)) assert "Starting staged loader (sample mode)" in caplog.text + assert "Launching loader orchestrator" in caplog.text + assert "Starting ingestion stage (sample mode)" in caplog.text assert "Upsert worker 0 handling 2 points" in caplog.text assert "Upsert worker 0 processed 2 items" in caplog.text assert "Loaded 2 items" in caplog.text + assert "Ingestion stage finished" in caplog.text + assert "Loader orchestrator run completed successfully" in caplog.text def test_run_logs_no_points(monkeypatch, caplog): @@ -47,6 +51,7 @@ def test_run_logs_no_points(monkeypatch, caplog): asyncio.run(loader.run(None, None, None, sample_dir, None, None)) assert "Loaded 0 items" in caplog.text assert "No points to upsert" in caplog.text + assert "Ingestion stage finished" in caplog.text def test_run_rejects_invalid_upsert_buffer_size(monkeypatch): diff --git a/uv.lock b/uv.lock index c351934..24830f2 100644 --- a/uv.lock +++ b/uv.lock @@ -730,7 +730,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "1.0.1" +version = "1.0.2" source = { editable = "." } dependencies = [ { name = "fastapi" },