diff --git a/docker/pyproject.deps.toml b/docker/pyproject.deps.toml index 2255805..c98e32b 100644 --- a/docker/pyproject.deps.toml +++ b/docker/pyproject.deps.toml @@ -1,6 +1,6 @@ [project] name = "mcp-plex" -version = "0.26.58" +version = "0.26.60" requires-python = ">=3.11,<3.13" dependencies = [ "fastmcp>=2.11.2", diff --git a/docs/adr/0002-loader-pipeline-multi-worker.md b/docs/adr/0002-loader-pipeline-multi-worker.md new file mode 100644 index 0000000..87b3e88 --- /dev/null +++ b/docs/adr/0002-loader-pipeline-multi-worker.md @@ -0,0 +1,29 @@ +# ADR 0002: Loader Multi-Worker Pipeline + +## Status +Accepted + +## Context +The Plex loader previously executed ingestion, enrichment, and Qdrant writes sequentially. Each phase waited for the previous phase to finish, so TMDb and IMDb lookups, embedding generation, and upsert requests could not overlap. Large libraries therefore spent most of their time idle while waiting on network latency, and any failure in a downstream phase required rerunning the entire load. + +Stakeholders asked for the "loader multi worker rearchitecture plan" to overlap these phases, provide better fault isolation, and expose concurrency controls. The new design must also integrate with existing logging, testing, and configuration patterns. + +## Decision +We introduced a `LoaderPipeline` that streams points through dedicated stages backed by bounded queues: + +1. An ingestion worker pulls Plex items, resolves TMDb and IMDb metadata, and enqueues normalized `LoaderItem` batches. +2. A configurable pool of enrichment workers generates dense and sparse embeddings in parallel and enqueues `PointBatch` payloads. +3. An upsert worker sends Qdrant `upsert` requests as soon as batches are ready, logging partial failures without stalling upstream work. + +The pipeline propagates fatal errors across stages, drains queues on shutdown, records per-stage throughput, and surfaces CLI options so operators can tune Plex chunk size, enrichment batch size, and worker counts. + +## Consequences +- Loader runs overlap I/O and compute, reducing end-to-end latency for large libraries. +- Failure handling is localized: individual batch errors are logged and skipped while fatal exceptions halt the entire pipeline predictably. +- Logging, tests, and documentation must reflect the staged pipeline model and validate concurrency behaviors. +- Operators can experiment with worker counts and batch sizes without code changes. + +## Implementation Notes +- The pipeline and supporting dataclasses live in `mcp_plex/loader/__init__.py` with unit tests in `tests/test_loader_unit.py` and logging coverage in `tests/test_loader_logging.py`. +- CLI defaults align with the previous sequential behavior so existing deployments continue to work without extra configuration. +- Version metadata in `pyproject.toml`, `docker/pyproject.deps.toml`, and `uv.lock` is kept in sync whenever loader architecture changes are shipped. diff --git a/mcp_plex/loader/__init__.py b/mcp_plex/loader/__init__.py index 7aeb7b6..d88bb19 100644 --- a/mcp_plex/loader/__init__.py +++ b/mcp_plex/loader/__init__.py @@ -6,11 +6,20 @@ import json import logging import sys +import time import warnings from collections import deque +from dataclasses import dataclass from pathlib import Path -from types import TracebackType -from typing import AsyncIterator, Awaitable, Iterable, List, Optional, Sequence, TypeVar +from typing import ( + AsyncIterator, + Awaitable, + Iterable, + List, + Optional, + Sequence, + TypeVar, +) import click import httpx @@ -62,6 +71,31 @@ _qdrant_retry_backoff: float = 1.0 +@dataclass(slots=True) +class _MovieBatch: + """Batch of Plex movie items pending metadata enrichment.""" + + movies: list[PlexPartialObject] + + +@dataclass(slots=True) +class _EpisodeBatch: + """Batch of Plex episodes along with their parent show.""" + + show: PlexPartialObject + episodes: list[PlexPartialObject] + + +@dataclass(slots=True) +class _SampleBatch: + """Batch of pre-enriched items used by sample mode.""" + + items: list[AggregatedItem] + + +_IngestBatch = _MovieBatch | _EpisodeBatch | _SampleBatch + + def _require_positive(value: int, *, name: str) -> int: """Return *value* if positive, otherwise raise a ``ValueError``.""" @@ -70,6 +104,14 @@ def _require_positive(value: int, *, name: str) -> int: return value +def _chunk_sequence(items: Sequence[T], size: int) -> Iterable[Sequence[T]]: + """Yield ``items`` in chunks of at most ``size`` elements.""" + + size = _require_positive(int(size), name="size") + for start in range(0, len(items), size): + yield items[start : start + size] + + def _is_local_qdrant(client: AsyncQdrantClient) -> bool: """Return ``True`` if *client* targets an in-process Qdrant instance.""" @@ -643,6 +685,137 @@ def _build_plex_item(item: PlexPartialObject) -> PlexItem: ) +def _format_primary_title(item: AggregatedItem) -> str: + """Format the primary title text for ``item``.""" + + primary_title = item.plex.title + if item.plex.type == "episode": + title_bits: list[str] = [] + if item.plex.show_title: + title_bits.append(item.plex.show_title) + se_parts: list[str] = [] + if item.plex.season_number is not None: + se_parts.append(f"S{item.plex.season_number:02d}") + if item.plex.episode_number is not None: + se_parts.append(f"E{item.plex.episode_number:02d}") + if se_parts: + title_bits.append("".join(se_parts)) + if item.plex.title: + title_bits.append(item.plex.title) + if title_bits: + primary_title = " - ".join(title_bits) + return primary_title + + +def _build_point_text(item: AggregatedItem) -> str: + """Return the vector text for ``item``.""" + + parts = [ + _format_primary_title(item), + item.plex.summary or "", + item.tmdb.overview if item.tmdb and hasattr(item.tmdb, "overview") else "", + item.imdb.plot if item.imdb else "", + ] + directors_text = ", ".join(p.tag for p in item.plex.directors if p.tag) + writers_text = ", ".join(p.tag for p in item.plex.writers if p.tag) + actors_text = ", ".join(p.tag for p in item.plex.actors if p.tag) + if directors_text: + parts.append(f"Directed by {directors_text}") + if writers_text: + parts.append(f"Written by {writers_text}") + if actors_text: + parts.append(f"Starring {actors_text}") + if item.plex.tagline: + parts.append(item.plex.tagline) + if item.tmdb and hasattr(item.tmdb, "tagline"): + tagline = getattr(item.tmdb, "tagline", None) + if tagline: + parts.append(tagline) + if item.tmdb and hasattr(item.tmdb, "reviews"): + parts.extend(r.get("content", "") for r in getattr(item.tmdb, "reviews", [])) + return "\n".join(p for p in parts if p) + + +def _build_point_payload(item: AggregatedItem) -> dict[str, object]: + """Construct the Qdrant payload for ``item``.""" + + payload: dict[str, object] = { + "data": item.model_dump(mode="json"), + "title": item.plex.title, + "type": item.plex.type, + } + if item.plex.type == "episode": + if item.plex.show_title: + payload["show_title"] = item.plex.show_title + if item.plex.season_title: + payload["season_title"] = item.plex.season_title + if item.plex.season_number is not None: + payload["season_number"] = item.plex.season_number + if item.plex.episode_number is not None: + payload["episode_number"] = item.plex.episode_number + if item.plex.actors: + payload["actors"] = [p.tag for p in item.plex.actors if p.tag] + if item.plex.directors: + payload["directors"] = [p.tag for p in item.plex.directors if p.tag] + if item.plex.writers: + payload["writers"] = [p.tag for p in item.plex.writers if p.tag] + if item.plex.genres: + payload["genres"] = item.plex.genres + if item.plex.collections: + payload["collections"] = item.plex.collections + summary = item.plex.summary + if summary: + payload["summary"] = summary + overview = getattr(item.tmdb, "overview", None) if item.tmdb else None + if overview: + payload["overview"] = overview + plot = item.imdb.plot if item.imdb else None + if plot: + payload["plot"] = plot + taglines = [item.plex.tagline] + if item.tmdb and hasattr(item.tmdb, "tagline"): + taglines.append(getattr(item.tmdb, "tagline", None)) + taglines = [t for t in taglines if t] + if taglines: + payload["tagline"] = "\n".join(dict.fromkeys(taglines)) + if item.tmdb and hasattr(item.tmdb, "reviews"): + review_texts = [r.get("content", "") for r in getattr(item.tmdb, "reviews", [])] + review_texts = [r for r in review_texts if r] + if review_texts: + payload["reviews"] = review_texts + if item.plex.year is not None: + payload["year"] = item.plex.year + if item.plex.added_at is not None: + added = item.plex.added_at + if hasattr(added, "timestamp"): + payload["added_at"] = int(added.timestamp()) + return payload + + +def build_point( + item: AggregatedItem, + dense_model_name: str, + sparse_model_name: str, +) -> models.PointStruct: + """Build a Qdrant point for ``item`` using the configured model names.""" + + text = _build_point_text(item) + payload = _build_point_payload(item) + point_id: int | str = ( + int(item.plex.rating_key) + if item.plex.rating_key.isdigit() + else item.plex.rating_key + ) + return models.PointStruct( + id=point_id, + vector={ + "dense": models.Document(text=text, model=dense_model_name), + "sparse": models.Document(text=text, model=sparse_model_name), + }, + payload=payload, + ) + + async def _iter_from_plex( server: PlexServer, tmdb_api_key: str, *, batch_size: int = 50 ) -> AsyncIterator[AggregatedItem]: @@ -839,6 +1012,425 @@ async def _iter_from_sample(sample_dir: Path) -> AsyncIterator[AggregatedItem]: yield item +class LoaderPipeline: + """Coordinate ingestion, enrichment, and Qdrant upserts.""" + + def __init__( + self, + *, + client: AsyncQdrantClient, + collection_name: str, + dense_model_name: str, + sparse_model_name: str, + tmdb_api_key: str | None, + sample_items: list[AggregatedItem] | None, + plex_server: PlexServer | None, + plex_chunk_size: int, + enrichment_batch_size: int, + enrichment_workers: int, + upsert_buffer_size: int, + max_concurrent_upserts: int, + ) -> None: + self._client = client + self._collection_name = collection_name + self._dense_model_name = dense_model_name + self._sparse_model_name = sparse_model_name + self._tmdb_api_key = tmdb_api_key + self._sample_items = sample_items + self._server = plex_server + self._plex_chunk_size = _require_positive(plex_chunk_size, name="plex_chunk_size") + self._enrichment_batch_size = _require_positive( + enrichment_batch_size, name="enrichment_batch_size" + ) + self._enrichment_workers = _require_positive( + enrichment_workers, name="enrichment_workers" + ) + self._upsert_buffer_size = _require_positive( + upsert_buffer_size, name="upsert_buffer_size" + ) + self._max_concurrent_upserts = _require_positive( + max_concurrent_upserts, name="max_concurrent_upserts" + ) + + if self._sample_items is None and self._server is None: + raise RuntimeError("Either sample_items or plex_server must be provided") + if self._sample_items is None and not self._tmdb_api_key: + raise RuntimeError("TMDB API key required for live ingestion") + + self._ingest_queue: asyncio.Queue[_IngestBatch | None] = asyncio.Queue( + maxsize=self._enrichment_workers * 2 + ) + self._points_queue: asyncio.Queue[list[models.PointStruct] | None] = ( + asyncio.Queue() + ) + self._upsert_capacity = asyncio.Semaphore(self._max_concurrent_upserts) + self._items: list[AggregatedItem] = [] + self._qdrant_retry_queue: asyncio.Queue[list[models.PointStruct]] = ( + asyncio.Queue() + ) + self._show_tmdb_cache: dict[str, TMDBShow | None] = {} + + self._ingested_count = 0 + self._enriched_count = 0 + self._upserted_points = 0 + now = time.perf_counter() + self._ingest_start = now + self._enrich_start = now + self._upsert_start = now + + @property + def qdrant_retry_queue(self) -> asyncio.Queue[list[models.PointStruct]]: + """Expose the Qdrant retry queue for post-processing.""" + + return self._qdrant_retry_queue + + @property + def items(self) -> list[AggregatedItem]: + """Return the aggregated items processed by the pipeline.""" + + return self._items + + async def execute(self) -> None: + """Run the full ingestion/enrichment/upsert pipeline.""" + + async with httpx.AsyncClient(timeout=30) as client: + self._http_client = client + ingest_task = asyncio.create_task(self._ingest()) + enrichment_tasks = [ + asyncio.create_task(self._enrichment_worker(worker_id)) + for worker_id in range(self._enrichment_workers) + ] + upsert_tasks = [ + asyncio.create_task(self._upsert_worker(worker_id)) + for worker_id in range(self._max_concurrent_upserts) + ] + error: BaseException | None = None + try: + await ingest_task + await self._ingest_queue.join() + await asyncio.gather(*enrichment_tasks) + await self._points_queue.join() + except BaseException as exc: + error = exc + finally: + for _ in range(self._max_concurrent_upserts): + await self._points_queue.put(None) + upsert_results = await asyncio.gather( + *upsert_tasks, return_exceptions=True + ) + for result in upsert_results: + if isinstance(result, BaseException) and not isinstance( + result, asyncio.CancelledError + ): + if error is None: + error = result + break + for task in enrichment_tasks: + if not task.done(): + task.cancel() + await asyncio.gather(*enrichment_tasks, return_exceptions=True) + if not ingest_task.done(): + ingest_task.cancel() + await asyncio.gather(ingest_task, return_exceptions=True) + if error is not None: + raise error + + def _log_progress( + self, stage: str, count: int, start: float, queue_size: int + ) -> None: + elapsed = time.perf_counter() - start + rate = count / elapsed if elapsed > 0 else 0.0 + logger.info( + "%s processed %d items (%.2f items/sec, queue size=%d)", + stage, + count, + rate, + queue_size, + ) + + async def _ingest(self) -> None: + start = time.perf_counter() + self._ingest_start = start + try: + if self._sample_items is not None: + await self._ingest_sample() + else: + await self._ingest_from_plex() + finally: + for _ in range(self._enrichment_workers): + await self._ingest_queue.put(None) + self._log_progress("Ingestion", self._ingested_count, start, self._ingest_queue.qsize()) + + async def _ingest_sample(self) -> None: + for chunk in _chunk_sequence(self._sample_items or [], self._enrichment_batch_size): + batch = _SampleBatch(items=list(chunk)) + if not batch.items: + continue + await self._ingest_queue.put(batch) + self._ingested_count += len(batch.items) + self._log_progress( + "Ingestion", + self._ingested_count, + self._ingest_start, + self._ingest_queue.qsize(), + ) + + async def _ingest_from_plex(self) -> None: + if self._server is None: + raise RuntimeError("Plex server unavailable for ingestion") + movie_section = self._server.library.section("Movies") + movie_keys = [int(m.ratingKey) for m in movie_section.all()] + for key_chunk in _chunk_sequence(movie_keys, self._plex_chunk_size): + key_list = list(key_chunk) + movies = list(self._server.fetchItems(key_list)) if key_list else [] + if not movies: + continue + await self._ingest_queue.put(_MovieBatch(movies=movies)) + self._ingested_count += len(movies) + self._log_progress( + "Ingestion", + self._ingested_count, + self._ingest_start, + self._ingest_queue.qsize(), + ) + show_section = self._server.library.section("TV Shows") + show_keys = [int(s.ratingKey) for s in show_section.all()] + for show_chunk in _chunk_sequence(show_keys, self._plex_chunk_size): + shows = list(self._server.fetchItems(list(show_chunk))) + for show in shows: + episode_keys = [int(e.ratingKey) for e in show.episodes()] + for episode_chunk in _chunk_sequence(episode_keys, self._plex_chunk_size): + keys = list(episode_chunk) + episodes = list(self._server.fetchItems(keys)) if keys else [] + if not episodes: + continue + await self._ingest_queue.put( + _EpisodeBatch(show=show, episodes=episodes) + ) + self._ingested_count += len(episodes) + self._log_progress( + "Ingestion", + self._ingested_count, + self._ingest_start, + self._ingest_queue.qsize(), + ) + + async def _enrichment_worker(self, worker_id: int) -> None: + while True: + batch = await self._ingest_queue.get() + if batch is None: + self._ingest_queue.task_done() + break + try: + if isinstance(batch, _MovieBatch): + await self._process_movie_batch(batch) + elif isinstance(batch, _EpisodeBatch): + await self._process_episode_batch(batch) + else: + await self._process_sample_batch(batch) + finally: + self._ingest_queue.task_done() + + async def _process_movie_batch(self, batch: _MovieBatch) -> None: + for chunk in _chunk_sequence(batch.movies, self._enrichment_batch_size): + movies = list(chunk) + if not movies: + continue + if self._enriched_count == 0: + self._enrich_start = time.perf_counter() + aggregated = await self._enrich_movies(movies) + self._enriched_count += len(aggregated) + self._log_progress( + "Enrichment", + self._enriched_count, + self._enrich_start, + self._points_queue.qsize(), + ) + await self._emit_points(aggregated) + + async def _process_episode_batch(self, batch: _EpisodeBatch) -> None: + for chunk in _chunk_sequence(batch.episodes, self._enrichment_batch_size): + episodes = list(chunk) + if not episodes: + continue + if self._enriched_count == 0: + self._enrich_start = time.perf_counter() + aggregated = await self._enrich_episodes(batch.show, episodes) + self._enriched_count += len(aggregated) + self._log_progress( + "Enrichment", + self._enriched_count, + self._enrich_start, + self._points_queue.qsize(), + ) + await self._emit_points(aggregated) + + async def _process_sample_batch(self, batch: _SampleBatch) -> None: + for chunk in _chunk_sequence(batch.items, self._enrichment_batch_size): + aggregated = list(chunk) + if not aggregated: + continue + if self._enriched_count == 0: + self._enrich_start = time.perf_counter() + self._enriched_count += len(aggregated) + self._log_progress( + "Enrichment", + self._enriched_count, + self._enrich_start, + self._points_queue.qsize(), + ) + await self._emit_points(aggregated) + + async def _enrich_movies( + self, movies: Sequence[PlexPartialObject] + ) -> list[AggregatedItem]: + movie_ids = [_extract_external_ids(movie) for movie in movies] + imdb_ids = [ids.imdb for ids in movie_ids if ids.imdb] + imdb_map = ( + await _fetch_imdb_batch(self._http_client, imdb_ids) + if imdb_ids + else {} + ) + + api_key = self._tmdb_api_key + tmdb_results: list[TMDBMovie | None] = [] + if api_key: + tmdb_tasks = [ + _fetch_tmdb_movie(self._http_client, ids.tmdb, api_key) + for ids in movie_ids + if ids.tmdb + ] + if tmdb_tasks: + tmdb_results = await asyncio.gather(*tmdb_tasks) + tmdb_iter = iter(tmdb_results) + + aggregated: list[AggregatedItem] = [] + for movie, ids in zip(movies, movie_ids): + tmdb = next(tmdb_iter, None) if ids.tmdb else None + imdb = imdb_map.get(ids.imdb) if ids.imdb else None + aggregated.append( + AggregatedItem( + plex=_build_plex_item(movie), + imdb=imdb, + tmdb=tmdb, + ) + ) + return aggregated + + async def _enrich_episodes( + self, show: PlexPartialObject, episodes: Sequence[PlexPartialObject] + ) -> list[AggregatedItem]: + show_ids = _extract_external_ids(show) + show_tmdb: TMDBShow | None = None + if show_ids.tmdb: + show_tmdb = await self._get_tmdb_show(show_ids.tmdb) + episode_ids = [_extract_external_ids(ep) for ep in episodes] + imdb_ids = [ids.imdb for ids in episode_ids if ids.imdb] + imdb_map = ( + await _fetch_imdb_batch(self._http_client, imdb_ids) + if imdb_ids + else {} + ) + + tmdb_results: list[TMDBEpisode | None] = [] + if show_tmdb: + episode_tasks = [ + self._lookup_tmdb_episode(show_tmdb, ep) + for ep in episodes + ] + if episode_tasks: + tmdb_results = await asyncio.gather(*episode_tasks) + tmdb_iter = iter(tmdb_results) + aggregated: list[AggregatedItem] = [] + for ep, ids in zip(episodes, episode_ids): + tmdb_episode = next(tmdb_iter, None) if show_tmdb else None + imdb = imdb_map.get(ids.imdb) if ids.imdb else None + tmdb_item: TMDBItem | None = tmdb_episode or show_tmdb + aggregated.append( + AggregatedItem( + plex=_build_plex_item(ep), + imdb=imdb, + tmdb=tmdb_item, + ) + ) + return aggregated + + async def _get_tmdb_show(self, tmdb_id: str) -> TMDBShow | None: + if tmdb_id in self._show_tmdb_cache: + return self._show_tmdb_cache[tmdb_id] + show = await _fetch_tmdb_show(self._http_client, tmdb_id, self._tmdb_api_key or "") + self._show_tmdb_cache[tmdb_id] = show + return show + + async def _lookup_tmdb_episode( + self, show_tmdb: TMDBShow | None, episode: PlexPartialObject + ) -> TMDBEpisode | None: + if not show_tmdb or not self._tmdb_api_key: + return None + season = resolve_tmdb_season_number(show_tmdb, episode) + ep_num = getattr(episode, "index", None) + if isinstance(ep_num, str) and ep_num.isdigit(): + ep_num = int(ep_num) + if season is None or ep_num is None: + return None + return await _fetch_tmdb_episode( + self._http_client, + show_tmdb.id, + season, + ep_num, + self._tmdb_api_key, + ) + + async def _emit_points(self, aggregated: Sequence[AggregatedItem]) -> None: + if not aggregated: + return + self._items.extend(aggregated) + points = [ + build_point(item, self._dense_model_name, self._sparse_model_name) + for item in aggregated + ] + for chunk in _chunk_sequence(points, self._upsert_buffer_size): + batch = list(chunk) + if not batch: + continue + await self._upsert_capacity.acquire() + try: + await self._points_queue.put(batch) + except BaseException: + self._upsert_capacity.release() + raise + + async def _upsert_worker(self, worker_id: int) -> None: + while True: + batch = await self._points_queue.get() + if batch is None: + self._points_queue.task_done() + break + logger.info( + "Upsert worker %d handling %d points (queue size=%d)", + worker_id, + len(batch), + self._points_queue.qsize(), + ) + try: + if self._upserted_points == 0: + self._upsert_start = time.perf_counter() + await _upsert_in_batches( + self._client, + self._collection_name, + batch, + retry_queue=self._qdrant_retry_queue, + ) + self._upserted_points += len(batch) + self._log_progress( + f"Upsert worker {worker_id}", + self._upserted_points, + self._upsert_start, + self._points_queue.qsize(), + ) + finally: + self._points_queue.task_done() + self._upsert_capacity.release() async def run( plex_url: Optional[str], plex_token: Optional[str], @@ -858,6 +1450,9 @@ async def run( imdb_backoff: float = 1.0, imdb_queue_path: Path | None = None, upsert_buffer_size: int = _qdrant_upsert_buffer_size, + plex_chunk_size: int = 200, + enrichment_batch_size: int = 100, + enrichment_workers: int = 4, ) -> None: """Core execution logic for the CLI.""" @@ -873,6 +1468,9 @@ async def run( _imdb_retry_queue = _IMDbRetryQueue() _require_positive(upsert_buffer_size, name="upsert_buffer_size") + _require_positive(plex_chunk_size, name="plex_chunk_size") + _require_positive(enrichment_batch_size, name="enrichment_batch_size") + _require_positive(enrichment_workers, name="enrichment_workers") dense_size, dense_distance = _resolve_dense_model_params(dense_model_name) if qdrant_url is None and qdrant_host is None: @@ -894,10 +1492,24 @@ async def run( dense_distance=dense_distance, ) - items: List[AggregatedItem] = [] + items: List[AggregatedItem] if sample_dir is not None: logger.info("Loading sample data from %s", sample_dir) - item_iter = _iter_from_sample(sample_dir) + sample_items = _load_from_sample(sample_dir) + pipeline = LoaderPipeline( + client=client, + collection_name=collection_name, + dense_model_name=dense_model_name, + sparse_model_name=sparse_model_name, + tmdb_api_key=None, + sample_items=sample_items, + plex_server=None, + plex_chunk_size=plex_chunk_size, + enrichment_batch_size=enrichment_batch_size, + enrichment_workers=enrichment_workers, + upsert_buffer_size=upsert_buffer_size, + max_concurrent_upserts=_qdrant_max_concurrent_upserts, + ) else: if PlexServer is None: raise RuntimeError("plexapi is required for live loading") @@ -907,200 +1519,30 @@ async def run( raise RuntimeError("TMDB_API_KEY must be provided") logger.info("Loading data from Plex server %s", plex_url) server = PlexServer(plex_url, plex_token) - item_iter = _iter_from_plex(server, tmdb_api_key) - - points_buffer: List[models.PointStruct] = [] - qdrant_retry_queue: asyncio.Queue[list[models.PointStruct]] = asyncio.Queue() - max_concurrent_upserts = _require_positive( - _qdrant_max_concurrent_upserts, name="max_concurrent_upserts" - ) - upsert_queue: asyncio.Queue[List[models.PointStruct] | None] = asyncio.Queue() - upsert_capacity = asyncio.Semaphore(max_concurrent_upserts) - batches_enqueued = 0 - worker_error: Exception | None = None - worker_error_tb: TracebackType | None = None - worker_error_lock = asyncio.Lock() - - async def _upsert_worker() -> None: - nonlocal worker_error, worker_error_tb - while True: - batch = await upsert_queue.get() - if batch is None: - upsert_queue.task_done() - break - logger.info( - "Upserting %d points into Qdrant collection %s using batches of up to %d", - len(batch), - collection_name, - _qdrant_batch_size, - ) - try: - await _upsert_in_batches( - client, - collection_name, - batch, - retry_queue=qdrant_retry_queue, - ) - except Exception as exc: # defensive guard - async with worker_error_lock: - if worker_error is None: - worker_error = exc - worker_error_tb = exc.__traceback__ - logger.exception("Unexpected error upserting batch") - finally: - upsert_queue.task_done() - if batch is not None: - upsert_capacity.release() - - upsert_workers = [ - asyncio.create_task(_upsert_worker()) for _ in range(max_concurrent_upserts) - ] - - async for item in item_iter: - items.append(item) - primary_title = item.plex.title - if item.plex.type == "episode": - title_bits: list[str] = [] - if item.plex.show_title: - title_bits.append(item.plex.show_title) - se_parts: list[str] = [] - if item.plex.season_number is not None: - se_parts.append(f"S{item.plex.season_number:02d}") - if item.plex.episode_number is not None: - se_parts.append(f"E{item.plex.episode_number:02d}") - if se_parts: - title_bits.append("".join(se_parts)) - if item.plex.title: - title_bits.append(item.plex.title) - if title_bits: - primary_title = " - ".join(title_bits) - parts = [ - primary_title, - item.plex.summary or "", - item.tmdb.overview if item.tmdb and hasattr(item.tmdb, "overview") else "", - item.imdb.plot if item.imdb else "", - ] - directors_text = ", ".join(p.tag for p in item.plex.directors if p.tag) - writers_text = ", ".join(p.tag for p in item.plex.writers if p.tag) - actors_text = ", ".join(p.tag for p in item.plex.actors if p.tag) - if directors_text: - parts.append(f"Directed by {directors_text}") - if writers_text: - parts.append(f"Written by {writers_text}") - if actors_text: - parts.append(f"Starring {actors_text}") - if item.plex.tagline: - parts.append(item.plex.tagline) - if item.tmdb and hasattr(item.tmdb, "tagline"): - tagline = getattr(item.tmdb, "tagline", None) - if tagline: - parts.append(tagline) - if item.tmdb and hasattr(item.tmdb, "reviews"): - parts.extend(r.get("content", "") for r in getattr(item.tmdb, "reviews", [])) - text = "\n".join(p for p in parts if p) - payload = { - "data": item.model_dump(mode="json"), - "title": item.plex.title, - "type": item.plex.type, - } - if item.plex.type == "episode": - if item.plex.show_title: - payload["show_title"] = item.plex.show_title - if item.plex.season_title: - payload["season_title"] = item.plex.season_title - if item.plex.season_number is not None: - payload["season_number"] = item.plex.season_number - if item.plex.episode_number is not None: - payload["episode_number"] = item.plex.episode_number - if item.plex.actors: - payload["actors"] = [p.tag for p in item.plex.actors if p.tag] - if item.plex.directors: - payload["directors"] = [p.tag for p in item.plex.directors if p.tag] - if item.plex.writers: - payload["writers"] = [p.tag for p in item.plex.writers if p.tag] - if item.plex.genres: - payload["genres"] = item.plex.genres - if item.plex.collections: - payload["collections"] = item.plex.collections - summary = item.plex.summary - if summary: - payload["summary"] = summary - overview = getattr(item.tmdb, "overview", None) if item.tmdb else None - if overview: - payload["overview"] = overview - plot = item.imdb.plot if item.imdb else None - if plot: - payload["plot"] = plot - taglines = [item.plex.tagline] - if item.tmdb and hasattr(item.tmdb, "tagline"): - taglines.append(getattr(item.tmdb, "tagline", None)) - taglines = [t for t in taglines if t] - if taglines: - payload["tagline"] = "\n".join(dict.fromkeys(taglines)) - if item.tmdb and hasattr(item.tmdb, "reviews"): - review_texts = [r.get("content", "") for r in getattr(item.tmdb, "reviews", [])] - review_texts = [r for r in review_texts if r] - if review_texts: - payload["reviews"] = review_texts - if item.plex.year is not None: - payload["year"] = item.plex.year - if item.plex.added_at is not None: - payload["added_at"] = int(item.plex.added_at.timestamp()) - point_id: int | str = ( - int(item.plex.rating_key) - if item.plex.rating_key.isdigit() - else item.plex.rating_key - ) - points_buffer.append( - models.PointStruct( - id=point_id, - vector={ - "dense": models.Document(text=text, model=dense_model_name), - "sparse": models.Document(text=text, model=sparse_model_name), - }, - payload=payload, - ) + pipeline = LoaderPipeline( + client=client, + collection_name=collection_name, + dense_model_name=dense_model_name, + sparse_model_name=sparse_model_name, + tmdb_api_key=tmdb_api_key, + sample_items=None, + plex_server=server, + plex_chunk_size=plex_chunk_size, + enrichment_batch_size=enrichment_batch_size, + enrichment_workers=enrichment_workers, + upsert_buffer_size=upsert_buffer_size, + max_concurrent_upserts=_qdrant_max_concurrent_upserts, ) - if len(points_buffer) >= upsert_buffer_size: - batch = list(points_buffer) - points_buffer.clear() - batches_enqueued += 1 - await upsert_capacity.acquire() - try: - await upsert_queue.put(batch) - except BaseException: - upsert_capacity.release() - raise - + await pipeline.execute() + items = pipeline.items logger.info("Loaded %d items", len(items)) + if not items: + logger.info("No points to upsert") - if points_buffer: - batch = list(points_buffer) - points_buffer.clear() - batches_enqueued += 1 - await upsert_capacity.acquire() - try: - await upsert_queue.put(batch) - except BaseException: - upsert_capacity.release() - raise - - try: - await upsert_queue.join() - if batches_enqueued == 0: - logger.info("No points to upsert") - finally: - for _ in range(max_concurrent_upserts): - await upsert_queue.put(None) - await asyncio.gather(*upsert_workers) - - if worker_error is not None: - if worker_error_tb is not None: - raise worker_error.with_traceback(worker_error_tb) - raise worker_error - - await _process_qdrant_retry_queue(client, collection_name, qdrant_retry_queue) + await _process_qdrant_retry_queue( + client, collection_name, pipeline.qdrant_retry_queue + ) if imdb_queue_path: _persist_imdb_retry_queue(imdb_queue_path) @@ -1201,6 +1643,33 @@ async def _upsert_worker() -> None: show_default=True, help="Number of media items to buffer before scheduling an async upsert", ) +@click.option( + "--plex-chunk-size", + envvar="PLEX_CHUNK_SIZE", + show_envvar=True, + type=int, + default=200, + show_default=True, + help="Number of Plex rating keys to request per fetchItems batch", +) +@click.option( + "--enrichment-batch-size", + envvar="ENRICHMENT_BATCH_SIZE", + show_envvar=True, + type=int, + default=100, + show_default=True, + help="Number of media items to enrich per metadata batch", +) +@click.option( + "--enrichment-workers", + envvar="ENRICHMENT_WORKERS", + show_envvar=True, + type=int, + default=4, + show_default=True, + help="Number of concurrent metadata enrichment workers", +) @click.option( "--dense-model", envvar="DENSE_MODEL", @@ -1282,6 +1751,9 @@ def main( qdrant_https: bool, qdrant_prefer_grpc: bool, upsert_buffer_size: int, + plex_chunk_size: int, + enrichment_batch_size: int, + enrichment_workers: int, dense_model: str, sparse_model: str, continuous: bool, @@ -1315,6 +1787,9 @@ def main( imdb_backoff, imdb_queue, upsert_buffer_size, + plex_chunk_size, + enrichment_batch_size, + enrichment_workers, ) ) @@ -1340,6 +1815,9 @@ async def load_media( imdb_backoff: float, imdb_queue: Path, upsert_buffer_size: int, + plex_chunk_size: int, + enrichment_batch_size: int, + enrichment_workers: int, ) -> None: """Orchestrate one or more runs of :func:`run`.""" @@ -1363,6 +1841,9 @@ async def load_media( imdb_backoff, imdb_queue, upsert_buffer_size, + plex_chunk_size, + enrichment_batch_size, + enrichment_workers, ) if not continuous: break diff --git a/pyproject.toml b/pyproject.toml index 5a0dfc2..59866ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "0.26.58" +version = "0.26.60" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_loader_logging.py b/tests/test_loader_logging.py index 53de66e..e3740e9 100644 --- a/tests/test_loader_logging.py +++ b/tests/test_loader_logging.py @@ -34,8 +34,9 @@ def test_run_logs_upsert(monkeypatch, caplog): with caplog.at_level(logging.INFO, logger="mcp_plex.loader"): asyncio.run(loader.run(None, None, None, sample_dir, None, None)) assert "Loaded 2 items" in caplog.text - assert "Upserting 2 points" in caplog.text - assert "using batches of up to" in caplog.text + assert "Upsert worker" in caplog.text + assert "handling 2 points" in caplog.text + assert "processed 2 items" in caplog.text def test_run_logs_no_points(monkeypatch, caplog): @@ -76,24 +77,27 @@ def test_run_limits_concurrent_upserts(monkeypatch): started = asyncio.Queue() release_queue = asyncio.Queue() third_requested = asyncio.Event() - base_items = list(loader._load_from_sample(sample_dir)) - async def fake_iter(sample_dir): - for idx, item in enumerate(base_items + base_items[:1]): - if idx == 2: - third_requested.set() - yield item + monkeypatch.setattr( + loader, + "_load_from_sample", + lambda _: base_items + base_items[:1], + ) + + upsert_calls = {"count": 0} async def fake_upsert(client, collection_name, points, **kwargs): + upsert_calls["count"] += 1 + if upsert_calls["count"] == 3: + third_requested.set() concurrency["current"] += 1 concurrency["max"] = max(concurrency["max"], concurrency["current"]) - await started.put(None) + await started.put(upsert_calls["count"]) await release_queue.get() concurrency["current"] -= 1 monkeypatch.setattr(loader, "_upsert_in_batches", fake_upsert) - monkeypatch.setattr(loader, "_iter_from_sample", fake_iter) async def invoke(): run_task = asyncio.create_task( @@ -103,6 +107,7 @@ async def invoke(): assert not third_requested.is_set() await release_queue.put(None) await asyncio.wait_for(started.get(), timeout=1) + assert not third_requested.is_set() await release_queue.put(None) await asyncio.wait_for(started.get(), timeout=1) await release_queue.put(None) @@ -121,22 +126,19 @@ def test_run_ensures_collection_before_loading(monkeypatch): async def fake_ensure(*args, **kwargs): order.append("ensure") - orig_iter = loader._iter_from_sample + monkeypatch.setattr(loader, "_ensure_collection", fake_ensure) + sample_dir = Path(__file__).resolve().parents[1] / "sample-data" + original_execute = loader.LoaderPipeline.execute - async def fake_iter(sample_dir): - order.append("iter") - async for item in orig_iter(sample_dir): - yield item + async def fake_execute(self): + order.append("execute") + self._items = [] - async def fake_upsert(client, collection_name, points, **kwargs): - return None + monkeypatch.setattr(loader.LoaderPipeline, "execute", fake_execute) - monkeypatch.setattr(loader, "_ensure_collection", fake_ensure) - monkeypatch.setattr(loader, "_iter_from_sample", fake_iter) - monkeypatch.setattr(loader, "_upsert_in_batches", fake_upsert) - - sample_dir = Path(__file__).resolve().parents[1] / "sample-data" asyncio.run(loader.run(None, None, None, sample_dir, None, None)) assert order and order[0] == "ensure" - assert order.count("iter") >= 1 + assert "execute" in order + + monkeypatch.setattr(loader.LoaderPipeline, "execute", original_execute) diff --git a/tests/test_loader_unit.py b/tests/test_loader_unit.py index e69406e..57256b5 100644 --- a/tests/test_loader_unit.py +++ b/tests/test_loader_unit.py @@ -1,9 +1,7 @@ import asyncio import builtins import importlib -import io import json -import sys import types from datetime import datetime from pathlib import Path @@ -15,6 +13,7 @@ from mcp_plex import loader from mcp_plex.loader.imdb_cache import IMDbCache from mcp_plex.loader import ( + LoaderPipeline, _build_plex_item, _extract_external_ids, _fetch_imdb, @@ -27,6 +26,7 @@ _persist_imdb_retry_queue, _process_imdb_retry_queue, _resolve_dense_model_params, + build_point, resolve_tmdb_season_number, ) from mcp_plex.common.types import ( @@ -672,69 +672,56 @@ def test_build_plex_item_converts_string_indices(): assert item.episode_number == 3 -def test_run_live_loader_builds_payload_with_collections(monkeypatch): - monkeypatch.setattr(loader, "_imdb_cache", None) - monkeypatch.setattr(loader, "_imdb_retry_queue", loader._IMDbRetryQueue()) - - class DummyClient: - def __init__(self, *args, **kwargs): - self._client = None +def test_build_point_includes_metadata(): + plex_item = PlexItem( + rating_key="1", + guid="guid", + type="movie", + title="Sample", + summary="Summary", + year=2024, + added_at=datetime.fromtimestamp(1), + guids=[PlexGuid(id="plex://1")], + tagline="Tagline", + directors=[PlexPerson(id=1, tag="Director")], + writers=[PlexPerson(id=2, tag="Writer")], + actors=[PlexPerson(id=3, tag="Actor")], + genres=["Action"], + collections=["Favorites"], + ) + imdb_title = IMDbTitle( + id="tt1", + type="movie", + primaryTitle="Sample", + plot="Plot", + rating=IMDbRating(aggregateRating=8.0), + directors=[IMDbName(id="nm1", displayName="Director")], + ) + tmdb_movie = TMDBMovie( + id=1, + title="Sample", + overview="Overview", + tagline="Another tagline", + reviews=[{"content": "Great"}], + ) + item = AggregatedItem(plex=plex_item, imdb=imdb_title, tmdb=tmdb_movie) - async def collection_exists(self, collection_name: str) -> bool: - return False - - async def create_collection(self, *args, **kwargs) -> None: - pass - - async def create_payload_index(self, *args, **kwargs) -> None: - pass - - monkeypatch.setattr(loader, "AsyncQdrantClient", lambda *args, **kwargs: DummyClient()) - - async def fake_iter(server, tmdb_api_key, *, batch_size=50): - plex_item = PlexItem( - rating_key="1", - guid="guid", - type="movie", - title="Sample", - summary="Summary", - year=2024, - added_at=datetime.fromtimestamp(1), - guids=[PlexGuid(id="plex://1")], - thumb="thumb.jpg", - art="art.jpg", - tagline="Tagline", - directors=[PlexPerson(id=1, tag="Director")], - writers=[PlexPerson(id=2, tag="Writer")], - actors=[PlexPerson(id=3, tag="Actor")], - genres=["Action"], - collections=["Favorites"], - ) - imdb_title = IMDbTitle( - id="tt1", - type="movie", - primaryTitle="Sample", - plot="Plot", - rating=IMDbRating(aggregateRating=8.0), - directors=[IMDbName(id="nm1", displayName="Director")], - ) - tmdb_movie = TMDBMovie( - id=1, - title="Sample", - overview="Overview", - tagline="Another tagline", - reviews=[{"content": "Great"}], - ) - yield AggregatedItem(plex=plex_item, imdb=imdb_title, tmdb=tmdb_movie) + point = build_point( + item, + dense_model_name="BAAI/bge-small-en-v1.5", + sparse_model_name="Qdrant/bm42-all-minilm-l6-v2-attentions", + ) - monkeypatch.setattr(loader, "_iter_from_plex", fake_iter) + assert point.payload["title"] == "Sample" + assert point.payload["collections"] == ["Favorites"] + assert point.payload["plot"] == "Plot" + assert "Directed by" in point.vector["dense"].text + assert point.vector["dense"].model == "BAAI/bge-small-en-v1.5" + assert point.vector["sparse"].model == "Qdrant/bm42-all-minilm-l6-v2-attentions" - class DummyPlexServer: - def __init__(self, url: str, token: str) -> None: - self.url = url - self.token = token - monkeypatch.setattr(loader, "PlexServer", DummyPlexServer) +def test_loader_pipeline_processes_sample_batches(monkeypatch): + items = _load_from_sample(Path(__file__).resolve().parents[1] / "sample-data") recorded_batches: list[list[models.PointStruct]] = [] @@ -743,21 +730,28 @@ async def record_upsert(client, collection_name: str, points, **kwargs): monkeypatch.setattr(loader, "_upsert_in_batches", record_upsert) - stdout = io.StringIO() - monkeypatch.setattr(sys, "stdout", stdout) - - asyncio.run( - loader.run( - "http://localhost:32400", - "token", - "tmdb-key", - None, - None, - None, - ) + pipeline = LoaderPipeline( + client=object(), + collection_name="media-items", + dense_model_name="BAAI/bge-small-en-v1.5", + sparse_model_name="Qdrant/bm42-all-minilm-l6-v2-attentions", + tmdb_api_key=None, + sample_items=items, + plex_server=None, + plex_chunk_size=10, + enrichment_batch_size=1, + enrichment_workers=2, + upsert_buffer_size=1, + max_concurrent_upserts=1, ) - assert recorded_batches, "expected upsert batches to be scheduled" + asyncio.run(pipeline.execute()) + + assert len(pipeline.items) == len(items) + assert recorded_batches, "expected pipeline to emit upsert batches" payload = recorded_batches[0][0].payload - assert payload["collections"] == ["Favorites"] - assert payload["reviews"] == ["Great"] + assert payload["title"] + assert payload["type"] in {"movie", "episode"} + reviews = payload.get("reviews") + if reviews is not None: + assert isinstance(reviews, list) diff --git a/uv.lock b/uv.lock index eebbd3e..d7b7214 100644 --- a/uv.lock +++ b/uv.lock @@ -730,7 +730,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "0.26.58" +version = "0.26.60" source = { editable = "." } dependencies = [ { name = "fastapi" },