Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/pyproject.deps.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "mcp-plex"
version = "1.0.1"
version = "1.0.2"
requires-python = ">=3.11,<3.13"
dependencies = [
"fastmcp>=2.11.2",
Expand Down
37 changes: 36 additions & 1 deletion mcp_plex/loader/pipeline/enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ def imdb_retry_queue(self) -> IMDbRetryQueue:
async def run(self) -> None:
"""Execute the enrichment stage."""

self._logger.info(
"Starting enrichment stage with movie batch size=%d and episode batch size=%d.",
self._movie_batch_size,
self._episode_batch_size,
)
while True:
got_item = False
try:
Expand All @@ -349,10 +354,26 @@ async def run(self) -> None:
break

if isinstance(batch, MovieBatch):
self._logger.info(
"Enriching movie batch with %d item(s) (ingest queue=%d).",
len(batch.movies),
self._ingest_queue.qsize(),
)
await self._handle_movie_batch(batch)
elif isinstance(batch, EpisodeBatch):
self._logger.info(
"Enriching episode batch for %s with %d item(s) (ingest queue=%d).",
getattr(batch.show, "title", str(batch.show)),
len(batch.episodes),
self._ingest_queue.qsize(),
)
await self._handle_episode_batch(batch)
elif isinstance(batch, SampleBatch):
self._logger.info(
"Forwarding sample batch with %d item(s) (ingest queue=%d).",
len(batch.items),
self._ingest_queue.qsize(),
)
await self._handle_sample_batch(batch)
else: # pragma: no cover - defensive logging for future types
self._logger.warning(
Expand All @@ -363,6 +384,10 @@ async def run(self) -> None:
self._ingest_queue.task_done()

await self._persistence_queue.put(PERSIST_DONE)
self._logger.info(
"Enrichment stage completed; persistence sentinel emitted (retry queue=%d).",
self._imdb_retry_queue.qsize(),
)

async def _handle_movie_batch(self, batch: MovieBatch) -> None:
"""Enrich and forward Plex movie batches to the persistence stage."""
Expand Down Expand Up @@ -448,7 +473,13 @@ async def _emit_persistence_batch(

if not aggregated:
return
await self._persistence_queue.put(list(aggregated))
payload = list(aggregated)
await self._persistence_queue.put(payload)
self._logger.debug(
"Enqueued %d aggregated item(s) for persistence (queue size=%d).",
len(payload),
self._persistence_queue.qsize(),
)

async def _enrich_movies(
self, client: Any, movies: Sequence[Any]
Expand Down Expand Up @@ -604,6 +635,10 @@ async def _retry_imdb_batches(self) -> bool:
if self._imdb_retry_queue.empty():
return False

self._logger.debug(
"Processing IMDb retry queue with %d pending id(s).",
self._imdb_retry_queue.qsize(),
)
imdb_ids: list[str] = []
while (
len(imdb_ids) < self._imdb_batch_limit
Expand Down
64 changes: 64 additions & 0 deletions mcp_plex/loader/pipeline/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,27 @@ async def run(self) -> None:
ported from the legacy loader.
"""

mode = "sample" if self._sample_items is not None else "plex"
self._logger.info(
"Starting ingestion stage (%s mode) with movie batch size=%d, episode batch size=%d, sample batch size=%d.",
mode,
self._movie_batch_size,
self._episode_batch_size,
self._sample_batch_size,
)
if self._sample_items is not None:
await self._run_sample_ingestion(self._sample_items)
else:
await self._run_plex_ingestion()

self._logger.debug("Publishing ingestion completion sentinels to downstream stages.")
await self._output_queue.put(None)
await self._output_queue.put(self._completion_sentinel)
self._logger.info(
"Ingestion stage finished after queuing %d batch(es) covering %d item(s).",
self._batches_ingested,
self._items_ingested,
)

@property
def items_ingested(self) -> int:
Expand All @@ -84,11 +98,23 @@ async def _run_sample_ingestion(self, items: Sequence[AggregatedItem]) -> None:
"""Placeholder hook for the sample ingestion flow."""

item_count = len(items)
start_batches = self._batches_ingested
start_items = self._items_ingested
self._logger.info(
"Beginning sample ingestion for %d item(s) with batch size=%d.",
item_count,
self._sample_batch_size,
)
self._logger.info(
"Sample ingestion has not been ported yet; %d items queued for later.",
item_count,
)
await self._enqueue_sample_batches(items)
self._logger.info(
"Queued %d sample batch(es) covering %d item(s).",
self._batches_ingested - start_batches,
self._items_ingested - start_items,
)
await asyncio.sleep(0)

async def _run_plex_ingestion(self) -> None:
Expand All @@ -97,13 +123,23 @@ async def _run_plex_ingestion(self) -> None:
if self._plex_server is None:
self._logger.warning("Plex server unavailable; skipping ingestion.")
else:
self._logger.info(
"Beginning Plex ingestion with movie batch size=%d and episode batch size=%d.",
self._movie_batch_size,
self._episode_batch_size,
)
await self._ingest_plex(
plex_server=self._plex_server,
movie_batch_size=self._movie_batch_size,
episode_batch_size=self._episode_batch_size,
output_queue=self._output_queue,
logger=self._logger,
)
self._logger.info(
"Completed Plex ingestion; emitted %d batch(es) covering %d item(s).",
self._batches_ingested,
self._items_ingested,
)
await asyncio.sleep(0)

async def _ingest_plex(
Expand All @@ -120,6 +156,11 @@ async def _ingest_plex(
movies_attr = getattr(plex_server, "movies", [])
movies_source = movies_attr() if callable(movies_attr) else movies_attr
movies = list(movies_source)
logger.info(
"Discovered %d Plex movie(s) for ingestion.",
len(movies),
)
movie_batches = 0
for batch_index, chunk in enumerate(
chunk_sequence(movies, movie_batch_size), start=1
):
Expand All @@ -131,6 +172,7 @@ async def _ingest_plex(
await output_queue.put(batch)
self._items_ingested += len(batch_movies)
self._batches_ingested += 1
movie_batches += 1
logger.info(
"Queued Plex movie batch %d with %d movies (total items=%d).",
batch_index,
Expand All @@ -141,13 +183,21 @@ async def _ingest_plex(
shows_attr = getattr(plex_server, "shows", [])
shows_source = shows_attr() if callable(shows_attr) else shows_attr
shows = list(shows_source)
logger.info(
"Discovered %d Plex show(s) for ingestion.",
len(shows),
)
episode_batches = 0
episode_total = 0
for show in shows:
show_title = getattr(show, "title", str(show))
episodes_attr = getattr(show, "episodes", [])
episodes_source = (
episodes_attr() if callable(episodes_attr) else episodes_attr
)
episodes = list(episodes_source)
if not episodes:
logger.debug("Show %s yielded no episodes for ingestion.", show_title)
for batch_index, chunk in enumerate(
chunk_sequence(episodes, episode_batch_size), start=1
):
Expand All @@ -159,6 +209,8 @@ async def _ingest_plex(
await output_queue.put(batch)
self._items_ingested += len(batch_episodes)
self._batches_ingested += 1
episode_batches += 1
episode_total += len(batch_episodes)
logger.info(
"Queued Plex episode batch %d for %s with %d episodes (total items=%d).",
batch_index,
Expand All @@ -167,6 +219,13 @@ async def _ingest_plex(
self._items_ingested,
)

logger.debug(
"Plex ingestion summary: %d movie batch(es), %d episode batch(es), %d episode(s).",
movie_batches,
episode_batches,
episode_total,
)

async def _enqueue_sample_batches(
self, items: Sequence[AggregatedItem]
) -> None:
Expand All @@ -180,3 +239,8 @@ async def _enqueue_sample_batches(
await self._output_queue.put(SampleBatch(items=batch_items))
self._items_ingested += len(batch_items)
self._batches_ingested += 1
self._logger.debug(
"Queued sample batch with %d item(s) (total items=%d).",
len(batch_items),
self._items_ingested,
)
14 changes: 14 additions & 0 deletions mcp_plex/loader/pipeline/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ def __init__(
async def run(self) -> None:
"""Execute the configured pipeline stages concurrently."""

self._logger.info(
"Launching loader orchestrator with %d persistence worker(s).",
self._persistence_worker_count,
)
try:
async with asyncio.TaskGroup() as group:
group.create_task(
Expand Down Expand Up @@ -87,6 +91,8 @@ async def run(self) -> None:
# Re-raise the first underlying error after cleanup so callers see the
# original exception rather than the wrapper.
raise failures[0].error
else:
self._logger.info("Loader orchestrator run completed successfully.")

async def _run_stage(
self,
Expand All @@ -96,14 +102,22 @@ async def _run_stage(
) -> None:
"""Execute *runner* and wrap unexpected exceptions with stage metadata."""

stage_name = self._describe_stage(spec)
self._logger.info("Starting %s.", stage_name)
try:
result = runner(*args)
if inspect.isawaitable(result):
await result
except asyncio.CancelledError:
self._logger.debug("%s cancelled.", stage_name)
raise
except BaseException as exc:
self._logger.debug(
"%s raised %s", stage_name, exc, exc_info=exc
)
raise _StageFailure(spec, exc) from exc
else:
self._logger.info("%s completed successfully.", stage_name)

async def _handle_failures(self, failures: list[_StageFailure]) -> None:
"""Log stage-specific failures and drain queues during cancellation."""
Expand Down
17 changes: 17 additions & 0 deletions mcp_plex/loader/pipeline/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ async def enqueue_points(
if not points:
return

total_points = len(points)
self._logger.info(
"Received %d point(s) for persistence; chunking with buffer size=%d.",
total_points,
self._upsert_buffer_size,
)
for chunk in chunk_sequence(list(points), self._upsert_buffer_size):
batch = list(chunk)
if not batch:
Expand All @@ -174,10 +180,21 @@ async def enqueue_points(
except BaseException:
self._upsert_semaphore.release()
raise
else:
self._logger.debug(
"Queued persistence batch with %d point(s) (queue size=%d).",
len(batch),
self._persistence_queue.qsize(),
)

async def run(self, worker_id: int) -> None:
"""Drain the persistence queue until a sentinel is received."""

self._logger.info(
"Starting persistence worker %d (buffer size=%d).",
worker_id,
self._upsert_buffer_size,
)
while True:
payload = await self._persistence_queue.get()
try:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mcp-plex"
version = "1.0.1"
version = "1.0.2"

description = "Plex-Oriented Model Context Protocol Server"
requires-python = ">=3.11,<3.13"
Expand Down
13 changes: 12 additions & 1 deletion tests/test_ingestion_stage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import logging

import pytest

from mcp_plex.common.types import AggregatedItem, PlexItem
from mcp_plex.loader.pipeline.channels import (
INGEST_DONE,
Expand Down Expand Up @@ -237,10 +239,19 @@ async def scenario() -> tuple[list[object], int, int]:
assert isinstance(batches[4], EpisodeBatch)
assert [episode.title for episode in batches[4].episodes] == ["S01E01", "S01E02"]

assert caplog.messages == [
expected_tail = [
"Queued Plex movie batch 1 with 2 movies (total items=2).",
"Queued Plex movie batch 2 with 1 movies (total items=3).",
"Queued Plex episode batch 1 for Show A with 2 episodes (total items=5).",
"Queued Plex episode batch 2 for Show A with 1 episodes (total items=6).",
"Queued Plex episode batch 1 for Show B with 2 episodes (total items=8).",
]
observed_iter = iter(caplog.messages)
for expected in expected_tail:
for message in observed_iter:
if message == expected:
break
else: # pragma: no cover - pytest fail helper
pytest.fail(f"Expected log message not found: {expected}")
assert "Discovered 3 Plex movie(s) for ingestion." in caplog.messages
assert "Discovered 2 Plex show(s) for ingestion." in caplog.messages
5 changes: 5 additions & 0 deletions tests/test_loader_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ def test_run_logs_upsert(monkeypatch, caplog):
with caplog.at_level(logging.INFO):
asyncio.run(loader.run(None, None, None, sample_dir, None, None))
assert "Starting staged loader (sample mode)" in caplog.text
assert "Launching loader orchestrator" in caplog.text
assert "Starting ingestion stage (sample mode)" in caplog.text
assert "Upsert worker 0 handling 2 points" in caplog.text
assert "Upsert worker 0 processed 2 items" in caplog.text
assert "Loaded 2 items" in caplog.text
assert "Ingestion stage finished" in caplog.text
assert "Loader orchestrator run completed successfully" in caplog.text


def test_run_logs_no_points(monkeypatch, caplog):
Expand All @@ -47,6 +51,7 @@ def test_run_logs_no_points(monkeypatch, caplog):
asyncio.run(loader.run(None, None, None, sample_dir, None, None))
assert "Loaded 0 items" in caplog.text
assert "No points to upsert" in caplog.text
assert "Ingestion stage finished" in caplog.text


def test_run_rejects_invalid_upsert_buffer_size(monkeypatch):
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.