Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/pyproject.deps.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "mcp-plex"
version = "1.0.3"
version = "1.0.5"
requires-python = ">=3.11,<3.13"
dependencies = [
"fastmcp>=2.11.2",
Expand Down
11 changes: 4 additions & 7 deletions mcp_plex/loader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from qdrant_client import models
from qdrant_client.async_qdrant_client import AsyncQdrantClient

from plexapi.base import PlexPartialObject as _PlexPartialObject
from plexapi.server import PlexServer

from .imdb_cache import IMDbCache
from .pipeline.channels import (
IMDbRetryQueue,
Expand All @@ -37,13 +40,7 @@
TMDBShow,
)

try: # Only import plexapi when available; the sample data mode does not require it.
from plexapi.base import PlexPartialObject
from plexapi.server import PlexServer
except Exception:
PlexServer = None # type: ignore[assignment]
PlexPartialObject = object # type: ignore[assignment]

PlexPartialObject = _PlexPartialObject

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand Down
28 changes: 19 additions & 9 deletions mcp_plex/loader/pipeline/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,21 @@
import asyncio
from collections import deque
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Final, Iterable, Sequence, TypeVar, TypeAlias
from typing import (
TYPE_CHECKING,
Any,
Final,
Iterable,
Literal,
Sequence,
TypeAlias,
TypeVar,
)

from ...common.types import AggregatedItem
from ...common.validation import require_positive

try: # Only import plexapi when available; the sample data mode does not require it.
from plexapi.base import PlexPartialObject
from plexapi.video import Episode, Movie, Show
except Exception:
PlexPartialObject = object # type: ignore[assignment]
Episode = Movie = Show = PlexPartialObject # type: ignore[assignment]
from plexapi.video import Episode, Movie, Show

T = TypeVar("T")

Expand All @@ -29,13 +33,15 @@


INGEST_DONE: Final = object()
IngestSentinel: TypeAlias = Literal[INGEST_DONE]
"""Sentinel object signaling that ingestion has completed.

The loader currently places ``None`` on ingestion queues in addition to this
sentinel so legacy listeners that only check for ``None`` continue to work.
"""

PERSIST_DONE: Final = object()
PersistenceSentinel: TypeAlias = Literal[PERSIST_DONE]
"""Sentinel object signaling that persistence has completed."""

if TYPE_CHECKING:
Expand Down Expand Up @@ -68,8 +74,10 @@ class SampleBatch:

IngestBatch = MovieBatch | EpisodeBatch | SampleBatch

IngestQueueItem: TypeAlias = IngestBatch | None | object
PersistenceQueueItem: TypeAlias = PersistencePayload | None | object
IngestQueueItem: TypeAlias = IngestBatch | None | IngestSentinel
PersistenceQueueItem: TypeAlias = (
PersistencePayload | None | PersistenceSentinel
)

IngestQueue: TypeAlias = asyncio.Queue[IngestQueueItem]
PersistenceQueue: TypeAlias = asyncio.Queue[PersistenceQueueItem]
Expand Down Expand Up @@ -125,7 +133,9 @@ def snapshot(self) -> list[str]:
"SampleBatch",
"IngestBatch",
"INGEST_DONE",
"IngestSentinel",
"PERSIST_DONE",
"PersistenceSentinel",
"IngestQueue",
"PersistenceQueue",
"chunk_sequence",
Expand Down
8 changes: 2 additions & 6 deletions mcp_plex/loader/pipeline/enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,10 @@
)
from ..imdb_cache import IMDbCache


LOGGER = logging.getLogger(__name__)
from plexapi.base import PlexPartialObject


try: # Only import plexapi when available; the sample data mode does not require it.
from plexapi.base import PlexPartialObject
except Exception: # pragma: no cover - plexapi is optional in tests
PlexPartialObject = object # type: ignore[assignment]
LOGGER = logging.getLogger(__name__)


def _extract_external_ids(item: PlexPartialObject) -> ExternalIDs:
Expand Down
18 changes: 6 additions & 12 deletions mcp_plex/loader/pipeline/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,20 @@

import asyncio
import logging
from typing import TYPE_CHECKING, Sequence
from typing import Sequence

from ...common.types import AggregatedItem
from .channels import (
EpisodeBatch,
IngestQueue,
IngestSentinel,
MovieBatch,
SampleBatch,
chunk_sequence,
)

if TYPE_CHECKING: # pragma: no cover - imported for typing
from plexapi.server import PlexServer
from plexapi.video import Episode, Movie, Season, Show
else: # pragma: no cover - runtime import with graceful fallback
try:
from plexapi.server import PlexServer
from plexapi.video import Episode, Movie, Season, Show
except Exception: # pragma: no cover - plexapi optional at runtime
PlexServer = Movie = Show = Season = Episode = object # type: ignore[assignment]
from plexapi.server import PlexServer
from plexapi.video import Episode, Movie, Season, Show


class IngestionStage:
Expand All @@ -43,15 +37,15 @@ def __init__(
episode_batch_size: int,
sample_batch_size: int,
output_queue: IngestQueue,
completion_sentinel: object,
completion_sentinel: IngestSentinel,
) -> None:
self._plex_server = plex_server
self._sample_items = list(sample_items) if sample_items is not None else None
self._movie_batch_size = int(movie_batch_size)
self._episode_batch_size = int(episode_batch_size)
self._sample_batch_size = int(sample_batch_size)
self._output_queue = output_queue
self._completion_sentinel = completion_sentinel
self._completion_sentinel: IngestSentinel = completion_sentinel
self._logger = logging.getLogger("mcp_plex.loader.ingestion")
self._items_ingested = 0
self._batches_ingested = 0
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mcp-plex"
version = "1.0.3"
version = "1.0.5"

description = "Plex-Oriented Model Context Protocol Server"
requires-python = ">=3.11,<3.13"
Expand Down
48 changes: 26 additions & 22 deletions tests/test_ingestion_stage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
from typing import cast
from unittest.mock import Mock, create_autospec

import pytest
Expand All @@ -11,6 +10,7 @@
from mcp_plex.loader.pipeline.channels import (
INGEST_DONE,
EpisodeBatch,
IngestSentinel,
MovieBatch,
SampleBatch,
)
Expand All @@ -32,7 +32,7 @@ def test_ingestion_stage_logger_name() -> None:
async def scenario() -> str:
queue: asyncio.Queue = asyncio.Queue()
stage = IngestionStage(
plex_server=cast(PlexServer, object()),
plex_server=create_autospec(PlexServer, instance=True),
sample_items=None,
movie_batch_size=50,
episode_batch_size=25,
Expand All @@ -47,9 +47,7 @@ async def scenario() -> str:


def test_ingestion_stage_sample_empty_batches() -> None:
sentinel = object()

async def scenario() -> tuple[object | None, object | None, bool, int, int]:
async def scenario() -> tuple[SampleBatch | None, None | IngestSentinel, bool, int, int]:
queue: asyncio.Queue = asyncio.Queue()
stage = IngestionStage(
plex_server=None,
Expand All @@ -58,7 +56,7 @@ async def scenario() -> tuple[object | None, object | None, bool, int, int]:
episode_batch_size=1,
sample_batch_size=2,
output_queue=queue,
completion_sentinel=sentinel,
completion_sentinel=INGEST_DONE,
)

await stage.run()
Expand All @@ -70,16 +68,20 @@ async def scenario() -> tuple[object | None, object | None, bool, int, int]:
first, second, empty, items_ingested, batches_ingested = asyncio.run(scenario())

assert first is None
assert second is sentinel
assert second is INGEST_DONE
assert empty is True
assert items_ingested == 0
assert batches_ingested == 0


def test_ingestion_stage_sample_partial_batches() -> None:
sentinel = object()

async def scenario() -> tuple[list[SampleBatch], object | None, object | None, int, int]:
async def scenario() -> tuple[
list[SampleBatch],
None | SampleBatch,
None | IngestSentinel,
int,
int,
]:
queue: asyncio.Queue = asyncio.Queue()
sample_items = [
_make_aggregated_item("1"),
Expand All @@ -93,7 +95,7 @@ async def scenario() -> tuple[list[SampleBatch], object | None, object | None, i
episode_batch_size=1,
sample_batch_size=2,
output_queue=queue,
completion_sentinel=sentinel,
completion_sentinel=INGEST_DONE,
)

await stage.run()
Expand All @@ -110,15 +112,19 @@ async def scenario() -> tuple[list[SampleBatch], object | None, object | None, i
assert all(isinstance(batch, SampleBatch) for batch in batches)
assert [len(batch.items) for batch in batches] == [2, 1]
assert first_token is None
assert second_token is sentinel
assert second_token is INGEST_DONE
assert items_ingested == 3
assert batches_ingested == 2


def test_ingestion_stage_backpressure_handling() -> None:
sentinel = object()

async def scenario() -> tuple[list[SampleBatch], object | None, object | None, int, int]:
async def scenario() -> tuple[
list[SampleBatch],
None | SampleBatch,
None | IngestSentinel,
int,
int,
]:
queue: asyncio.Queue = asyncio.Queue(maxsize=1)
sample_items = [
_make_aggregated_item("1"),
Expand All @@ -131,7 +137,7 @@ async def scenario() -> tuple[list[SampleBatch], object | None, object | None, i
episode_batch_size=1,
sample_batch_size=1,
output_queue=queue,
completion_sentinel=sentinel,
completion_sentinel=INGEST_DONE,
)

run_task = asyncio.create_task(stage.run())
Expand All @@ -155,17 +161,15 @@ async def scenario() -> tuple[list[SampleBatch], object | None, object | None, i

assert [len(batch.items) for batch in batches] == [1, 1]
assert first_token is None
assert second_token is sentinel
assert second_token is INGEST_DONE
assert items_ingested == 2
assert batches_ingested == 2


def test_ingestion_stage_ingest_plex_batches_movies_and_episodes(caplog) -> None:
caplog.set_level(logging.INFO)

sentinel = object()

async def scenario() -> tuple[list[object], int, int, Mock]:
async def scenario() -> tuple[list[MovieBatch | EpisodeBatch], int, int, Mock]:
queue: asyncio.Queue = asyncio.Queue()

movie_section = Mock()
Expand Down Expand Up @@ -213,7 +217,7 @@ def _episodes(titles: list[str]) -> list[Episode]:
episode_batch_size=2,
sample_batch_size=10,
output_queue=queue,
completion_sentinel=sentinel,
completion_sentinel=INGEST_DONE,
)

await stage._ingest_plex(
Expand All @@ -224,7 +228,7 @@ def _episodes(titles: list[str]) -> list[Episode]:
logger=stage.logger,
)

batches: list[object] = []
batches: list[MovieBatch | EpisodeBatch] = []
while not queue.empty():
batches.append(await queue.get())

Expand Down
19 changes: 13 additions & 6 deletions tests/test_loader_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,26 @@ def make_imdb_config(
)


def test_loader_import_fallback(monkeypatch):
def test_loader_import_requires_plexapi(monkeypatch):
real_import = builtins.__import__

def fake_import(name, globals=None, locals=None, fromlist=(), level=0):
if name.startswith("plexapi"):
raise ModuleNotFoundError
raise ModuleNotFoundError("forced plexapi failure")
return real_import(name, globals, locals, fromlist, level)

monkeypatch.setattr(builtins, "__import__", fake_import)
with monkeypatch.context() as ctx:
ctx.setattr(builtins, "__import__", fake_import)
with pytest.raises(ModuleNotFoundError):
importlib.reload(loader)

module = importlib.reload(loader)
assert module.PlexServer is None
assert module.PlexPartialObject is object
importlib.reload(loader)
from plexapi.base import PlexPartialObject
from plexapi.server import PlexServer

assert module.PlexServer is PlexServer
assert module.PlexPartialObject is PlexPartialObject
assert not hasattr(module, "PartialPlexObject")
def test_load_from_sample_returns_items():
sample_dir = Path(__file__).resolve().parents[1] / "sample-data"
items = _load_from_sample(sample_dir)
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.