2 changes: 1 addition & 1 deletion docker/pyproject.deps.toml
@@ -1,6 +1,6 @@
[project]
name = "mcp-plex"
version = "1.0.17"
version = "1.0.18"
requires-python = ">=3.11,<3.13"
dependencies = [
"fastmcp>=2.11.2",
4 changes: 4 additions & 0 deletions mcp_plex/loader/AGENTS.md
@@ -27,3 +27,7 @@
- `LoaderOrchestrator` must be initialised with the three stage instances, the ingest queue, the persistence queue, and the number of persistence workers (the CLI's `max_concurrent_upserts`).
- Convert `AggregatedItem` batches into Qdrant `PointStruct` objects with `build_point` before handing them to the persistence stage's `enqueue_points` helper.
- Prefer explicit keyword arguments when threading CLI options into stage constructors so the mapping is obvious to future readers.

+## Typing Guidelines
+- Avoid introducing new ``Any`` or bare ``object`` annotations in loader modules. Use ``TypedDict`` definitions, ``Protocol`` classes, or precise unions instead.
+- When wider typing is unavoidable, leave a brief comment explaining why the loosening is necessary so future contributors can revisit it.
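
To make the new guideline concrete, here is a minimal, self-contained sketch of the preferred alternatives; every name in it is hypothetical rather than taken from the loader:

    from typing import Protocol, TypedDict


    class RatingPayload(TypedDict):
        # Hypothetical payload shape: the keys are spelled out instead of
        # hiding behind dict[str, Any].
        imdb_id: str
        score: float


    class RatingSource(Protocol):
        # Hypothetical dependency: only the single method the caller relies
        # on is declared.
        def fetch(self, imdb_id: str) -> RatingPayload: ...


    def best_score(source: RatingSource, imdb_ids: list[str]) -> float:
        # With precise types the checker verifies the .fetch() call and the
        # key access; neither would be checked under a bare object annotation.
        return max(source.fetch(i)["score"] for i in imdb_ids)

A Protocol keeps the contract structural: any object with a matching fetch method satisfies RatingSource without inheriting from it.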
95 changes: 62 additions & 33 deletions mcp_plex/loader/__init__.py
@@ -9,7 +9,7 @@
import warnings
from dataclasses import dataclass, field
from pathlib import Path
-from typing import Any, List, Optional, Sequence, TypeVar
+from typing import TYPE_CHECKING, Sequence, TypedDict, TypeVar

import click
import httpx
@@ -19,7 +19,7 @@
from plexapi.base import PlexPartialObject as _PlexPartialObject
from plexapi.server import PlexServer

-from .imdb_cache import IMDbCache
+from .imdb_cache import IMDbCache, JSONValue
from .pipeline.channels import (
IMDbRetryQueue,
INGEST_DONE,
@@ -50,6 +50,10 @@
category=RuntimeWarning,
)

+if TYPE_CHECKING:  # pragma: no cover - import for typing only
+    from .pipeline.enrichment import _RequestThrottler
+
+
T = TypeVar("T")

IMDB_BATCH_LIMIT: int = 5
@@ -70,9 +74,9 @@ class IMDbRuntimeConfig:
retry_queue: IMDbRetryQueue
requests_per_window: int | None
window_seconds: float
-    _throttle: Any = field(default=None, init=False, repr=False)
+    _throttle: _RequestThrottler | None = field(default=None, init=False, repr=False)

-    def get_throttle(self) -> Any:
+    def get_throttle(self) -> _RequestThrottler | None:
"""Return the shared rate limiter, creating it on first use."""

if self.requests_per_window is None:
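
The _throttle field and get_throttle() above implement create-on-first-use for the shared rate limiter: the field is excluded from __init__ and repr, and is populated lazily. A standalone sketch of the pattern, with Throttler as a stand-in for the real _RequestThrottler:

    from dataclasses import dataclass, field


    class Throttler:
        """Stand-in for the real _RequestThrottler."""

        def __init__(self, limit: int, window: float) -> None:
            self.limit = limit
            self.window = window


    @dataclass
    class Config:
        requests_per_window: int | None
        window_seconds: float
        # Excluded from the constructor; populated on first use.
        _throttle: Throttler | None = field(default=None, init=False, repr=False)

        def get_throttle(self) -> Throttler | None:
            if self.requests_per_window is None:
                return None  # throttling disabled entirely
            if self._throttle is None:
                self._throttle = Throttler(self.requests_per_window, self.window_seconds)
            return self._throttle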
@@ -132,7 +136,7 @@ async def _fetch_imdb(
client: httpx.AsyncClient,
imdb_id: str,
config: IMDbRuntimeConfig,
-) -> Optional[IMDbTitle]:
+) -> IMDbTitle | None:
"""Fetch metadata for an IMDb ID with caching, retry, and throttling."""

from .pipeline import enrichment as enrichment_mod
@@ -390,10 +394,35 @@ def _build_point_text(item: AggregatedItem) -> str:
return "\n".join(p for p in parts if p)


-def _build_point_payload(item: AggregatedItem) -> dict[str, object]:
+class _BaseQdrantPayload(TypedDict):
+    data: dict[str, JSONValue]
+    title: str
+    type: str
+
+
+class QdrantPayload(_BaseQdrantPayload, total=False):
+    show_title: str
+    season_title: str
+    season_number: int
+    episode_number: int
+    actors: list[str]
+    directors: list[str]
+    writers: list[str]
+    genres: list[str]
+    collections: list[str]
+    summary: str
+    overview: str
+    plot: str
+    tagline: str
+    reviews: list[str]
+    year: int
+    added_at: int
+
+
+def _build_point_payload(item: AggregatedItem) -> QdrantPayload:
"""Construct the Qdrant payload for ``item``."""

-    payload: dict[str, object] = {
+    payload: QdrantPayload = {
"data": item.model_dump(mode="json"),
"title": item.plex.title,
"type": item.plex.type,
@@ -470,10 +499,10 @@ def build_point(
)


-def _load_from_sample(sample_dir: Path) -> List[AggregatedItem]:
+def _load_from_sample(sample_dir: Path) -> list[AggregatedItem]:
"""Load items from local sample JSON files."""

-    results: List[AggregatedItem] = []
+    results: list[AggregatedItem] = []
movie_dir = sample_dir / "movie"
episode_dir = sample_dir / "episode"

@@ -713,13 +742,13 @@ def _record_upsert(worker_id: int, batch_size: int, queue_size: int) -> None:


async def run(
-    plex_url: Optional[str],
-    plex_token: Optional[str],
-    tmdb_api_key: Optional[str],
-    sample_dir: Optional[Path],
-    qdrant_url: Optional[str],
-    qdrant_api_key: Optional[str],
-    qdrant_host: Optional[str] = None,
+    plex_url: str | None,
+    plex_token: str | None,
+    tmdb_api_key: str | None,
+    sample_dir: Path | None,
+    qdrant_url: str | None,
+    qdrant_api_key: str | None,
+    qdrant_host: str | None = None,
qdrant_port: int = 6333,
qdrant_grpc_port: int = 6334,
qdrant_https: bool = False,
@@ -799,7 +828,7 @@ async def run(
dense_distance=dense_distance,
)

-    items: List[AggregatedItem]
+    items: list[AggregatedItem]
if sample_dir is not None:
logger.info("Loading sample data from %s", sample_dir)
sample_items = _load_from_sample(sample_dir)
@@ -1087,13 +1116,13 @@ async def run(
help="Path to persistent IMDb retry queue",
)
def main(
-    plex_url: Optional[str],
-    plex_token: Optional[str],
-    tmdb_api_key: Optional[str],
-    sample_dir: Optional[Path],
-    qdrant_url: Optional[str],
-    qdrant_api_key: Optional[str],
-    qdrant_host: Optional[str],
+    plex_url: str | None,
+    plex_token: str | None,
+    tmdb_api_key: str | None,
+    sample_dir: Path | None,
+    qdrant_url: str | None,
+    qdrant_api_key: str | None,
+    qdrant_host: str | None,
qdrant_port: int,
qdrant_grpc_port: int,
qdrant_https: bool,
@@ -1109,7 +1138,7 @@ def main(
imdb_cache: Path,
imdb_max_retries: int,
imdb_backoff: float,
-    imdb_requests_per_window: Optional[int],
+    imdb_requests_per_window: int | None,
imdb_window_seconds: float,
imdb_queue: Path,
log_level: str,
@@ -1150,13 +1179,13 @@ def main(


async def load_media(
-    plex_url: Optional[str],
-    plex_token: Optional[str],
-    tmdb_api_key: Optional[str],
-    sample_dir: Optional[Path],
-    qdrant_url: Optional[str],
-    qdrant_api_key: Optional[str],
-    qdrant_host: Optional[str],
+    plex_url: str | None,
+    plex_token: str | None,
+    tmdb_api_key: str | None,
+    sample_dir: Path | None,
+    qdrant_url: str | None,
+    qdrant_api_key: str | None,
+    qdrant_host: str | None,
qdrant_port: int,
qdrant_grpc_port: int,
qdrant_https: bool,
@@ -1168,7 +1197,7 @@ async def load_media(
imdb_cache: Path,
imdb_max_retries: int,
imdb_backoff: float,
-    imdb_requests_per_window: Optional[int],
+    imdb_requests_per_window: int | None,
imdb_window_seconds: float,
imdb_queue: Path,
upsert_buffer_size: int,
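
The remaining hunks in this file are mechanical spelling migrations: Optional[X] becomes X | None (PEP 604) and List[X] becomes list[X] (PEP 585). Both forms describe the same type to a checker:

    from typing import List, Optional

    legacy: Optional[List[str]] = None  # pre-PEP 604/585 spelling
    modern: list[str] | None = None     # equivalent spelling adopted in this PR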
53 changes: 47 additions & 6 deletions mcp_plex/loader/imdb_cache.py
@@ -3,7 +3,19 @@
import json
import logging
from pathlib import Path
-from typing import Any
+from typing import TypeAlias, cast
+
+from pydantic import ValidationError
+
+from ..common.types import IMDbTitle
+
+JSONScalar: TypeAlias = str | int | float | bool | None
+JSONValue: TypeAlias = (
+    JSONScalar | list["JSONValue"] | dict[str, "JSONValue"]
+)
+
+
+CachedIMDbPayload: TypeAlias = IMDbTitle | JSONValue


class IMDbCache:
@@ -13,7 +25,7 @@ class IMDbCache:

def __init__(self, path: Path) -> None:
self.path = path
-        self._data: dict[str, Any] = {}
+        self._data: dict[str, CachedIMDbPayload] = {}
if path.exists():
try:
raw_contents = path.read_text(encoding="utf-8")
@@ -25,22 +37,51 @@ def __init__(self, path: Path) -> None:
)
else:
try:
-                    self._data = json.loads(raw_contents)
+                    loaded = json.loads(raw_contents)
except (json.JSONDecodeError, UnicodeError) as exc:
self._logger.warning(
"Failed to decode IMDb cache JSON from %s; starting with empty cache.",
path,
exc_info=exc,
)
+                else:
+                    if isinstance(loaded, dict):
+                        hydrated: dict[str, CachedIMDbPayload] = {}
+                        for key, value in loaded.items():
+                            imdb_id = str(key)
+                            payload: CachedIMDbPayload
+                            if isinstance(value, dict):
+                                try:
+                                    payload = IMDbTitle.model_validate(value)
+                                except ValidationError as exc:
+                                    self._logger.debug(
+                                        "Failed to validate cached IMDb payload for %s; falling back to raw JSON.",
+                                        imdb_id,
+                                        exc_info=exc,
+                                    )
+                                    payload = cast(JSONValue, value)
+                            else:
+                                payload = cast(JSONValue, value)
+                            hydrated[imdb_id] = payload
+                        self._data = hydrated
+                    else:
+                        self._logger.warning(
+                            "IMDb cache at %s did not contain an object; ignoring its contents.",
+                            path,
+                        )

-    def get(self, imdb_id: str) -> dict[str, Any] | None:
+    def get(self, imdb_id: str) -> CachedIMDbPayload | None:
"""Return cached data for ``imdb_id`` if present."""

return self._data.get(imdb_id)

-    def set(self, imdb_id: str, data: dict[str, Any]) -> None:
+    def set(self, imdb_id: str, data: CachedIMDbPayload) -> None:
"""Store ``data`` under ``imdb_id`` and persist to disk."""

self._data[imdb_id] = data
self.path.parent.mkdir(parents=True, exist_ok=True)
-        self.path.write_text(json.dumps(self._data))
+        serialisable = {
+            key: value.model_dump() if isinstance(value, IMDbTitle) else value
+            for key, value in self._data.items()
+        }
+        self.path.write_text(json.dumps(serialisable))
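
The hydration logic above is a validate-or-fall-back pattern: each cached dict is promoted to an IMDbTitle when it validates and kept as raw JSON otherwise, so a schema change never wipes the cache. A simplified sketch of the same pattern, with a hypothetical Title model standing in for IMDbTitle:

    import json
    from typing import TypeAlias

    from pydantic import BaseModel, ValidationError

    JSONValue: TypeAlias = (
        str | int | float | bool | None | list["JSONValue"] | dict[str, "JSONValue"]
    )


    class Title(BaseModel):
        """Stand-in for the real IMDbTitle model."""

        id: str
        primary_title: str


    def hydrate(raw: str) -> dict[str, Title | JSONValue]:
        loaded = json.loads(raw)
        if not isinstance(loaded, dict):
            # The real cache logs and ignores non-object files.
            return {}
        result: dict[str, Title | JSONValue] = {}
        for key, value in loaded.items():
            if isinstance(value, dict):
                try:
                    result[key] = Title.model_validate(value)
                except ValidationError:
                    result[key] = value  # keep raw JSON rather than drop the entry
            else:
                result[key] = value
        return result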
36 changes: 4 additions & 32 deletions mcp_plex/loader/pipeline/__init__.py
@@ -2,9 +2,6 @@

from __future__ import annotations

-from importlib import import_module
-from typing import TYPE_CHECKING, Any
-
from .channels import (
EpisodeBatch,
IMDbRetryQueue,
@@ -19,11 +16,10 @@
)
from ...common.validation import require_positive

-if TYPE_CHECKING:
-    from .enrichment import EnrichmentStage
-    from .ingestion import IngestionStage
-    from .orchestrator import LoaderOrchestrator
-    from .persistence import PersistenceStage
+from .enrichment import EnrichmentStage
+from .ingestion import IngestionStage
+from .orchestrator import LoaderOrchestrator
+from .persistence import PersistenceStage

__all__ = [
"IngestionStage",
@@ -42,27 +38,3 @@
"chunk_sequence",
"require_positive",
]

-_STAGE_MODULES = {
-    "IngestionStage": ".ingestion",
-    "EnrichmentStage": ".enrichment",
-    "PersistenceStage": ".persistence",
-    "LoaderOrchestrator": ".orchestrator",
-}
-
-
-def __getattr__(name: str) -> Any:
-    """Lazily import pipeline stage classes on first access."""
-
-    if name in _STAGE_MODULES:
-        module = import_module(f"{__name__}{_STAGE_MODULES[name]}")
-        value = getattr(module, name)
-        globals()[name] = value
-        return value
-    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
-
-
-def __dir__() -> list[str]:
-    """Return module attributes for introspection tools."""
-
-    return sorted(set(globals()) | set(__all__))
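
For context, the deleted block used the module-level __getattr__ hook from PEP 562 to defer stage imports until first attribute access; the PR replaces it with plain eager imports that type checkers and IDEs resolve statically. A minimal standalone version of the removed pattern (module and attribute names are illustrative):

    # mypackage/__init__.py: a lazy re-export hook in the style removed above
    from importlib import import_module
    from typing import Any

    _LAZY = {"Stage": ".stage"}  # exported attribute -> defining submodule


    def __getattr__(name: str) -> Any:  # PEP 562: called for missing attributes
        if name in _LAZY:
            module = import_module(f"{__name__}{_LAZY[name]}")
            value = getattr(module, name)
            globals()[name] = value  # cache so the hook fires once per name
            return value
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

The trade-off is deferred import cost versus static discoverability; eager imports keep from mcp_plex.loader.pipeline import EnrichmentStage working exactly as before while making the name visible to tooling.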
5 changes: 2 additions & 3 deletions mcp_plex/loader/pipeline/channels.py
@@ -12,7 +12,6 @@
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
-    Any,
Final,
Iterable,
Literal,
@@ -46,8 +45,8 @@

-if TYPE_CHECKING:
-    PersistencePayload: TypeAlias = list[models.PointStruct]
-else:  # pragma: no cover - runtime fallback for typing-only alias
-    PersistencePayload: TypeAlias = list[Any]
+
+PersistencePayload: TypeAlias = list["models.PointStruct"]


@dataclass(slots=True)
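
The channels.py change relies on the fact that a quoted name inside a subscript is stored as a string and never evaluated at runtime, which is what makes the old list[Any] runtime fallback unnecessary. A small demonstration, assuming models is only imported for the type checker, as the quoted reference suggests:

    from typing import TYPE_CHECKING, TypeAlias

    if TYPE_CHECKING:
        # Only the type checker needs the real class.
        from qdrant_client import models

    # The quoted name is stored, not evaluated, so this line executes even
    # though qdrant_client is never imported at runtime.
    PersistencePayload: TypeAlias = list["models.PointStruct"]

    batch: PersistencePayload = []  # a plain list at runtime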