docker/pyproject.deps.toml (2 changes: 1 addition & 1 deletion)
@@ -1,6 +1,6 @@
 [project]
 name = "mcp-plex"
-version = "2.0.7"
+version = "2.0.9"
 requires-python = ">=3.11,<3.13"
 dependencies = [
     "fastmcp>=2.11.2",
mcp_plex/common/cache.py (1 change: 1 addition & 0 deletions)
@@ -1,4 +1,5 @@
 """In-memory LRU cache for media payload and artwork data."""
+
 from __future__ import annotations
 
 from collections import OrderedDict
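The docstring plus the OrderedDict import point at the classic ordered-dict LRU pattern. A minimal sketch of that technique for orientation (the class name, capacity argument, and get/put API here are illustrative assumptions, not the module's actual interface):

    from collections import OrderedDict
    from typing import Generic, Optional, TypeVar

    K = TypeVar("K")
    V = TypeVar("V")

    class LRUCache(Generic[K, V]):
        """Illustrative LRU sketch, not the module's real implementation."""

        def __init__(self, capacity: int) -> None:
            self._data: OrderedDict[K, V] = OrderedDict()
            self._capacity = capacity

        def get(self, key: K) -> Optional[V]:
            if key not in self._data:
                return None
            self._data.move_to_end(key)  # mark key as most recently used
            return self._data[key]

        def put(self, key: K, value: V) -> None:
            self._data[key] = value
            self._data.move_to_end(key)
            if len(self._data) > self._capacity:
                self._data.popitem(last=False)  # evict least recently used entry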
mcp_plex/common/types.py (3 changes: 2 additions & 1 deletion)
@@ -1,4 +1,5 @@
 """Type definitions for Plex metadata and external services."""
+
 from __future__ import annotations
 
 from dataclasses import dataclass
@@ -221,6 +222,7 @@ class ExternalIDs:
     imdb: Optional[str] = None
     tmdb: Optional[str] = None
 
+
 __all__ = [
     "IMDbRating",
     "IMDbTitle",
@@ -244,4 +246,3 @@ class ExternalIDs:
 JSONValue: TypeAlias = JSONScalar | Sequence["JSONValue"] | Mapping[str, "JSONValue"]
 JSONMapping: TypeAlias = Mapping[str, JSONValue]
 MutableJSONMapping: TypeAlias = MutableMapping[str, JSONValue]
-
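As a quick orientation for the recursive aliases above: a nested payload such as the following checks as a JSONMapping, assuming (as is typical; the definition sits outside this hunk) that JSONScalar covers str, int, float, bool, and None:

    from mcp_plex.common.types import JSONMapping

    payload: JSONMapping = {
        "ids": {"imdb": "tt0111161", "tmdb": "278"},
        "ratings": [9.3, None],  # sequences and mappings nest via JSONValue
    }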
mcp_plex/loader/__init__.py (21 changes: 12 additions & 9 deletions)
@@ -1,4 +1,5 @@
 """Loader orchestration utilities and staged pipeline helpers."""
+
 from __future__ import annotations
 
 import asyncio
@@ -183,7 +184,9 @@ def _build_loader_orchestrator(
     max_concurrent_upserts: int,
     imdb_config: IMDbRuntimeConfig,
     qdrant_config: QdrantRuntimeConfig,
-) -> tuple[LoaderOrchestrator, list[AggregatedItem], asyncio.Queue[list[models.PointStruct]]]:
+) -> tuple[
+    LoaderOrchestrator, list[AggregatedItem], asyncio.Queue[list[models.PointStruct]]
+]:
     """Wire the staged loader pipeline and return the orchestrator helpers."""
 
     from .pipeline.ingestion import IngestionStage
@@ -206,8 +209,7 @@ async def _upsert_aggregated(
             return
         items.extend(batch)
         points = [
-            build_point(item, dense_model_name, sparse_model_name)
-            for item in batch
+            build_point(item, dense_model_name, sparse_model_name) for item in batch
         ]
         for point_chunk in chunk_sequence(points, upsert_buffer_size):
             await _upsert_in_batches(
@@ -332,7 +334,9 @@ async def run(
     require_positive(max_concurrent_upserts, name="max_concurrent_upserts")
     require_positive(qdrant_retry_attempts, name="qdrant_retry_attempts")
 
-    imdb_retry_queue = _load_imdb_retry_queue(imdb_queue_path) if imdb_queue_path else IMDbRetryQueue()
+    imdb_retry_queue = (
+        _load_imdb_retry_queue(imdb_queue_path) if imdb_queue_path else IMDbRetryQueue()
+    )
    imdb_config = IMDbRuntimeConfig(
         cache=imdb_cache,
         max_retries=imdb_max_retries,
@@ -443,7 +447,9 @@ async def run(
         if imdb_queue_path:
             _persist_imdb_retry_queue(imdb_queue_path, imdb_config.retry_queue)
 
-        json.dump([item.model_dump(mode="json") for item in items], fp=sys.stdout, indent=2)
+        json.dump(
+            [item.model_dump(mode="json") for item in items], fp=sys.stdout, indent=2
+        )
         sys.stdout.write("\n")
     finally:
         await client.close()
@@ -483,9 +489,7 @@ async def load_media(
     """Orchestrate one or more runs of :func:`run`."""
 
     if delay < 0:
-        raise ValueError(
-            f"Delay between runs must be non-negative; received {delay!r}"
-        )
+        raise ValueError(f"Delay between runs must be non-negative; received {delay!r}")
 
     while True:
         await run(
@@ -521,4 +525,3 @@ async def load_media(
             break
 
         await asyncio.sleep(delay)
-
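The _upsert_aggregated hunk batches points through chunk_sequence before handing them to _upsert_in_batches. The helper's definition is not part of this diff; a plausible stand-in with the behavior the call sites imply (fixed-size runs, last chunk possibly short) would be:

    from itertools import islice
    from typing import Iterator, Sequence, TypeVar

    T = TypeVar("T")

    def chunk_sequence(items: Sequence[T], size: int) -> Iterator[list[T]]:
        """Yield consecutive chunks of at most *size* items (illustrative sketch)."""
        iterator = iter(items)
        while chunk := list(islice(iterator, size)):
            yield chunk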
mcp_plex/loader/__main__.py (1 change: 1 addition & 0 deletions)
@@ -1,4 +1,5 @@
 """Module entrypoint for ``python -m mcp_plex.loader``."""
+
 from __future__ import annotations
 
 from .cli import main
mcp_plex/loader/cli.py (6 changes: 5 additions & 1 deletion)
@@ -1,4 +1,5 @@
 """Command-line interface for the loader pipeline."""
+
 from __future__ import annotations
 
 import asyncio
@@ -159,7 +160,10 @@
     "--log-level",
     envvar="LOG_LEVEL",
     show_envvar=True,
-    type=click.Choice(["critical", "error", "warning", "info", "debug", "notset"], case_sensitive=False),
+    type=click.Choice(
+        ["critical", "error", "warning", "info", "debug", "notset"],
+        case_sensitive=False,
+    ),
     default="info",
     show_default=True,
     help="Logging level for console output",
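Since the Choice above is declared with case_sensitive=False, click accepts any casing and normalizes the value back to the declared lowercase name. A standalone check of that behavior (independent of the loader's wiring; passing None for param and ctx is fine on the success path):

    import click

    level = click.Choice(
        ["critical", "error", "warning", "info", "debug", "notset"],
        case_sensitive=False,
    )
    assert level.convert("DEBUG", None, None) == "debug"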
mcp_plex/loader/pipeline/channels.py (5 changes: 2 additions & 3 deletions)
@@ -5,6 +5,7 @@
 constants. The loader still emits ``None`` as a completion token for
 compatibility while downstream components migrate to sentinel-only signaling.
 """
+
 from __future__ import annotations
 
 import asyncio
@@ -74,9 +75,7 @@ class SampleBatch:
 IngestBatch = MovieBatch | EpisodeBatch | SampleBatch
 
 IngestQueueItem: TypeAlias = IngestBatch | None | IngestSentinel
-PersistenceQueueItem: TypeAlias = (
-    PersistencePayload | None | PersistenceSentinel
-)
+PersistenceQueueItem: TypeAlias = PersistencePayload | None | PersistenceSentinel
 
 IngestQueue: TypeAlias = asyncio.Queue[IngestQueueItem]
 PersistenceQueue: TypeAlias = asyncio.Queue[PersistenceQueueItem]
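The docstring spells out the dual-signal contract: a legacy None completion token alongside a typed sentinel. A minimal consumer sketch of that contract, mirroring the enrichment stage's loop later in this diff (INGEST_DONE is assumed to be the ingest sentinel this module exports; handle_batch is a hypothetical callback):

    from mcp_plex.loader.pipeline.channels import INGEST_DONE, IngestQueue  # assumed exports

    async def consume(queue: IngestQueue) -> None:
        while True:
            item = await queue.get()
            try:
                if item is None:
                    continue  # legacy completion token: ignore and keep draining
                if item is INGEST_DONE:
                    break  # sentinel: ingestion has truly finished
                handle_batch(item)  # hypothetical handler for Movie/Episode/Sample batches
            finally:
                queue.task_done()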
mcp_plex/loader/pipeline/enrichment.py (53 changes: 16 additions & 37 deletions)
@@ -67,11 +67,9 @@ async def get(
         *,
         params: Mapping[str, str] | None = None,
         headers: Mapping[str, str] | None = None,
-    ) -> httpx.Response:
-        ...
+    ) -> httpx.Response: ...
 
-    async def aclose(self) -> None:
-        ...
+    async def aclose(self) -> None: ...
 
 
 HTTPClientResource = (
@@ -80,9 +78,7 @@ async def aclose(self) -> None:
     | AbstractContextManager[AsyncHTTPClient]
 )
 
-HTTPClientFactory = Callable[
-    [], HTTPClientResource | Awaitable[HTTPClientResource]
-]
+HTTPClientFactory = Callable[[], HTTPClientResource | Awaitable[HTTPClientResource]]
 
 
 def _extract_external_ids(item: PlexPartialObject) -> ExternalIDs:
@@ -207,9 +203,7 @@ async def _fetch_tmdb_episode(
 ) -> TMDBEpisode | None:
     """Fetch TMDb data for a TV episode."""
 
-    url = (
-        f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}/episode/{episode_number}"
-    )
+    url = f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}/episode/{episode_number}"
     try:
         resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"})
     except httpx.HTTPError:
@@ -245,9 +239,7 @@ async def _fetch_tmdb_episode_chunk(
             url, params=params, headers={"Authorization": f"Bearer {api_key}"}
         )
     except httpx.HTTPError:
-        LOGGER.exception(
-            "HTTP error fetching TMDb episode chunk for show %s", show_id
-        )
+        LOGGER.exception("HTTP error fetching TMDb episode chunk for show %s", show_id)
         return {}
     if not resp.is_success:
         return {}
@@ -364,9 +356,7 @@ def __init__(
             int(imdb_batch_limit), name="imdb_batch_limit"
         )
         if imdb_requests_per_window is not None and imdb_requests_per_window <= 0:
-            raise ValueError(
-                "imdb_requests_per_window must be positive when provided"
-            )
+            raise ValueError("imdb_requests_per_window must be positive when provided")
         if imdb_window_seconds <= 0:
            raise ValueError("imdb_window_seconds must be positive")
         self._imdb_throttle = _RequestThrottler(
@@ -416,9 +406,7 @@ async def run(self) -> None:
             got_item = True
             try:
                 if batch is None:
-                    self._logger.debug(
-                        "Received legacy completion token; ignoring."
-                    )
+                    self._logger.debug("Received legacy completion token; ignoring.")
                     continue
 
                 if batch is INGEST_DONE:
@@ -450,9 +438,7 @@ async def run(self) -> None:
                     )
                     await self._handle_sample_batch(batch)
                 else:  # pragma: no cover - defensive logging for future types
-                    self._logger.warning(
-                        "Received unsupported batch type: %r", batch
-                    )
+                    self._logger.warning("Received unsupported batch type: %r", batch)
             finally:
                 if got_item:
                     self._ingest_queue.task_done()
@@ -526,7 +512,9 @@ async def _acquire_http_client(self) -> AsyncIterator[AsyncHTTPClient]:
             return
 
         if hasattr(resource, "__aenter__") and hasattr(resource, "__aexit__"):
-            async with cast(AbstractAsyncContextManager[AsyncHTTPClient], resource) as client:
+            async with cast(
+                AbstractAsyncContextManager[AsyncHTTPClient], resource
+            ) as client:
                 yield client
             return
 
@@ -592,9 +580,7 @@ async def _enrich_movies(
             if not ids.tmdb:
                 continue
             tmdb_tasks.append(
-                asyncio.create_task(
-                    _fetch_tmdb_movie(client, ids.tmdb, api_key)
-                )
+                asyncio.create_task(_fetch_tmdb_movie(client, ids.tmdb, api_key))
             )
 
         imdb_map: dict[str, IMDbTitle | None] = {}
@@ -604,8 +590,7 @@ async def aclose(self) -> None:
             combined_results = await asyncio.gather(imdb_future, *tmdb_tasks)
             imdb_map = cast(dict[str, IMDbTitle | None], combined_results[0])
             tmdb_results = [
-                cast(TMDBMovie | None, result)
-                for result in combined_results[1:]
+                cast(TMDBMovie | None, result) for result in combined_results[1:]
             ]
             retry_snapshot = set(self._imdb_retry_queue.snapshot())
         elif tmdb_tasks:
@@ -686,9 +671,7 @@ async def _enrich_episodes(
         retry_snapshot: set[str] = set()
         tmdb_results: list[TMDBEpisode | None] = [None] * len(episodes)
         if imdb_future and tmdb_future:
-            imdb_map, tmdb_results = await asyncio.gather(
-                imdb_future, tmdb_future
-            )
+            imdb_map, tmdb_results = await asyncio.gather(imdb_future, tmdb_future)
             retry_snapshot = set(self._imdb_retry_queue.snapshot())
         elif imdb_future:
             imdb_map = await imdb_future
@@ -723,9 +706,7 @@ async def _get_tmdb_show(
         tmdb_id_str = str(tmdb_id)
         if tmdb_id_str in self._show_tmdb_cache:
             return self._show_tmdb_cache[tmdb_id_str]
-        show = await _fetch_tmdb_show(
-            client, tmdb_id_str, self._tmdb_api_key or ""
-        )
+        show = await _fetch_tmdb_show(client, tmdb_id_str, self._tmdb_api_key or "")
         self._show_tmdb_cache[tmdb_id_str] = show
         return show
 
@@ -972,9 +953,7 @@ async def _fetch_imdb_batch(
         try:
             response = await client.get(url, params=params)
         except httpx.HTTPError:
-            LOGGER.exception(
-                "HTTP error fetching IMDb IDs %s", ",".join(chunk)
-            )
+            LOGGER.exception("HTTP error fetching IMDb IDs %s", ",".join(chunk))
             for imdb_id in chunk:
                 results[imdb_id] = None
             break
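The collapsed HTTPClientFactory alias accepts a factory returning a bare client, an awaitable, or a (sync or async) context manager, and _acquire_http_client duck-types between those shapes. Because httpx.AsyncClient is itself an async context manager, the simplest conforming factory a caller or test might supply is just (the timeout value is illustrative):

    import httpx

    def make_client() -> httpx.AsyncClient:
        # AsyncClient defines __aenter__/__aexit__, so _acquire_http_client
        # enters it as an async context manager and closes it on exit.
        return httpx.AsyncClient(timeout=30.0)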
mcp_plex/loader/pipeline/ingestion.py (13 changes: 6 additions & 7 deletions)
@@ -5,6 +5,7 @@
 having the stage skeleton in place allows other components to depend on the
 interface.
 """
+
 from __future__ import annotations
 
 import asyncio
@@ -80,7 +81,9 @@ async def run(self) -> None:
         else:
             await self._run_plex_ingestion()
 
-        self._logger.debug("Publishing ingestion completion sentinels to downstream stages.")
+        self._logger.debug(
+            "Publishing ingestion completion sentinels to downstream stages."
+        )
         await enqueue_nowait(self._output_queue, None)
         await enqueue_nowait(self._output_queue, self._completion_sentinel)
         self._logger.info(
@@ -346,19 +349,15 @@ def _fetch_shows(start: int) -> Sequence[Show]:
             episode_total,
         )
 
-    async def _enqueue_sample_batches(
-        self, items: Sequence[AggregatedItem]
-    ) -> None:
+    async def _enqueue_sample_batches(self, items: Sequence[AggregatedItem]) -> None:
         """Place sample items onto the ingest queue in configured batch sizes."""
 
         for chunk in chunk_sequence(items, self._sample_batch_size):
             batch_items = list(chunk)
             if not batch_items:
                 continue
 
-            await enqueue_nowait(
-                self._output_queue, SampleBatch(items=batch_items)
-            )
+            await enqueue_nowait(self._output_queue, SampleBatch(items=batch_items))
             self._items_ingested += len(batch_items)
             self._batches_ingested += 1
             self._logger.debug(
mcp_plex/loader/pipeline/orchestrator.py (13 changes: 4 additions & 9 deletions)
@@ -31,18 +31,15 @@ def __init__(self, spec: _StageSpec, error: BaseException) -> None:
 
 
 class IngestionStageProtocol(Protocol):
-    async def run(self) -> None:
-        ...
+    async def run(self) -> None: ...
 
 
 class EnrichmentStageProtocol(Protocol):
-    async def run(self) -> None:
-        ...
+    async def run(self) -> None: ...
 
 
 class PersistenceStageProtocol(Protocol):
-    async def run(self, worker_id: int) -> None:
-        ...
+    async def run(self, worker_id: int) -> None: ...
 
 
 class LoaderOrchestrator:
@@ -127,9 +124,7 @@ async def _run_stage(
             self._logger.debug("%s cancelled.", stage_name)
             raise
         except BaseException as exc:
-            self._logger.debug(
-                "%s raised %s", stage_name, exc, exc_info=exc
-            )
+            self._logger.debug("%s raised %s", stage_name, exc, exc_info=exc)
             raise _StageFailure(spec, exc) from exc
         else:
             self._logger.info("%s completed successfully.", stage_name)
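The stage protocols above rely on PEP 544 structural typing, so any object with a matching run coroutine satisfies them without inheriting; the one-line `...` bodies are the stub form black now prefers. A hypothetical no-op double, for instance, type-checks as an IngestionStageProtocol:

    class NullIngestionStage:
        async def run(self) -> None:
            return  # no-op stand-in; satisfies IngestionStageProtocol structurally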
mcp_plex/loader/pipeline/persistence.py (13 changes: 4 additions & 9 deletions)
@@ -17,6 +17,7 @@
 try:  # pragma: no cover - allow import to fail when qdrant_client is absent
     from qdrant_client import AsyncQdrantClient, models
 except ModuleNotFoundError:  # pragma: no cover - tooling without qdrant installed
+
     class AsyncQdrantClient:  # type: ignore[too-few-public-methods]
         """Fallback stub used when qdrant_client is unavailable."""
 
@@ -166,9 +167,7 @@ def _drain_additional_sentinels(self) -> int:
 
         return drained
 
-    async def enqueue_points(
-        self, points: Sequence["models.PointStruct"]
-    ) -> None:
+    async def enqueue_points(self, points: Sequence["models.PointStruct"]) -> None:
         """Chunk *points* and place them on the persistence queue."""
 
         if not points:
@@ -229,14 +228,10 @@ async def run(self, worker_id: int) -> None:
                 outstanding_workers = max(
                     self._worker_count - self._shutdown_tokens_seen, 0
                 )
-                additional_tokens = max(
-                    outstanding_workers - drained_sentinels, 0
-                )
+                additional_tokens = max(outstanding_workers - drained_sentinels, 0)
                 if additional_tokens:
                     for _ in range(additional_tokens):
-                        await enqueue_nowait(
-                            self._persistence_queue, PERSIST_DONE
-                        )
+                        await enqueue_nowait(self._persistence_queue, PERSIST_DONE)
                 self._logger.debug(
                     "Persistence queue sentinel received; finishing run for worker %d.",
                     worker_id,
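The worker-shutdown hunk above re-publishes just enough PERSIST_DONE tokens for the workers that have not yet consumed one. A worked example of that arithmetic with illustrative values:

    worker_count = 4          # persistence workers started
    shutdown_tokens_seen = 1  # sentinels consumed so far, including this one
    drained_sentinels = 2     # extras pulled off while draining the queue

    outstanding_workers = max(worker_count - shutdown_tokens_seen, 0)    # 3
    additional_tokens = max(outstanding_workers - drained_sentinels, 0)  # 1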