From 2f840897978ab491095953485f573011877c2b26 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Sun, 5 Oct 2025 08:08:16 -0600 Subject: [PATCH 1/2] refactor(loader): modularize task pipeline --- docker/pyproject.deps.toml | 2 +- mcp_plex/loader/__init__.py | 1472 +++++------------------- mcp_plex/loader/enrichment/__init__.py | 204 ++++ mcp_plex/loader/enrichment/types.py | 42 + mcp_plex/loader/enrichment/utils.py | 552 +++++++++ mcp_plex/loader/ingestion/__init__.py | 122 ++ mcp_plex/loader/ingestion/types.py | 43 + mcp_plex/loader/ingestion/utils.py | 136 +++ mcp_plex/loader/storage/__init__.py | 125 ++ mcp_plex/loader/storage/types.py | 9 + mcp_plex/loader/storage/utils.py | 314 +++++ mcp_plex/loader/utils.py | 89 ++ pyproject.toml | 2 +- tests/test_loader_logging.py | 2 +- uv.lock | 2 +- 15 files changed, 1933 insertions(+), 1183 deletions(-) create mode 100644 mcp_plex/loader/enrichment/__init__.py create mode 100644 mcp_plex/loader/enrichment/types.py create mode 100644 mcp_plex/loader/enrichment/utils.py create mode 100644 mcp_plex/loader/ingestion/__init__.py create mode 100644 mcp_plex/loader/ingestion/types.py create mode 100644 mcp_plex/loader/ingestion/utils.py create mode 100644 mcp_plex/loader/storage/__init__.py create mode 100644 mcp_plex/loader/storage/types.py create mode 100644 mcp_plex/loader/storage/utils.py create mode 100644 mcp_plex/loader/utils.py diff --git a/docker/pyproject.deps.toml b/docker/pyproject.deps.toml index c98e32b..07e25a2 100644 --- a/docker/pyproject.deps.toml +++ b/docker/pyproject.deps.toml @@ -1,6 +1,6 @@ [project] name = "mcp-plex" -version = "0.26.60" +version = "0.26.61" requires-python = ">=3.11,<3.13" dependencies = [ "fastmcp>=2.11.2", diff --git a/mcp_plex/loader/__init__.py b/mcp_plex/loader/__init__.py index d88bb19..c26c6c1 100644 --- a/mcp_plex/loader/__init__.py +++ b/mcp_plex/loader/__init__.py @@ -2,24 +2,13 @@ from __future__ import annotations import asyncio -import inspect import json import logging import sys import time import warnings -from collections import deque -from dataclasses import dataclass from pathlib import Path -from typing import ( - AsyncIterator, - Awaitable, - Iterable, - List, - Optional, - Sequence, - TypeVar, -) +from typing import AsyncIterator, List, Optional, Sequence import click import httpx @@ -31,14 +20,42 @@ AggregatedItem, ExternalIDs, IMDbTitle, - PlexGuid, PlexItem, - PlexPerson, TMDBEpisode, TMDBItem, TMDBMovie, TMDBShow, ) +from .utils import ( + close_coroutines as _close_coroutines, + gather_in_batches as _gather_in_batches, + iter_gather_in_batches as _iter_gather_in_batches, + require_positive as _require_positive, + resolve_dense_model_params as _resolve_dense_model_params_impl, +) +from .ingestion import IngestionTask, IngestBatch +from .ingestion.utils import chunk_sequence as _chunk_sequence, load_from_sample as _load_from_sample_impl +from .enrichment import EnrichmentTask, IMDbRetryQueue +from .enrichment.utils import ( + build_plex_item as _build_plex_item_impl, + extract_external_ids as _extract_external_ids_impl, + fetch_imdb as _fetch_imdb_impl, + fetch_imdb_batch as _fetch_imdb_batch_impl, + fetch_tmdb_episode as _fetch_tmdb_episode_impl, + fetch_tmdb_movie as _fetch_tmdb_movie_impl, + fetch_tmdb_show as _fetch_tmdb_show_impl, + load_imdb_retry_queue as _load_imdb_retry_queue_impl, + persist_imdb_retry_queue as _persist_imdb_retry_queue_impl, + process_imdb_retry_queue as _process_imdb_retry_queue_impl, + resolve_tmdb_season_number, +) +from .storage import StorageTask +from 
.storage.utils import ( + build_point, + ensure_collection as _ensure_collection_impl, + process_qdrant_retry_queue as _process_qdrant_retry_queue_impl, + upsert_in_batches as _upsert_in_batches_impl, +) try: # Only import plexapi when available; the sample data mode does not require it. from plexapi.base import PlexPartialObject @@ -53,16 +70,16 @@ warnings.filterwarnings( "ignore", - message=".*'mcp_plex\\.loader' found in sys.modules after import of package 'mcp_plex'.*", + message=r".*'mcp_plex\.loader' found in sys.modules after import of package 'mcp_plex'.*", category=RuntimeWarning, ) -T = TypeVar("T") +_IMDbRetryQueue = IMDbRetryQueue _imdb_cache: IMDbCache | None = None _imdb_max_retries: int = 3 _imdb_backoff: float = 1.0 -_imdb_retry_queue: "_IMDbRetryQueue" | None = None +_imdb_retry_queue: IMDbRetryQueue | None = None _imdb_batch_limit: int = 5 _qdrant_batch_size: int = 1000 _qdrant_upsert_buffer_size: int = 200 @@ -71,750 +88,84 @@ _qdrant_retry_backoff: float = 1.0 -@dataclass(slots=True) -class _MovieBatch: - """Batch of Plex movie items pending metadata enrichment.""" - - movies: list[PlexPartialObject] - - -@dataclass(slots=True) -class _EpisodeBatch: - """Batch of Plex episodes along with their parent show.""" - - show: PlexPartialObject - episodes: list[PlexPartialObject] - - -@dataclass(slots=True) -class _SampleBatch: - """Batch of pre-enriched items used by sample mode.""" - - items: list[AggregatedItem] - - -_IngestBatch = _MovieBatch | _EpisodeBatch | _SampleBatch - - -def _require_positive(value: int, *, name: str) -> int: - """Return *value* if positive, otherwise raise a ``ValueError``.""" - - if value <= 0: - raise ValueError(f"{name} must be positive") - return value - - -def _chunk_sequence(items: Sequence[T], size: int) -> Iterable[Sequence[T]]: - """Yield ``items`` in chunks of at most ``size`` elements.""" - - size = _require_positive(int(size), name="size") - for start in range(0, len(items), size): - yield items[start : start + size] - - -def _is_local_qdrant(client: AsyncQdrantClient) -> bool: - """Return ``True`` if *client* targets an in-process Qdrant instance.""" - - inner = getattr(client, "_client", None) - return bool(inner) and inner.__class__.__module__.startswith( - "qdrant_client.local" +async def _fetch_imdb( + client: httpx.AsyncClient, + imdb_id: str, + *, + cache: IMDbCache | None = None, + max_retries: int | None = None, + backoff: float | None = None, + retry_queue: IMDbRetryQueue | None = None, + logger_override: logging.Logger | None = None, +) -> Optional[IMDbTitle]: + effective_logger = logger_override or logger + return await _fetch_imdb_impl( + client, + imdb_id, + cache=cache if cache is not None else _imdb_cache, + max_retries=max_retries if max_retries is not None else _imdb_max_retries, + backoff=backoff if backoff is not None else _imdb_backoff, + retry_queue=retry_queue if retry_queue is not None else _imdb_retry_queue, + logger=effective_logger, ) -class _IMDbRetryQueue(asyncio.Queue[str]): - """Queue that tracks items in a deque for safe serialization.""" - - def __init__(self, initial: Iterable[str] | None = None): - super().__init__() - self._items: deque[str] = deque() - if initial: - for imdb_id in initial: - imdb_id_str = str(imdb_id) - super().put_nowait(imdb_id_str) - self._items.append(imdb_id_str) - - def put_nowait(self, item: str) -> None: # type: ignore[override] - super().put_nowait(item) - self._items.append(item) - - def get_nowait(self) -> str: # type: ignore[override] - if not self._items: - raise 
RuntimeError("Desynchronization: Queue is not empty but self._items is empty.") - try: - item = super().get_nowait() - except asyncio.QueueEmpty: - raise RuntimeError("Desynchronization: self._items is not empty but asyncio.Queue is empty.") - self._items.popleft() - return item - - def snapshot(self) -> list[str]: - """Return a list of the current queue contents.""" - - return list(self._items) - -# Known Qdrant-managed dense embedding models with their dimensionality and -# similarity metric. To support a new server-side embedding model, add an entry -# here with the appropriate vector size and `models.Distance` value. -_DENSE_MODEL_PARAMS: dict[str, tuple[int, models.Distance]] = { - "BAAI/bge-small-en-v1.5": (384, models.Distance.COSINE), - "BAAI/bge-base-en-v1.5": (768, models.Distance.COSINE), - "BAAI/bge-large-en-v1.5": (1024, models.Distance.COSINE), - "text-embedding-3-small": (1536, models.Distance.COSINE), - "text-embedding-3-large": (3072, models.Distance.COSINE), -} - - -def _close_coroutines(tasks: Sequence[Awaitable[object]]) -> None: - """Close coroutine objects to avoid unawaited warnings.""" - - for task in tasks: - if inspect.iscoroutine(task): - task.close() - - -async def _iter_gather_in_batches( - tasks: Sequence[Awaitable[T]], batch_size: int -) -> AsyncIterator[T]: - """Yield results from awaitable tasks in fixed-size batches.""" - - try: - _require_positive(batch_size, name="batch_size") - except ValueError: - _close_coroutines(tasks) - raise - - total = len(tasks) - for i in range(0, total, batch_size): - batch = tasks[i : i + batch_size] - for result in await asyncio.gather(*batch): - yield result - logger.info("Processed %d/%d items", min(i + batch_size, total), total) - - -async def _gather_in_batches( - tasks: Sequence[Awaitable[T]], batch_size: int -) -> List[T]: - """Gather awaitable tasks in fixed-size batches.""" - - return [result async for result in _iter_gather_in_batches(tasks, batch_size)] - - -def _resolve_dense_model_params(model_name: str) -> tuple[int, models.Distance]: - """Look up Qdrant vector parameters for a known dense embedding model.""" - - try: - return _DENSE_MODEL_PARAMS[model_name] - except KeyError as exc: - raise ValueError( - f"Unknown dense embedding model '{model_name}'. Update _DENSE_MODEL_PARAMS with the model's size and distance." 
- ) from exc - - -async def _fetch_imdb(client: httpx.AsyncClient, imdb_id: str) -> Optional[IMDbTitle]: - """Fetch metadata for an IMDb ID with caching and retry logic.""" - - if _imdb_cache: - cached = _imdb_cache.get(imdb_id) - if cached: - return IMDbTitle.model_validate(cached) - - url = f"https://api.imdbapi.dev/titles/{imdb_id}" - delay = _imdb_backoff - for attempt in range(_imdb_max_retries + 1): - try: - resp = await client.get(url) - except httpx.HTTPError: - logger.exception("HTTP error fetching IMDb ID %s", imdb_id) - return None - if resp.status_code == 429: - if attempt == _imdb_max_retries: - if _imdb_retry_queue is not None: - await _imdb_retry_queue.put(imdb_id) - return None - await asyncio.sleep(delay) - delay *= 2 - continue - if resp.is_success: - data = resp.json() - if _imdb_cache: - _imdb_cache.set(imdb_id, data) - return IMDbTitle.model_validate(data) - return None - return None - - async def _fetch_imdb_batch( client: httpx.AsyncClient, imdb_ids: Sequence[str] ) -> dict[str, Optional[IMDbTitle]]: - """Fetch metadata for multiple IMDb IDs, batching requests.""" - - results: dict[str, Optional[IMDbTitle]] = {} - ids_to_fetch: list[str] = [] - for imdb_id in imdb_ids: - if _imdb_cache: - cached = _imdb_cache.get(imdb_id) - if cached: - results[imdb_id] = IMDbTitle.model_validate(cached) - continue - ids_to_fetch.append(imdb_id) - - if not ids_to_fetch: - return results - - url = "https://api.imdbapi.dev/titles:batchGet" - for i in range(0, len(ids_to_fetch), _imdb_batch_limit): - chunk = ids_to_fetch[i : i + _imdb_batch_limit] - params = [("titleIds", imdb_id) for imdb_id in chunk] - delay = _imdb_backoff - for attempt in range(_imdb_max_retries + 1): - try: - resp = await client.get(url, params=params) - except httpx.HTTPError: - logger.exception("HTTP error fetching IMDb IDs %s", ",".join(chunk)) - for imdb_id in chunk: - results[imdb_id] = None - break - if resp.status_code == 429: - if attempt == _imdb_max_retries: - if _imdb_retry_queue is not None: - for imdb_id in chunk: - await _imdb_retry_queue.put(imdb_id) - for imdb_id in chunk: - results[imdb_id] = None - break - await asyncio.sleep(delay) - delay *= 2 - continue - if resp.is_success: - data = resp.json() - found: set[str] = set() - for title_data in data.get("titles", []): - imdb_title = IMDbTitle.model_validate(title_data) - results[imdb_title.id] = imdb_title - found.add(imdb_title.id) - if _imdb_cache: - _imdb_cache.set(imdb_title.id, title_data) - for missing in set(chunk) - found: - results[missing] = None - break - for imdb_id in chunk: - results[imdb_id] = None - break - - return results + return await _fetch_imdb_batch_impl( + client, + imdb_ids, + cache=_imdb_cache, + batch_limit=_imdb_batch_limit, + max_retries=_imdb_max_retries, + backoff=_imdb_backoff, + retry_queue=_imdb_retry_queue, + logger=logger, + ) def _load_imdb_retry_queue(path: Path) -> None: - """Populate the retry queue from a JSON file if it exists.""" - global _imdb_retry_queue - ids: list[str] = [] - if path.exists(): - try: - data = json.loads(path.read_text()) - if isinstance(data, list): - ids = [str(imdb_id) for imdb_id in data] - else: - logger.warning( - "IMDb retry queue file %s did not contain a list; ignoring its contents", - path, - ) - except Exception: - logger.exception("Failed to load IMDb retry queue from %s", path) - _imdb_retry_queue = _IMDbRetryQueue(ids) + _imdb_retry_queue = _load_imdb_retry_queue_impl(path, logger) async def _process_imdb_retry_queue(client: httpx.AsyncClient) -> None: - """Attempt to 
fetch queued IMDb IDs, re-queueing failures.""" - - if _imdb_retry_queue is None or _imdb_retry_queue.empty(): + if _imdb_retry_queue is None: return - size = _imdb_retry_queue.qsize() - for _ in range(size): - imdb_id = await _imdb_retry_queue.get() - title = await _fetch_imdb(client, imdb_id) - if title is None: - await _imdb_retry_queue.put(imdb_id) + async def _retry_fetch(client: httpx.AsyncClient, imdb_id: str, **_: object) -> Optional[IMDbTitle]: + return await _fetch_imdb(client, imdb_id) + await _process_imdb_retry_queue_impl( + client, + _imdb_retry_queue, + cache=_imdb_cache, + max_retries=_imdb_max_retries, + backoff=_imdb_backoff, + logger=logger, + fetch_fn=_retry_fetch, + ) def _persist_imdb_retry_queue(path: Path) -> None: - """Persist the retry queue to disk.""" - if _imdb_retry_queue is None: return - path.write_text(json.dumps(_imdb_retry_queue.snapshot())) - - -async def _upsert_in_batches( - client: AsyncQdrantClient, - collection_name: str, - points: Sequence[models.PointStruct], - *, - retry_queue: asyncio.Queue[list[models.PointStruct]] | None = None, -) -> None: - """Upsert points into Qdrant in batches, logging HTTP errors.""" - - total = len(points) - for i in range(0, total, _qdrant_batch_size): - batch = points[i : i + _qdrant_batch_size] - try: - await client.upsert(collection_name=collection_name, points=batch) - except Exception: - logger.exception( - "Failed to upsert batch %d-%d", i, i + len(batch) - ) - if retry_queue is not None: - await retry_queue.put(list(batch)) - else: - logger.info( - "Upserted %d/%d points", min(i + len(batch), total), total - ) - - -async def _process_qdrant_retry_queue( - client: AsyncQdrantClient, - collection_name: str, - retry_queue: asyncio.Queue[list[models.PointStruct]], -) -> None: - """Retry failed Qdrant batches with exponential backoff.""" - - if retry_queue.empty(): - return - - pending = retry_queue.qsize() - logger.info("Retrying %d failed Qdrant batches", pending) - while not retry_queue.empty(): - batch = await retry_queue.get() - attempt = 1 - while attempt <= _qdrant_retry_attempts: - try: - await client.upsert( - collection_name=collection_name, - points=batch, - ) - except Exception: - logger.exception( - "Retry %d/%d failed for Qdrant batch of %d points", - attempt, - _qdrant_retry_attempts, - len(batch), - ) - attempt += 1 - if attempt > _qdrant_retry_attempts: - logger.error( - "Giving up on Qdrant batch after %d attempts; %d points were not indexed", - _qdrant_retry_attempts, - len(batch), - ) - break - await asyncio.sleep(_qdrant_retry_backoff * attempt) - continue - else: - logger.info( - "Successfully retried Qdrant batch of %d points on attempt %d", - len(batch), - attempt, - ) - break - - -async def _ensure_collection( - client: AsyncQdrantClient, - collection_name: str, - *, - dense_size: int, - dense_distance: models.Distance, -) -> None: - """Create the collection and payload indexes if they do not already exist.""" - - created_collection = False - if not await client.collection_exists(collection_name): - await client.create_collection( - collection_name=collection_name, - vectors_config={"dense": models.VectorParams(size=dense_size, distance=dense_distance)}, - sparse_vectors_config={"sparse": models.SparseVectorParams()}, - ) - created_collection = True - - if not created_collection: - return - - suppress_payload_warning = _is_local_qdrant(client) - - async def _create_index( - field_name: str, - field_schema: models.PayloadSchemaType | models.TextIndexParams, - ) -> None: - if 
suppress_payload_warning: - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", - message="Payload indexes have no effect in the local Qdrant.*", - category=UserWarning, - ) - await client.create_payload_index( - collection_name=collection_name, - field_name=field_name, - field_schema=field_schema, - ) - else: - await client.create_payload_index( - collection_name=collection_name, - field_name=field_name, - field_schema=field_schema, - ) - - text_index = models.TextIndexParams( - type=models.PayloadSchemaType.TEXT, - tokenizer=models.TokenizerType.WORD, - min_token_len=2, - lowercase=True, - ) - await _create_index("title", text_index) - await _create_index("type", models.PayloadSchemaType.KEYWORD) - await _create_index("year", models.PayloadSchemaType.INTEGER) - await _create_index("added_at", models.PayloadSchemaType.INTEGER) - await _create_index("actors", models.PayloadSchemaType.KEYWORD) - await _create_index("directors", models.PayloadSchemaType.KEYWORD) - await _create_index("writers", models.PayloadSchemaType.KEYWORD) - await _create_index("genres", models.PayloadSchemaType.KEYWORD) - await _create_index("show_title", models.PayloadSchemaType.KEYWORD) - await _create_index("season_number", models.PayloadSchemaType.INTEGER) - await _create_index("episode_number", models.PayloadSchemaType.INTEGER) - await _create_index("collections", models.PayloadSchemaType.KEYWORD) - await _create_index("summary", text_index) - await _create_index("overview", text_index) - await _create_index("plot", text_index) - await _create_index("tagline", text_index) - await _create_index("reviews", text_index) - await _create_index("data.plex.rating_key", models.PayloadSchemaType.KEYWORD) - await _create_index("data.imdb.id", models.PayloadSchemaType.KEYWORD) - await _create_index("data.tmdb.id", models.PayloadSchemaType.INTEGER) - - -async def _fetch_tmdb_movie( - client: httpx.AsyncClient, tmdb_id: str, api_key: str -) -> Optional[TMDBMovie]: - url = ( - f"https://api.themoviedb.org/3/movie/{tmdb_id}?append_to_response=reviews" - ) - try: - resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) - except httpx.HTTPError: - logger.exception("HTTP error fetching TMDb movie %s", tmdb_id) - return None - if resp.is_success: - return TMDBMovie.model_validate(resp.json()) - return None - - -async def _fetch_tmdb_show( - client: httpx.AsyncClient, tmdb_id: str, api_key: str -) -> Optional[TMDBShow]: - url = ( - f"https://api.themoviedb.org/3/tv/{tmdb_id}?append_to_response=reviews" - ) - try: - resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) - except httpx.HTTPError: - logger.exception("HTTP error fetching TMDb show %s", tmdb_id) - return None - if resp.is_success: - return TMDBShow.model_validate(resp.json()) - return None - + _persist_imdb_retry_queue_impl(path, _imdb_retry_queue) -async def _fetch_tmdb_episode( - client: httpx.AsyncClient, - show_id: int, - season_number: int, - episode_number: int, - api_key: str, -) -> Optional[TMDBEpisode]: - """Fetch TMDb data for a TV episode.""" - url = ( - f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}/episode/{episode_number}" - ) - try: - resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) - except httpx.HTTPError: - logger.exception( - "HTTP error fetching TMDb episode %s S%sE%s", - show_id, - season_number, - episode_number, - ) - return None - if resp.is_success: - return TMDBEpisode.model_validate(resp.json()) - return None - - -def 
resolve_tmdb_season_number( - show_tmdb: Optional[TMDBShow], episode: PlexPartialObject -) -> Optional[int]: - """Map a Plex episode to the appropriate TMDb season number. - - This resolves cases where Plex uses year-based season indices that do not - match TMDb's sequential ``season_number`` values. - """ - - parent_index = getattr(episode, "parentIndex", None) - parent_title = getattr(episode, "parentTitle", None) - parent_year = getattr(episode, "parentYear", None) - if parent_year is None: - parent_year = getattr(episode, "year", None) - - seasons = getattr(show_tmdb, "seasons", []) if show_tmdb else [] - - # direct numeric match - if parent_index is not None: - for season in seasons: - if season.season_number == parent_index: - return season.season_number - - # match by season name (e.g. "Season 2018" -> "2018") - title_norm: Optional[str] = None - if isinstance(parent_title, str): - title_norm = parent_title.lower().lstrip("season ").strip() - for season in seasons: - name_norm = (season.name or "").lower().lstrip("season ").strip() - if name_norm == title_norm: - return season.season_number - - # match by air date year when Plex uses year-based seasons - year: Optional[int] = None - if isinstance(parent_year, int): - year = parent_year - elif isinstance(parent_index, int): - year = parent_index - elif title_norm and title_norm.isdigit(): - year = int(title_norm) - - if year is not None: - for season in seasons: - air = getattr(season, "air_date", None) - if isinstance(air, str) and len(air) >= 4 and air[:4].isdigit(): - if int(air[:4]) == year: - return season.season_number - - if isinstance(parent_index, int): - return parent_index - if isinstance(parent_index, str) and parent_index.isdigit(): - return int(parent_index) - if isinstance(parent_title, str) and parent_title.isdigit(): - return int(parent_title) - return None def _extract_external_ids(item: PlexPartialObject) -> ExternalIDs: """Extract IMDb and TMDb IDs from a Plex object.""" - imdb_id: Optional[str] = None - tmdb_id: Optional[str] = None - for guid in getattr(item, "guids", []) or []: - gid = getattr(guid, "id", "") - if gid.startswith("imdb://"): - imdb_id = gid.split("imdb://", 1)[1] - elif gid.startswith("tmdb://"): - tmdb_id = gid.split("tmdb://", 1)[1] - return ExternalIDs(imdb=imdb_id, tmdb=tmdb_id) + return _extract_external_ids_impl(item) def _build_plex_item(item: PlexPartialObject) -> PlexItem: """Convert a Plex object into the internal :class:`PlexItem`.""" - guids = [PlexGuid(id=g.id) for g in getattr(item, "guids", [])] - directors = [ - PlexPerson(id=getattr(d, "id", 0), tag=str(getattr(d, "tag", "")), thumb=getattr(d, "thumb", None)) - for d in getattr(item, "directors", []) or [] - ] - writers = [ - PlexPerson(id=getattr(w, "id", 0), tag=str(getattr(w, "tag", "")), thumb=getattr(w, "thumb", None)) - for w in getattr(item, "writers", []) or [] - ] - actors = [ - PlexPerson( - id=getattr(a, "id", 0), - tag=str(getattr(a, "tag", "")), - thumb=getattr(a, "thumb", None), - role=getattr(a, "role", None), - ) - for a in getattr(item, "actors", []) or getattr(item, "roles", []) or [] - ] - genres = [ - str(getattr(g, "tag", "")) - for g in getattr(item, "genres", []) or [] - if getattr(g, "tag", None) - ] - collections = [ - str(getattr(c, "tag", "")) - for c in getattr(item, "collections", []) or [] - if getattr(c, "tag", None) - ] - season_number = getattr(item, "parentIndex", None) - if isinstance(season_number, str): - season_number = int(season_number) if season_number.isdigit() else None - episode_number 
= getattr(item, "index", None) - if isinstance(episode_number, str): - episode_number = int(episode_number) if episode_number.isdigit() else None - - return PlexItem( - rating_key=str(getattr(item, "ratingKey", "")), - guid=str(getattr(item, "guid", "")), - type=str(getattr(item, "type", "")), - title=str(getattr(item, "title", "")), - show_title=getattr(item, "grandparentTitle", None), - season_title=getattr(item, "parentTitle", None), - season_number=season_number, - episode_number=episode_number, - summary=getattr(item, "summary", None), - year=getattr(item, "year", None), - added_at=getattr(item, "addedAt", None), - guids=guids, - thumb=getattr(item, "thumb", None), - art=getattr(item, "art", None), - tagline=getattr(item, "tagline", None), - content_rating=getattr(item, "contentRating", None), - directors=directors, - writers=writers, - actors=actors, - genres=genres, - collections=collections, - ) + return _build_plex_item_impl(item) -def _format_primary_title(item: AggregatedItem) -> str: - """Format the primary title text for ``item``.""" - - primary_title = item.plex.title - if item.plex.type == "episode": - title_bits: list[str] = [] - if item.plex.show_title: - title_bits.append(item.plex.show_title) - se_parts: list[str] = [] - if item.plex.season_number is not None: - se_parts.append(f"S{item.plex.season_number:02d}") - if item.plex.episode_number is not None: - se_parts.append(f"E{item.plex.episode_number:02d}") - if se_parts: - title_bits.append("".join(se_parts)) - if item.plex.title: - title_bits.append(item.plex.title) - if title_bits: - primary_title = " - ".join(title_bits) - return primary_title - - -def _build_point_text(item: AggregatedItem) -> str: - """Return the vector text for ``item``.""" - - parts = [ - _format_primary_title(item), - item.plex.summary or "", - item.tmdb.overview if item.tmdb and hasattr(item.tmdb, "overview") else "", - item.imdb.plot if item.imdb else "", - ] - directors_text = ", ".join(p.tag for p in item.plex.directors if p.tag) - writers_text = ", ".join(p.tag for p in item.plex.writers if p.tag) - actors_text = ", ".join(p.tag for p in item.plex.actors if p.tag) - if directors_text: - parts.append(f"Directed by {directors_text}") - if writers_text: - parts.append(f"Written by {writers_text}") - if actors_text: - parts.append(f"Starring {actors_text}") - if item.plex.tagline: - parts.append(item.plex.tagline) - if item.tmdb and hasattr(item.tmdb, "tagline"): - tagline = getattr(item.tmdb, "tagline", None) - if tagline: - parts.append(tagline) - if item.tmdb and hasattr(item.tmdb, "reviews"): - parts.extend(r.get("content", "") for r in getattr(item.tmdb, "reviews", [])) - return "\n".join(p for p in parts if p) - - -def _build_point_payload(item: AggregatedItem) -> dict[str, object]: - """Construct the Qdrant payload for ``item``.""" - - payload: dict[str, object] = { - "data": item.model_dump(mode="json"), - "title": item.plex.title, - "type": item.plex.type, - } - if item.plex.type == "episode": - if item.plex.show_title: - payload["show_title"] = item.plex.show_title - if item.plex.season_title: - payload["season_title"] = item.plex.season_title - if item.plex.season_number is not None: - payload["season_number"] = item.plex.season_number - if item.plex.episode_number is not None: - payload["episode_number"] = item.plex.episode_number - if item.plex.actors: - payload["actors"] = [p.tag for p in item.plex.actors if p.tag] - if item.plex.directors: - payload["directors"] = [p.tag for p in item.plex.directors if p.tag] - if 
item.plex.writers: - payload["writers"] = [p.tag for p in item.plex.writers if p.tag] - if item.plex.genres: - payload["genres"] = item.plex.genres - if item.plex.collections: - payload["collections"] = item.plex.collections - summary = item.plex.summary - if summary: - payload["summary"] = summary - overview = getattr(item.tmdb, "overview", None) if item.tmdb else None - if overview: - payload["overview"] = overview - plot = item.imdb.plot if item.imdb else None - if plot: - payload["plot"] = plot - taglines = [item.plex.tagline] - if item.tmdb and hasattr(item.tmdb, "tagline"): - taglines.append(getattr(item.tmdb, "tagline", None)) - taglines = [t for t in taglines if t] - if taglines: - payload["tagline"] = "\n".join(dict.fromkeys(taglines)) - if item.tmdb and hasattr(item.tmdb, "reviews"): - review_texts = [r.get("content", "") for r in getattr(item.tmdb, "reviews", [])] - review_texts = [r for r in review_texts if r] - if review_texts: - payload["reviews"] = review_texts - if item.plex.year is not None: - payload["year"] = item.plex.year - if item.plex.added_at is not None: - added = item.plex.added_at - if hasattr(added, "timestamp"): - payload["added_at"] = int(added.timestamp()) - return payload - - -def build_point( - item: AggregatedItem, - dense_model_name: str, - sparse_model_name: str, -) -> models.PointStruct: - """Build a Qdrant point for ``item`` using the configured model names.""" - - text = _build_point_text(item) - payload = _build_point_payload(item) - point_id: int | str = ( - int(item.plex.rating_key) - if item.plex.rating_key.isdigit() - else item.plex.rating_key - ) - return models.PointStruct( - id=point_id, - vector={ - "dense": models.Document(text=text, model=dense_model_name), - "sparse": models.Document(text=text, model=sparse_model_name), - }, - payload=payload, - ) - async def _iter_from_plex( server: PlexServer, tmdb_api_key: str, *, batch_size: int = 50 @@ -896,124 +247,91 @@ async def _load_from_plex( return [item async for item in _iter_from_plex(server, tmdb_api_key, batch_size=batch_size)] +async def _fetch_tmdb_movie( + client: httpx.AsyncClient, tmdb_id: str, api_key: str +) -> Optional[TMDBMovie]: + return await _fetch_tmdb_movie_impl(client, tmdb_id, api_key, logger) + + +async def _fetch_tmdb_show( + client: httpx.AsyncClient, tmdb_id: str, api_key: str +) -> Optional[TMDBShow]: + return await _fetch_tmdb_show_impl(client, tmdb_id, api_key, logger) + + +async def _fetch_tmdb_episode( + client: httpx.AsyncClient, + show_id: int, + season_number: int, + episode_number: int, + api_key: str, +) -> Optional[TMDBEpisode]: + return await _fetch_tmdb_episode_impl( + client, show_id, season_number, episode_number, api_key, logger + ) + def _load_from_sample(sample_dir: Path) -> List[AggregatedItem]: - """Load items from local sample JSON files.""" - - results: List[AggregatedItem] = [] - movie_dir = sample_dir / "movie" - episode_dir = sample_dir / "episode" - - # Movie sample - with (movie_dir / "plex.json").open("r", encoding="utf-8") as f: - movie_data = json.load(f)["MediaContainer"]["Metadata"][0] - plex_movie = PlexItem( - rating_key=str(movie_data.get("ratingKey", "")), - guid=str(movie_data.get("guid", "")), - type=movie_data.get("type", "movie"), - title=movie_data.get("title", ""), - summary=movie_data.get("summary"), - year=movie_data.get("year"), - added_at=movie_data.get("addedAt"), - guids=[PlexGuid(id=g["id"]) for g in movie_data.get("Guid", [])], - thumb=movie_data.get("thumb"), - art=movie_data.get("art"), - 
tagline=movie_data.get("tagline"), - content_rating=movie_data.get("contentRating"), - directors=[ - PlexPerson(id=d.get("id", 0), tag=d.get("tag", ""), thumb=d.get("thumb")) - for d in movie_data.get("Director", []) - ], - writers=[ - PlexPerson(id=w.get("id", 0), tag=w.get("tag", ""), thumb=w.get("thumb")) - for w in movie_data.get("Writer", []) - ], - actors=[ - PlexPerson( - id=a.get("id", 0), - tag=a.get("tag", ""), - role=a.get("role"), - thumb=a.get("thumb"), - ) - for a in movie_data.get("Role", []) - ], - genres=[g.get("tag", "") for g in movie_data.get("Genre", []) if g.get("tag")], - collections=[ - c.get("tag", "") - for key in ("Collection", "Collections") - for c in movie_data.get(key, []) or [] - if c.get("tag") - ], + return _load_from_sample_impl(sample_dir) + + +async def _ensure_collection( + client: AsyncQdrantClient, + collection_name: str, + *, + dense_size: int, + dense_distance: models.Distance, + logger_override: logging.Logger | None = None, +) -> None: + await _ensure_collection_impl( + client, + collection_name, + dense_size=dense_size, + dense_distance=dense_distance, + logger=logger_override or logger, ) - with (movie_dir / "imdb.json").open("r", encoding="utf-8") as f: - imdb_movie = IMDbTitle.model_validate(json.load(f)) - with (movie_dir / "tmdb.json").open("r", encoding="utf-8") as f: - tmdb_movie = TMDBMovie.model_validate(json.load(f)) - results.append(AggregatedItem(plex=plex_movie, imdb=imdb_movie, tmdb=tmdb_movie)) - - # Episode sample - with (episode_dir / "plex.tv.json").open("r", encoding="utf-8") as f: - episode_data = json.load(f)["MediaContainer"]["Metadata"][0] - plex_episode = PlexItem( - rating_key=str(episode_data.get("ratingKey", "")), - guid=str(episode_data.get("guid", "")), - type=episode_data.get("type", "episode"), - title=episode_data.get("title", ""), - show_title=episode_data.get("grandparentTitle"), - season_title=episode_data.get("parentTitle"), - season_number=episode_data.get("parentIndex"), - episode_number=episode_data.get("index"), - summary=episode_data.get("summary"), - year=episode_data.get("year"), - added_at=episode_data.get("addedAt"), - guids=[PlexGuid(id=g["id"]) for g in episode_data.get("Guid", [])], - thumb=episode_data.get("thumb"), - art=episode_data.get("art"), - tagline=episode_data.get("tagline"), - content_rating=episode_data.get("contentRating"), - directors=[ - PlexPerson(id=d.get("id", 0), tag=d.get("tag", ""), thumb=d.get("thumb")) - for d in episode_data.get("Director", []) - ], - writers=[ - PlexPerson(id=w.get("id", 0), tag=w.get("tag", ""), thumb=w.get("thumb")) - for w in episode_data.get("Writer", []) - ], - actors=[ - PlexPerson( - id=a.get("id", 0), - tag=a.get("tag", ""), - role=a.get("role"), - thumb=a.get("thumb"), - ) - for a in episode_data.get("Role", []) - ], - genres=[g.get("tag", "") for g in episode_data.get("Genre", []) if g.get("tag")], - collections=[ - c.get("tag", "") - for key in ("Collection", "Collections") - for c in episode_data.get(key, []) or [] - if c.get("tag") - ], + + +async def _upsert_in_batches( + client: AsyncQdrantClient, + collection_name: str, + points: Sequence[models.PointStruct], + *, + batch_size: int | None = None, + retry_queue: asyncio.Queue[list[models.PointStruct]] | None = None, + logger_override: logging.Logger | None = None, +) -> None: + await _upsert_in_batches_impl( + client, + collection_name, + points, + batch_size=batch_size if batch_size is not None else _qdrant_batch_size, + retry_queue=retry_queue, + logger=logger_override or logger, ) - with 
(episode_dir / "imdb.tv.json").open("r", encoding="utf-8") as f: - imdb_episode = IMDbTitle.model_validate(json.load(f)) - with (episode_dir / "tmdb.tv.json").open("r", encoding="utf-8") as f: - tmdb_show = TMDBShow.model_validate(json.load(f)) - results.append(AggregatedItem(plex=plex_episode, imdb=imdb_episode, tmdb=tmdb_show)) - return results +async def _process_qdrant_retry_queue( + client: AsyncQdrantClient, + collection_name: str, + retry_queue: asyncio.Queue[list[models.PointStruct]], +) -> None: + await _process_qdrant_retry_queue_impl( + client, + collection_name, + retry_queue, + logger, + max_attempts=_qdrant_retry_attempts, + backoff=_qdrant_retry_backoff, + ) -async def _iter_from_sample(sample_dir: Path) -> AsyncIterator[AggregatedItem]: - """Yield sample data items for streaming pipelines.""" - for item in _load_from_sample(sample_dir): - yield item +def _resolve_dense_model_params(model_name: str) -> tuple[int, models.Distance]: + return _resolve_dense_model_params_impl(model_name) class LoaderPipeline: - """Coordinate ingestion, enrichment, and Qdrant upserts.""" + """Coordinate ingestion, enrichment, and Qdrant storage.""" def __init__( self, @@ -1057,7 +375,7 @@ def __init__( if self._sample_items is None and not self._tmdb_api_key: raise RuntimeError("TMDB API key required for live ingestion") - self._ingest_queue: asyncio.Queue[_IngestBatch | None] = asyncio.Queue( + self._ingest_queue: asyncio.Queue[IngestBatch | None] = asyncio.Queue( maxsize=self._enrichment_workers * 2 ) self._points_queue: asyncio.Queue[list[models.PointStruct] | None] = ( @@ -1065,45 +383,110 @@ def __init__( ) self._upsert_capacity = asyncio.Semaphore(self._max_concurrent_upserts) self._items: list[AggregatedItem] = [] - self._qdrant_retry_queue: asyncio.Queue[list[models.PointStruct]] = ( - asyncio.Queue() - ) - self._show_tmdb_cache: dict[str, TMDBShow | None] = {} - - self._ingested_count = 0 - self._enriched_count = 0 - self._upserted_points = 0 - now = time.perf_counter() - self._ingest_start = now - self._enrich_start = now - self._upsert_start = now + self._storage_task: StorageTask | None = None + self._enrichment_task: EnrichmentTask | None = None + self._dense_params: tuple[int, models.Distance] | None = None + self._collection_ensured = False @property def qdrant_retry_queue(self) -> asyncio.Queue[list[models.PointStruct]]: - """Expose the Qdrant retry queue for post-processing.""" - - return self._qdrant_retry_queue + if self._storage_task is None: + raise RuntimeError("Pipeline has not been executed") + return self._storage_task.retry_queue @property def items(self) -> list[AggregatedItem]: - """Return the aggregated items processed by the pipeline.""" - return self._items - async def execute(self) -> None: - """Run the full ingestion/enrichment/upsert pipeline.""" + async def ensure_collection(self) -> None: + if self._dense_params is None: + self._dense_params = _resolve_dense_model_params(self._dense_model_name) + dense_size, dense_distance = self._dense_params + if not hasattr(self._client, "collection_exists"): + self._collection_ensured = True + return + await _ensure_collection( + self._client, + self._collection_name, + dense_size=dense_size, + dense_distance=dense_distance, + ) + self._collection_ensured = True + def _log_progress(self, stage: str, count: int, start: float, queue_size: int) -> None: + elapsed = time.perf_counter() - start + rate = count / elapsed if elapsed > 0 else 0.0 + logger.info( + "%s processed %d items (%.2f items/sec, queue size=%d)", + stage, + 
count, + rate, + queue_size, + ) + + async def execute(self) -> None: async with httpx.AsyncClient(timeout=30) as client: - self._http_client = client - ingest_task = asyncio.create_task(self._ingest()) - enrichment_tasks = [ - asyncio.create_task(self._enrichment_worker(worker_id)) - for worker_id in range(self._enrichment_workers) - ] - upsert_tasks = [ - asyncio.create_task(self._upsert_worker(worker_id)) - for worker_id in range(self._max_concurrent_upserts) - ] + imdb_queue = _imdb_retry_queue or IMDbRetryQueue() + if _imdb_retry_queue is None: + globals()['_imdb_retry_queue'] = imdb_queue + ingestion = IngestionTask( + self._ingest_queue, + sample_items=self._sample_items, + plex_server=self._server, + plex_chunk_size=self._plex_chunk_size, + enrichment_batch_size=self._enrichment_batch_size, + enrichment_workers=self._enrichment_workers, + log_progress=self._log_progress, + ) + enrichment = EnrichmentTask( + self._ingest_queue, + self._points_queue, + http_client=client, + tmdb_api_key=self._tmdb_api_key, + imdb_cache=_imdb_cache, + imdb_retry_queue=imdb_queue, + imdb_batch_limit=_imdb_batch_limit, + imdb_max_retries=_imdb_max_retries, + imdb_backoff=_imdb_backoff, + dense_model_name=self._dense_model_name, + sparse_model_name=self._sparse_model_name, + enrichment_batch_size=self._enrichment_batch_size, + worker_count=self._enrichment_workers, + upsert_buffer_size=self._upsert_buffer_size, + upsert_capacity=self._upsert_capacity, + log_progress=self._log_progress, + logger=logger, + ) + if self._dense_params is None: + self._dense_params = _resolve_dense_model_params( + self._dense_model_name + ) + dense_size, dense_distance = self._dense_params + storage = StorageTask( + self._points_queue, + client=self._client, + collection_name=self._collection_name, + dense_size=dense_size, + dense_distance=dense_distance, + upsert_batch_size=_qdrant_batch_size, + worker_count=self._max_concurrent_upserts, + upsert_capacity=self._upsert_capacity, + log_progress=self._log_progress, + logger=logger, + retry_attempts=_qdrant_retry_attempts, + retry_backoff=_qdrant_retry_backoff, + ensure_collection_fn=_ensure_collection, + upsert_fn=_upsert_in_batches, + ) + if not self._collection_ensured: + if hasattr(self._client, "collection_exists"): + await storage.ensure_collection() + self._collection_ensured = True + + ingest_task = asyncio.create_task(ingestion.run()) + enrichment_tasks = enrichment.start_workers() + storage_tasks = storage.start_workers() + error: BaseException | None = None try: await ingest_task @@ -1115,10 +498,10 @@ async def execute(self) -> None: finally: for _ in range(self._max_concurrent_upserts): await self._points_queue.put(None) - upsert_results = await asyncio.gather( - *upsert_tasks, return_exceptions=True + storage_results = await asyncio.gather( + *storage_tasks, return_exceptions=True ) - for result in upsert_results: + for result in storage_results: if isinstance(result, BaseException) and not isinstance( result, asyncio.CancelledError ): @@ -1135,302 +518,16 @@ async def execute(self) -> None: if error is not None: raise error - def _log_progress( - self, stage: str, count: int, start: float, queue_size: int - ) -> None: - elapsed = time.perf_counter() - start - rate = count / elapsed if elapsed > 0 else 0.0 - logger.info( - "%s processed %d items (%.2f items/sec, queue size=%d)", - stage, - count, - rate, - queue_size, - ) - - async def _ingest(self) -> None: - start = time.perf_counter() - self._ingest_start = start - try: - if self._sample_items is not None: - 
await self._ingest_sample() - else: - await self._ingest_from_plex() - finally: - for _ in range(self._enrichment_workers): - await self._ingest_queue.put(None) - self._log_progress("Ingestion", self._ingested_count, start, self._ingest_queue.qsize()) - - async def _ingest_sample(self) -> None: - for chunk in _chunk_sequence(self._sample_items or [], self._enrichment_batch_size): - batch = _SampleBatch(items=list(chunk)) - if not batch.items: - continue - await self._ingest_queue.put(batch) - self._ingested_count += len(batch.items) - self._log_progress( - "Ingestion", - self._ingested_count, - self._ingest_start, - self._ingest_queue.qsize(), - ) + self._items = enrichment.items + self._storage_task = storage + self._enrichment_task = enrichment - async def _ingest_from_plex(self) -> None: - if self._server is None: - raise RuntimeError("Plex server unavailable for ingestion") - movie_section = self._server.library.section("Movies") - movie_keys = [int(m.ratingKey) for m in movie_section.all()] - for key_chunk in _chunk_sequence(movie_keys, self._plex_chunk_size): - key_list = list(key_chunk) - movies = list(self._server.fetchItems(key_list)) if key_list else [] - if not movies: - continue - await self._ingest_queue.put(_MovieBatch(movies=movies)) - self._ingested_count += len(movies) - self._log_progress( - "Ingestion", - self._ingested_count, - self._ingest_start, - self._ingest_queue.qsize(), - ) - show_section = self._server.library.section("TV Shows") - show_keys = [int(s.ratingKey) for s in show_section.all()] - for show_chunk in _chunk_sequence(show_keys, self._plex_chunk_size): - shows = list(self._server.fetchItems(list(show_chunk))) - for show in shows: - episode_keys = [int(e.ratingKey) for e in show.episodes()] - for episode_chunk in _chunk_sequence(episode_keys, self._plex_chunk_size): - keys = list(episode_chunk) - episodes = list(self._server.fetchItems(keys)) if keys else [] - if not episodes: - continue - await self._ingest_queue.put( - _EpisodeBatch(show=show, episodes=episodes) - ) - self._ingested_count += len(episodes) - self._log_progress( - "Ingestion", - self._ingested_count, - self._ingest_start, - self._ingest_queue.qsize(), - ) - - async def _enrichment_worker(self, worker_id: int) -> None: - while True: - batch = await self._ingest_queue.get() - if batch is None: - self._ingest_queue.task_done() - break - try: - if isinstance(batch, _MovieBatch): - await self._process_movie_batch(batch) - elif isinstance(batch, _EpisodeBatch): - await self._process_episode_batch(batch) - else: - await self._process_sample_batch(batch) - finally: - self._ingest_queue.task_done() - - async def _process_movie_batch(self, batch: _MovieBatch) -> None: - for chunk in _chunk_sequence(batch.movies, self._enrichment_batch_size): - movies = list(chunk) - if not movies: - continue - if self._enriched_count == 0: - self._enrich_start = time.perf_counter() - aggregated = await self._enrich_movies(movies) - self._enriched_count += len(aggregated) - self._log_progress( - "Enrichment", - self._enriched_count, - self._enrich_start, - self._points_queue.qsize(), - ) - await self._emit_points(aggregated) - - async def _process_episode_batch(self, batch: _EpisodeBatch) -> None: - for chunk in _chunk_sequence(batch.episodes, self._enrichment_batch_size): - episodes = list(chunk) - if not episodes: - continue - if self._enriched_count == 0: - self._enrich_start = time.perf_counter() - aggregated = await self._enrich_episodes(batch.show, episodes) - self._enriched_count += len(aggregated) - 
self._log_progress( - "Enrichment", - self._enriched_count, - self._enrich_start, - self._points_queue.qsize(), - ) - await self._emit_points(aggregated) - - async def _process_sample_batch(self, batch: _SampleBatch) -> None: - for chunk in _chunk_sequence(batch.items, self._enrichment_batch_size): - aggregated = list(chunk) - if not aggregated: - continue - if self._enriched_count == 0: - self._enrich_start = time.perf_counter() - self._enriched_count += len(aggregated) - self._log_progress( - "Enrichment", - self._enriched_count, - self._enrich_start, - self._points_queue.qsize(), - ) - await self._emit_points(aggregated) - - async def _enrich_movies( - self, movies: Sequence[PlexPartialObject] - ) -> list[AggregatedItem]: - movie_ids = [_extract_external_ids(movie) for movie in movies] - imdb_ids = [ids.imdb for ids in movie_ids if ids.imdb] - imdb_map = ( - await _fetch_imdb_batch(self._http_client, imdb_ids) - if imdb_ids - else {} - ) + async def drain_retry_queue(self) -> None: + if self._storage_task is None: + return + await self._storage_task.drain_retry_queue() - api_key = self._tmdb_api_key - tmdb_results: list[TMDBMovie | None] = [] - if api_key: - tmdb_tasks = [ - _fetch_tmdb_movie(self._http_client, ids.tmdb, api_key) - for ids in movie_ids - if ids.tmdb - ] - if tmdb_tasks: - tmdb_results = await asyncio.gather(*tmdb_tasks) - tmdb_iter = iter(tmdb_results) - - aggregated: list[AggregatedItem] = [] - for movie, ids in zip(movies, movie_ids): - tmdb = next(tmdb_iter, None) if ids.tmdb else None - imdb = imdb_map.get(ids.imdb) if ids.imdb else None - aggregated.append( - AggregatedItem( - plex=_build_plex_item(movie), - imdb=imdb, - tmdb=tmdb, - ) - ) - return aggregated - - async def _enrich_episodes( - self, show: PlexPartialObject, episodes: Sequence[PlexPartialObject] - ) -> list[AggregatedItem]: - show_ids = _extract_external_ids(show) - show_tmdb: TMDBShow | None = None - if show_ids.tmdb: - show_tmdb = await self._get_tmdb_show(show_ids.tmdb) - episode_ids = [_extract_external_ids(ep) for ep in episodes] - imdb_ids = [ids.imdb for ids in episode_ids if ids.imdb] - imdb_map = ( - await _fetch_imdb_batch(self._http_client, imdb_ids) - if imdb_ids - else {} - ) - tmdb_results: list[TMDBEpisode | None] = [] - if show_tmdb: - episode_tasks = [ - self._lookup_tmdb_episode(show_tmdb, ep) - for ep in episodes - ] - if episode_tasks: - tmdb_results = await asyncio.gather(*episode_tasks) - tmdb_iter = iter(tmdb_results) - aggregated: list[AggregatedItem] = [] - for ep, ids in zip(episodes, episode_ids): - tmdb_episode = next(tmdb_iter, None) if show_tmdb else None - imdb = imdb_map.get(ids.imdb) if ids.imdb else None - tmdb_item: TMDBItem | None = tmdb_episode or show_tmdb - aggregated.append( - AggregatedItem( - plex=_build_plex_item(ep), - imdb=imdb, - tmdb=tmdb_item, - ) - ) - return aggregated - - async def _get_tmdb_show(self, tmdb_id: str) -> TMDBShow | None: - if tmdb_id in self._show_tmdb_cache: - return self._show_tmdb_cache[tmdb_id] - show = await _fetch_tmdb_show(self._http_client, tmdb_id, self._tmdb_api_key or "") - self._show_tmdb_cache[tmdb_id] = show - return show - - async def _lookup_tmdb_episode( - self, show_tmdb: TMDBShow | None, episode: PlexPartialObject - ) -> TMDBEpisode | None: - if not show_tmdb or not self._tmdb_api_key: - return None - season = resolve_tmdb_season_number(show_tmdb, episode) - ep_num = getattr(episode, "index", None) - if isinstance(ep_num, str) and ep_num.isdigit(): - ep_num = int(ep_num) - if season is None or ep_num is None: - 
return None - return await _fetch_tmdb_episode( - self._http_client, - show_tmdb.id, - season, - ep_num, - self._tmdb_api_key, - ) - - async def _emit_points(self, aggregated: Sequence[AggregatedItem]) -> None: - if not aggregated: - return - self._items.extend(aggregated) - points = [ - build_point(item, self._dense_model_name, self._sparse_model_name) - for item in aggregated - ] - for chunk in _chunk_sequence(points, self._upsert_buffer_size): - batch = list(chunk) - if not batch: - continue - await self._upsert_capacity.acquire() - try: - await self._points_queue.put(batch) - except BaseException: - self._upsert_capacity.release() - raise - - async def _upsert_worker(self, worker_id: int) -> None: - while True: - batch = await self._points_queue.get() - if batch is None: - self._points_queue.task_done() - break - logger.info( - "Upsert worker %d handling %d points (queue size=%d)", - worker_id, - len(batch), - self._points_queue.qsize(), - ) - try: - if self._upserted_points == 0: - self._upsert_start = time.perf_counter() - await _upsert_in_batches( - self._client, - self._collection_name, - batch, - retry_queue=self._qdrant_retry_queue, - ) - self._upserted_points += len(batch) - self._log_progress( - f"Upsert worker {worker_id}", - self._upserted_points, - self._upsert_start, - self._points_queue.qsize(), - ) - finally: - self._points_queue.task_done() - self._upsert_capacity.release() async def run( plex_url: Optional[str], plex_token: Optional[str], @@ -1454,8 +551,6 @@ async def run( enrichment_batch_size: int = 100, enrichment_workers: int = 4, ) -> None: - """Core execution logic for the CLI.""" - global _imdb_cache, _imdb_max_retries, _imdb_backoff, _imdb_retry_queue _imdb_cache = IMDbCache(imdb_cache_path) if imdb_cache_path else None _imdb_max_retries = imdb_max_retries @@ -1465,14 +560,13 @@ async def run( async with httpx.AsyncClient(timeout=30) as client: await _process_imdb_retry_queue(client) else: - _imdb_retry_queue = _IMDbRetryQueue() + _imdb_retry_queue = IMDbRetryQueue() _require_positive(upsert_buffer_size, name="upsert_buffer_size") _require_positive(plex_chunk_size, name="plex_chunk_size") _require_positive(enrichment_batch_size, name="enrichment_batch_size") _require_positive(enrichment_workers, name="enrichment_workers") - dense_size, dense_distance = _resolve_dense_model_params(dense_model_name) if qdrant_url is None and qdrant_host is None: qdrant_url = ":memory:" client = AsyncQdrantClient( @@ -1485,12 +579,6 @@ async def run( prefer_grpc=qdrant_prefer_grpc, ) collection_name = "media-items" - await _ensure_collection( - client, - collection_name, - dense_size=dense_size, - dense_distance=dense_distance, - ) items: List[AggregatedItem] if sample_dir is not None: @@ -1534,15 +622,14 @@ async def run( max_concurrent_upserts=_qdrant_max_concurrent_upserts, ) + await pipeline.ensure_collection() await pipeline.execute() items = pipeline.items logger.info("Loaded %d items", len(items)) if not items: logger.info("No points to upsert") - await _process_qdrant_retry_queue( - client, collection_name, pipeline.qdrant_retry_queue - ) + await pipeline.drain_retry_queue() if imdb_queue_path: _persist_imdb_retry_queue(imdb_queue_path) @@ -1641,7 +728,7 @@ async def run( type=int, default=_qdrant_upsert_buffer_size, show_default=True, - help="Number of media items to buffer before scheduling an async upsert", + help="Number of media items to buffer before scheduling an async storage write", ) @click.option( "--plex-chunk-size", @@ -1763,8 +850,6 @@ def main( imdb_backoff: 
float, imdb_queue: Path, ) -> None: - """Entry-point for the ``load-data`` script.""" - asyncio.run( load_media( plex_url, @@ -1819,8 +904,6 @@ async def load_media( enrichment_batch_size: int, enrichment_workers: int, ) -> None: - """Orchestrate one or more runs of :func:`run`.""" - while True: await run( plex_url, @@ -1853,3 +936,34 @@ async def load_media( if __name__ == "__main__": main() + + +__all__ = [ + "LoaderPipeline", + "_build_plex_item", + "_chunk_sequence", + "_close_coroutines", + "_ensure_collection", + "_IMDbRetryQueue", + "_extract_external_ids", + "_fetch_imdb", + "_fetch_imdb_batch", + "_fetch_tmdb_episode", + "_fetch_tmdb_movie", + "_fetch_tmdb_show", + "_gather_in_batches", + "_iter_gather_in_batches", + "_load_from_sample", + "_load_imdb_retry_queue", + "_persist_imdb_retry_queue", + "_process_imdb_retry_queue", + "_process_qdrant_retry_queue", + "_resolve_dense_model_params", + "_require_positive", + "_upsert_in_batches", + "build_point", + "load_media", + "main", + "resolve_tmdb_season_number", + "run", +] diff --git a/mcp_plex/loader/enrichment/__init__.py b/mcp_plex/loader/enrichment/__init__.py new file mode 100644 index 0000000..c0414e4 --- /dev/null +++ b/mcp_plex/loader/enrichment/__init__.py @@ -0,0 +1,204 @@ +from __future__ import annotations + +import asyncio +import logging +import time +from typing import Callable, Sequence + +import httpx +from qdrant_client import models + +from mcp_plex.common.types import AggregatedItem, TMDBShow + +from ..imdb_cache import IMDbCache +from ..ingestion.utils import chunk_sequence +from ..utils import require_positive +from ..storage.utils import build_point +from .types import IMDbRetryQueue +from .utils import ( + enrich_episodes, + enrich_movies, +) + +try: + from plexapi.base import PlexPartialObject +except Exception: # pragma: no cover - fall back when plexapi unavailable + PlexPartialObject = object # type: ignore[assignment] + + +class EnrichmentTask: + """Enrich Plex metadata and emit Qdrant points for storage.""" + + def __init__( + self, + ingest_queue: asyncio.Queue[object | None], + points_queue: asyncio.Queue[list[models.PointStruct] | None], + *, + http_client: httpx.AsyncClient, + tmdb_api_key: str | None, + imdb_cache: IMDbCache | None, + imdb_retry_queue: IMDbRetryQueue, + imdb_batch_limit: int, + imdb_max_retries: int, + imdb_backoff: float, + dense_model_name: str, + sparse_model_name: str, + enrichment_batch_size: int, + worker_count: int, + upsert_buffer_size: int, + upsert_capacity: asyncio.Semaphore, + log_progress: Callable[[str, int, float, int], None], + logger: logging.Logger, + ) -> None: + self._ingest_queue = ingest_queue + self._points_queue = points_queue + self._http_client = http_client + self._tmdb_api_key = tmdb_api_key + self._imdb_cache = imdb_cache + self._imdb_retry_queue = imdb_retry_queue + self._imdb_batch_limit = imdb_batch_limit + self._imdb_max_retries = imdb_max_retries + self._imdb_backoff = imdb_backoff + self._dense_model_name = dense_model_name + self._sparse_model_name = sparse_model_name + self._enrichment_batch_size = require_positive( + enrichment_batch_size, name="enrichment_batch_size" + ) + self._worker_count = require_positive(worker_count, name="worker_count") + self._upsert_buffer_size = require_positive( + upsert_buffer_size, name="upsert_buffer_size" + ) + self._upsert_capacity = upsert_capacity + self._log_progress = log_progress + self._logger = logger + + self._items: list[AggregatedItem] = [] + self._show_tmdb_cache: dict[str, TMDBShow | None] = {} + 
self._enriched_count = 0 + self._enrich_start = time.perf_counter() + + @property + def items(self) -> list[AggregatedItem]: + return self._items + + def start_workers(self) -> list[asyncio.Task[None]]: + return [ + asyncio.create_task(self._worker(worker_id)) + for worker_id in range(self._worker_count) + ] + + async def _worker(self, worker_id: int) -> None: + while True: + batch = await self._ingest_queue.get() + if batch is None: + self._ingest_queue.task_done() + break + try: + await self._process_batch(batch) + finally: + self._ingest_queue.task_done() + + async def _process_batch(self, batch: object) -> None: + from ..ingestion.types import EpisodeBatch, MovieBatch, SampleBatch + + if isinstance(batch, MovieBatch): + await self._process_movie_batch(batch) + elif isinstance(batch, EpisodeBatch): + await self._process_episode_batch(batch) + elif isinstance(batch, SampleBatch): + await self._process_sample_batch(batch) + else: + raise TypeError(f"Unsupported batch type: {type(batch)!r}") + + async def _process_movie_batch(self, batch: object) -> None: + from ..ingestion.types import MovieBatch + + assert isinstance(batch, MovieBatch) + for chunk in chunk_sequence(batch.movies, self._enrichment_batch_size): + movies = list(chunk) + if not movies: + continue + await self._enrich_movies(movies) + + async def _process_episode_batch(self, batch: object) -> None: + from ..ingestion.types import EpisodeBatch + + assert isinstance(batch, EpisodeBatch) + for chunk in chunk_sequence(batch.episodes, self._enrichment_batch_size): + episodes = list(chunk) + if not episodes: + continue + await self._enrich_episodes(batch.show, episodes) + + async def _process_sample_batch(self, batch: object) -> None: + from ..ingestion.types import SampleBatch + + assert isinstance(batch, SampleBatch) + for chunk in chunk_sequence(batch.items, self._enrichment_batch_size): + aggregated = list(chunk) + if not aggregated: + continue + await self._emit_points(aggregated) + + async def _enrich_movies(self, movies: Sequence[PlexPartialObject]) -> None: + aggregated = await enrich_movies( + self._http_client, + movies, + tmdb_api_key=self._tmdb_api_key, + imdb_cache=self._imdb_cache, + imdb_batch_limit=self._imdb_batch_limit, + imdb_max_retries=self._imdb_max_retries, + imdb_backoff=self._imdb_backoff, + imdb_retry_queue=self._imdb_retry_queue, + logger=self._logger, + ) + await self._emit_points(aggregated) + + async def _enrich_episodes( + self, show: PlexPartialObject, episodes: Sequence[PlexPartialObject] + ) -> None: + aggregated = await enrich_episodes( + self._http_client, + show, + episodes, + tmdb_api_key=self._tmdb_api_key, + imdb_cache=self._imdb_cache, + imdb_batch_limit=self._imdb_batch_limit, + imdb_max_retries=self._imdb_max_retries, + imdb_backoff=self._imdb_backoff, + imdb_retry_queue=self._imdb_retry_queue, + show_tmdb_cache=self._show_tmdb_cache, + logger=self._logger, + ) + await self._emit_points(aggregated) + + async def _emit_points(self, aggregated: Sequence[AggregatedItem]) -> None: + if not aggregated: + return + if self._enriched_count == 0: + self._enrich_start = time.perf_counter() + self._items.extend(aggregated) + self._enriched_count += len(aggregated) + self._log_progress( + "Enrichment", + self._enriched_count, + self._enrich_start, + self._points_queue.qsize(), + ) + points = [ + build_point(item, self._dense_model_name, self._sparse_model_name) + for item in aggregated + ] + for chunk in chunk_sequence(points, self._upsert_buffer_size): + batch = list(chunk) + if not batch: + continue 
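+            # Backpressure: take a slot on the shared semaphore before
+            # enqueueing, so the number of in-flight point batches stays
+            # bounded. The storage worker releases the slot after it has
+            # consumed the batch; if the put itself fails, we release below.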
+ await self._upsert_capacity.acquire() + try: + await self._points_queue.put(batch) + except BaseException: + self._upsert_capacity.release() + raise + + +__all__ = ["EnrichmentTask", "IMDbRetryQueue"] diff --git a/mcp_plex/loader/enrichment/types.py b/mcp_plex/loader/enrichment/types.py new file mode 100644 index 0000000..c867ac4 --- /dev/null +++ b/mcp_plex/loader/enrichment/types.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import asyncio +from collections import deque +from typing import Iterable + + +class IMDbRetryQueue(asyncio.Queue[str]): + """Queue that tracks items in a deque for safe serialization.""" + + def __init__(self, initial: Iterable[str] | None = None): + super().__init__() + self._items: deque[str] = deque() + if initial: + for imdb_id in initial: + imdb_id_str = str(imdb_id) + super().put_nowait(imdb_id_str) + self._items.append(imdb_id_str) + + def put_nowait(self, item: str) -> None: # type: ignore[override] + super().put_nowait(item) + self._items.append(item) + + def get_nowait(self) -> str: # type: ignore[override] + if not self._items: + raise RuntimeError("Desynchronization: Queue is not empty but self._items is empty.") + try: + item = super().get_nowait() + except asyncio.QueueEmpty: + raise RuntimeError( + "Desynchronization: self._items is not empty but asyncio.Queue is empty." + ) + self._items.popleft() + return item + + def snapshot(self) -> list[str]: + """Return a list of the current queue contents.""" + + return list(self._items) + + +__all__ = ["IMDbRetryQueue"] diff --git a/mcp_plex/loader/enrichment/utils.py b/mcp_plex/loader/enrichment/utils.py new file mode 100644 index 0000000..72681b5 --- /dev/null +++ b/mcp_plex/loader/enrichment/utils.py @@ -0,0 +1,552 @@ +from __future__ import annotations + +import asyncio +import json +import logging +from pathlib import Path +from typing import Awaitable, Callable, Optional, Sequence + +import httpx + +from mcp_plex.common.types import ( + AggregatedItem, + ExternalIDs, + IMDbTitle, + PlexGuid, + PlexItem, + PlexPerson, + TMDBEpisode, + TMDBItem, + TMDBMovie, + TMDBShow, +) + +from ..imdb_cache import IMDbCache +from ..utils import gather_in_batches +from .types import IMDbRetryQueue + +try: + from plexapi.base import PlexPartialObject +except Exception: # pragma: no cover - fall back when plexapi unavailable + PlexPartialObject = object # type: ignore[assignment] + + +async def fetch_imdb( + client: httpx.AsyncClient, + imdb_id: str, + *, + cache: IMDbCache | None, + max_retries: int, + backoff: float, + retry_queue: IMDbRetryQueue | None, + logger: logging.Logger, +) -> Optional[IMDbTitle]: + """Fetch metadata for an IMDb ID with caching and retry logic.""" + + if cache: + cached = cache.get(imdb_id) + if cached: + return IMDbTitle.model_validate(cached) + + url = f"https://api.imdbapi.dev/titles/{imdb_id}" + delay = backoff + for attempt in range(max_retries + 1): + try: + resp = await client.get(url) + except httpx.HTTPError: + logger.exception("HTTP error fetching IMDb ID %s", imdb_id) + return None + if resp.status_code == 429: + if attempt == max_retries: + if retry_queue is not None: + await retry_queue.put(imdb_id) + return None + await asyncio.sleep(delay) + delay *= 2 + continue + if resp.is_success: + data = resp.json() + if cache: + cache.set(imdb_id, data) + return IMDbTitle.model_validate(data) + return None + return None + + +async def fetch_imdb_batch( + client: httpx.AsyncClient, + imdb_ids: Sequence[str], + *, + cache: IMDbCache | None, + batch_limit: int, + max_retries: 
int, + backoff: float, + retry_queue: IMDbRetryQueue | None, + logger: logging.Logger, +) -> dict[str, Optional[IMDbTitle]]: + """Fetch metadata for multiple IMDb IDs, batching requests.""" + + results: dict[str, Optional[IMDbTitle]] = {} + ids_to_fetch: list[str] = [] + for imdb_id in imdb_ids: + if cache: + cached = cache.get(imdb_id) + if cached: + results[imdb_id] = IMDbTitle.model_validate(cached) + continue + ids_to_fetch.append(imdb_id) + + if not ids_to_fetch: + return results + + url = "https://api.imdbapi.dev/titles:batchGet" + for i in range(0, len(ids_to_fetch), batch_limit): + chunk = ids_to_fetch[i : i + batch_limit] + params = [("titleIds", imdb_id) for imdb_id in chunk] + delay = backoff + for attempt in range(max_retries + 1): + try: + resp = await client.get(url, params=params) + except httpx.HTTPError: + logger.exception("HTTP error fetching IMDb IDs %s", ",".join(chunk)) + for imdb_id in chunk: + results[imdb_id] = None + break + if resp.status_code == 429: + if attempt == max_retries: + if retry_queue is not None: + for imdb_id in chunk: + await retry_queue.put(imdb_id) + for imdb_id in chunk: + results[imdb_id] = None + break + await asyncio.sleep(delay) + delay *= 2 + continue + if resp.is_success: + data = resp.json() + found: set[str] = set() + for title_data in data.get("titles", []): + imdb_title = IMDbTitle.model_validate(title_data) + results[imdb_title.id] = imdb_title + found.add(imdb_title.id) + if cache: + cache.set(imdb_title.id, title_data) + for missing in set(chunk) - found: + results[missing] = None + break + for imdb_id in chunk: + results[imdb_id] = None + break + + return results + + +def load_imdb_retry_queue(path: Path, logger: logging.Logger) -> IMDbRetryQueue: + """Populate the retry queue from a JSON file if it exists.""" + + ids: list[str] = [] + if path.exists(): + try: + data = json.loads(path.read_text()) + if isinstance(data, list): + ids = [str(imdb_id) for imdb_id in data] + else: + logger.warning( + "IMDb retry queue file %s did not contain a list; ignoring its contents", + path, + ) + except Exception: + logger.exception("Failed to load IMDb retry queue from %s", path) + return IMDbRetryQueue(ids) + + +async def process_imdb_retry_queue( + client: httpx.AsyncClient, + queue: IMDbRetryQueue, + *, + cache: IMDbCache | None, + max_retries: int, + backoff: float, + logger: logging.Logger, + fetch_fn: Callable[..., Awaitable[Optional[IMDbTitle]]] | None = None, +) -> None: + """Attempt to fetch queued IMDb IDs, re-queueing failures.""" + + if queue.empty(): + return + size = queue.qsize() + for _ in range(size): + imdb_id = await queue.get() + fetch = fetch_fn or fetch_imdb + title = await fetch( + client, + imdb_id, + cache=cache, + max_retries=max_retries, + backoff=backoff, + retry_queue=queue, + logger=logger, + ) + if title is None: + await queue.put(imdb_id) + + +def persist_imdb_retry_queue(path: Path, queue: IMDbRetryQueue) -> None: + """Persist the retry queue to disk.""" + + path.write_text(json.dumps(queue.snapshot())) + + +async def fetch_tmdb_movie( + client: httpx.AsyncClient, tmdb_id: str, api_key: str, logger: logging.Logger +) -> Optional[TMDBMovie]: + url = ( + f"https://api.themoviedb.org/3/movie/{tmdb_id}?append_to_response=reviews" + ) + try: + resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) + except httpx.HTTPError: + logger.exception("HTTP error fetching TMDb movie %s", tmdb_id) + return None + if resp.is_success: + return TMDBMovie.model_validate(resp.json()) + return None + + +async def 
fetch_tmdb_show( + client: httpx.AsyncClient, tmdb_id: str, api_key: str, logger: logging.Logger +) -> Optional[TMDBShow]: + url = ( + f"https://api.themoviedb.org/3/tv/{tmdb_id}?append_to_response=reviews" + ) + try: + resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) + except httpx.HTTPError: + logger.exception("HTTP error fetching TMDb show %s", tmdb_id) + return None + if resp.is_success: + return TMDBShow.model_validate(resp.json()) + return None + + +async def fetch_tmdb_episode( + client: httpx.AsyncClient, + show_id: int, + season_number: int, + episode_number: int, + api_key: str, + logger: logging.Logger, +) -> Optional[TMDBEpisode]: + """Fetch TMDb data for a TV episode.""" + + url = ( + f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}/episode/{episode_number}" + ) + try: + resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) + except httpx.HTTPError: + logger.exception( + "HTTP error fetching TMDb episode %s S%sE%s", + show_id, + season_number, + episode_number, + ) + return None + if resp.is_success: + return TMDBEpisode.model_validate(resp.json()) + return None + + +def extract_external_ids(item: PlexPartialObject) -> ExternalIDs: + """Extract IMDb and TMDb IDs from a Plex object.""" + + imdb_id: Optional[str] = None + tmdb_id: Optional[str] = None + for guid in getattr(item, "guids", []) or []: + gid = getattr(guid, "id", "") + if gid.startswith("imdb://"): + imdb_id = gid.split("imdb://", 1)[1] + elif gid.startswith("tmdb://"): + tmdb_id = gid.split("tmdb://", 1)[1] + return ExternalIDs(imdb=imdb_id, tmdb=tmdb_id) + + +def build_plex_item(item: PlexPartialObject) -> PlexItem: + """Convert a Plex object into the internal :class:`PlexItem`.""" + + guids = [PlexGuid(id=g.id) for g in getattr(item, "guids", [])] + directors = [ + PlexPerson( + id=getattr(d, "id", 0), + tag=str(getattr(d, "tag", "")), + thumb=getattr(d, "thumb", None), + ) + for d in getattr(item, "directors", []) or [] + ] + writers = [ + PlexPerson( + id=getattr(w, "id", 0), + tag=str(getattr(w, "tag", "")), + thumb=getattr(w, "thumb", None), + ) + for w in getattr(item, "writers", []) or [] + ] + actors = [ + PlexPerson( + id=getattr(a, "id", 0), + tag=str(getattr(a, "tag", "")), + thumb=getattr(a, "thumb", None), + role=getattr(a, "role", None), + ) + for a in getattr(item, "actors", []) or getattr(item, "roles", []) or [] + ] + genres = [ + str(getattr(g, "tag", "")) + for g in getattr(item, "genres", []) or [] + if getattr(g, "tag", None) + ] + collections = [ + str(getattr(c, "tag", "")) + for c in getattr(item, "collections", []) or [] + if getattr(c, "tag", None) + ] + season_number = getattr(item, "parentIndex", None) + if isinstance(season_number, str): + season_number = int(season_number) if season_number.isdigit() else None + episode_number = getattr(item, "index", None) + if isinstance(episode_number, str): + episode_number = int(episode_number) if episode_number.isdigit() else None + + return PlexItem( + rating_key=str(getattr(item, "ratingKey", "")), + guid=str(getattr(item, "guid", "")), + type=str(getattr(item, "type", "")), + title=str(getattr(item, "title", "")), + show_title=getattr(item, "grandparentTitle", None), + season_title=getattr(item, "parentTitle", None), + season_number=season_number, + episode_number=episode_number, + summary=getattr(item, "summary", None), + year=getattr(item, "year", None), + added_at=getattr(item, "addedAt", None), + guids=guids, + thumb=getattr(item, "thumb", None), + art=getattr(item, 
"art", None), + tagline=getattr(item, "tagline", None), + content_rating=getattr(item, "contentRating", None), + directors=directors, + writers=writers, + actors=actors, + genres=genres, + collections=collections, + ) + + +def resolve_tmdb_season_number( + show_tmdb: Optional[TMDBShow], episode: PlexPartialObject +) -> Optional[int]: + """Map a Plex episode to the appropriate TMDb season number.""" + + parent_index = getattr(episode, "parentIndex", None) + parent_title = getattr(episode, "parentTitle", None) + parent_year = getattr(episode, "parentYear", None) + if parent_year is None: + parent_year = getattr(episode, "year", None) + + seasons = getattr(show_tmdb, "seasons", []) if show_tmdb else [] + + if parent_index is not None: + for season in seasons: + if season.season_number == parent_index: + return season.season_number + + title_norm: Optional[str] = None + if isinstance(parent_title, str): + title_norm = parent_title.lower().lstrip("season ").strip() + for season in seasons: + name_norm = (season.name or "").lower().lstrip("season ").strip() + if name_norm == title_norm: + return season.season_number + + year: Optional[int] = None + if isinstance(parent_year, int): + year = parent_year + elif isinstance(parent_index, int): + year = parent_index + elif title_norm and title_norm.isdigit(): + year = int(title_norm) + + if year is not None: + for season in seasons: + air = getattr(season, "air_date", None) + if isinstance(air, str) and len(air) >= 4 and air[:4].isdigit(): + if int(air[:4]) == year: + return season.season_number + + if isinstance(parent_index, int): + return parent_index + if isinstance(parent_index, str) and parent_index.isdigit(): + return int(parent_index) + if isinstance(parent_title, str) and parent_title.isdigit(): + return int(parent_title) + return None + + +async def enrich_movies( + client: httpx.AsyncClient, + movies: Sequence[PlexPartialObject], + *, + tmdb_api_key: str | None, + imdb_cache: IMDbCache | None, + imdb_batch_limit: int, + imdb_max_retries: int, + imdb_backoff: float, + imdb_retry_queue: IMDbRetryQueue | None, + logger: logging.Logger, +) -> list[AggregatedItem]: + """Enrich Plex movie metadata with IMDb and TMDb details.""" + + movie_ids = [extract_external_ids(movie) for movie in movies] + imdb_ids = [ids.imdb for ids in movie_ids if ids.imdb] + imdb_map = ( + await fetch_imdb_batch( + client, + imdb_ids, + cache=imdb_cache, + batch_limit=imdb_batch_limit, + max_retries=imdb_max_retries, + backoff=imdb_backoff, + retry_queue=imdb_retry_queue, + logger=logger, + ) + if imdb_ids + else {} + ) + + tmdb_results: list[TMDBMovie | None] = [] + if tmdb_api_key: + tmdb_tasks = [ + fetch_tmdb_movie(client, ids.tmdb, tmdb_api_key, logger) + for ids in movie_ids + if ids.tmdb + ] + if tmdb_tasks: + tmdb_results = await gather_in_batches(tmdb_tasks, len(tmdb_tasks)) + tmdb_iter = iter(tmdb_results) + + aggregated: list[AggregatedItem] = [] + for movie, ids in zip(movies, movie_ids): + tmdb = next(tmdb_iter, None) if ids.tmdb else None + imdb = imdb_map.get(ids.imdb) if ids.imdb else None + aggregated.append( + AggregatedItem( + plex=build_plex_item(movie), + imdb=imdb, + tmdb=tmdb, + ) + ) + return aggregated + + +async def enrich_episodes( + client: httpx.AsyncClient, + show: PlexPartialObject, + episodes: Sequence[PlexPartialObject], + *, + tmdb_api_key: str | None, + imdb_cache: IMDbCache | None, + imdb_batch_limit: int, + imdb_max_retries: int, + imdb_backoff: float, + imdb_retry_queue: IMDbRetryQueue | None, + show_tmdb_cache: dict[str, TMDBShow | 
None], + logger: logging.Logger, +) -> list[AggregatedItem]: + """Enrich Plex episode metadata with IMDb and TMDb details.""" + + show_ids = extract_external_ids(show) + show_tmdb: TMDBShow | None = None + if show_ids.tmdb: + if show_ids.tmdb in show_tmdb_cache: + show_tmdb = show_tmdb_cache[show_ids.tmdb] + elif tmdb_api_key: + show_tmdb = await fetch_tmdb_show( + client, show_ids.tmdb, tmdb_api_key, logger + ) + show_tmdb_cache[show_ids.tmdb] = show_tmdb + + episode_ids = [extract_external_ids(ep) for ep in episodes] + imdb_ids = [ids.imdb for ids in episode_ids if ids.imdb] + imdb_map = ( + await fetch_imdb_batch( + client, + imdb_ids, + cache=imdb_cache, + batch_limit=imdb_batch_limit, + max_retries=imdb_max_retries, + backoff=imdb_backoff, + retry_queue=imdb_retry_queue, + logger=logger, + ) + if imdb_ids + else {} + ) + + tmdb_results: list[TMDBEpisode | None] = [None] * len(episodes) + if show_tmdb and tmdb_api_key: + episode_tasks: list[asyncio.Future[TMDBEpisode | None]] = [] + indices: list[int] = [] + for idx, ep in enumerate(episodes): + season = resolve_tmdb_season_number(show_tmdb, ep) + ep_num = getattr(ep, "index", None) + if isinstance(ep_num, str) and ep_num.isdigit(): + ep_num = int(ep_num) + if season is None or ep_num is None: + continue + indices.append(idx) + episode_tasks.append( + fetch_tmdb_episode( + client, + show_tmdb.id, + season, + ep_num, + tmdb_api_key, + logger, + ) + ) + if episode_tasks: + fetched = await gather_in_batches(episode_tasks, len(episode_tasks)) + for idx, value in zip(indices, fetched): + tmdb_results[idx] = value + + aggregated: list[AggregatedItem] = [] + for ep, ids, tmdb_episode in zip(episodes, episode_ids, tmdb_results): + imdb = imdb_map.get(ids.imdb) if ids.imdb else None + tmdb_item: TMDBItem | None = tmdb_episode or show_tmdb + aggregated.append( + AggregatedItem( + plex=build_plex_item(ep), + imdb=imdb, + tmdb=tmdb_item, + ) + ) + return aggregated + + +__all__ = [ + "IMDbRetryQueue", + "build_plex_item", + "enrich_episodes", + "enrich_movies", + "extract_external_ids", + "fetch_imdb", + "fetch_imdb_batch", + "fetch_tmdb_episode", + "fetch_tmdb_movie", + "fetch_tmdb_show", + "load_imdb_retry_queue", + "persist_imdb_retry_queue", + "process_imdb_retry_queue", + "resolve_tmdb_season_number", +] diff --git a/mcp_plex/loader/ingestion/__init__.py b/mcp_plex/loader/ingestion/__init__.py new file mode 100644 index 0000000..5ef4f16 --- /dev/null +++ b/mcp_plex/loader/ingestion/__init__.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +import asyncio +import time +from typing import Callable, Sequence + +from mcp_plex.common.types import AggregatedItem + +from ..utils import require_positive +from .types import EpisodeBatch, IngestBatch, MovieBatch, SampleBatch +from .utils import chunk_sequence + +try: + from plexapi.server import PlexServer +except Exception: # pragma: no cover - fall back when plexapi unavailable + PlexServer = None # type: ignore[assignment] + + +class IngestionTask: + """Fetch Plex data and feed ingestion batches to the enrichment stage.""" + + def __init__( + self, + queue: asyncio.Queue[IngestBatch | None], + *, + sample_items: list[AggregatedItem] | None, + plex_server: PlexServer | None, + plex_chunk_size: int, + enrichment_batch_size: int, + enrichment_workers: int, + log_progress: Callable[[str, int, float, int], None], + ) -> None: + self._queue = queue + self._sample_items = sample_items + self._server = plex_server + self._plex_chunk_size = require_positive(plex_chunk_size, name="plex_chunk_size") 
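+        # Sizes are validated eagerly so a misconfigured pipeline fails at
+        # construction time rather than partway through ingestion.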
+ self._enrichment_batch_size = require_positive( + enrichment_batch_size, name="enrichment_batch_size" + ) + self._worker_count = require_positive( + enrichment_workers, name="enrichment_workers" + ) + self._log_progress = log_progress + self._count = 0 + self._start = time.perf_counter() + + @property + def count(self) -> int: + return self._count + + @property + def start_time(self) -> float: + return self._start + + async def run(self) -> None: + self._start = time.perf_counter() + try: + if self._sample_items is not None: + await self._ingest_sample(self._sample_items) + else: + await self._ingest_from_plex() + finally: + for _ in range(self._worker_count): + await self._queue.put(None) + self._log_progress("Ingestion", self._count, self._start, self._queue.qsize()) + + async def _ingest_sample(self, items: Sequence[AggregatedItem]) -> None: + for chunk in chunk_sequence(items, self._enrichment_batch_size): + batch = SampleBatch(items=list(chunk)) + if not batch.items: + continue + await self._queue.put(batch) + self._count += len(batch.items) + self._log_progress( + "Ingestion", + self._count, + self._start, + self._queue.qsize(), + ) + + async def _ingest_from_plex(self) -> None: + if self._server is None: + raise RuntimeError("Plex server unavailable for ingestion") + movie_section = self._server.library.section("Movies") + movie_keys = [int(m.ratingKey) for m in movie_section.all()] + for key_chunk in chunk_sequence(movie_keys, self._plex_chunk_size): + key_list = list(key_chunk) + movies = list(self._server.fetchItems(key_list)) if key_list else [] + if not movies: + continue + await self._queue.put(MovieBatch(movies=movies)) + self._count += len(movies) + self._log_progress( + "Ingestion", + self._count, + self._start, + self._queue.qsize(), + ) + show_section = self._server.library.section("TV Shows") + show_keys = [int(s.ratingKey) for s in show_section.all()] + for show_chunk in chunk_sequence(show_keys, self._plex_chunk_size): + shows = list(self._server.fetchItems(list(show_chunk))) + for show in shows: + episode_keys = [int(e.ratingKey) for e in show.episodes()] + for episode_chunk in chunk_sequence( + episode_keys, self._plex_chunk_size + ): + keys = list(episode_chunk) + episodes = list(self._server.fetchItems(keys)) if keys else [] + if not episodes: + continue + await self._queue.put(EpisodeBatch(show=show, episodes=episodes)) + self._count += len(episodes) + self._log_progress( + "Ingestion", + self._count, + self._start, + self._queue.qsize(), + ) + + +__all__ = ["IngestionTask", "EpisodeBatch", "IngestBatch", "MovieBatch", "SampleBatch"] diff --git a/mcp_plex/loader/ingestion/types.py b/mcp_plex/loader/ingestion/types.py new file mode 100644 index 0000000..04443ac --- /dev/null +++ b/mcp_plex/loader/ingestion/types.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Union + +try: + from plexapi.base import PlexPartialObject +except Exception: # pragma: no cover - fall back when plexapi unavailable + PlexPartialObject = object # type: ignore[assignment] + +from mcp_plex.common.types import AggregatedItem + + +@dataclass(slots=True) +class MovieBatch: + """Batch of Plex movie items pending metadata enrichment.""" + + movies: list[PlexPartialObject] + + +@dataclass(slots=True) +class EpisodeBatch: + """Batch of Plex episodes along with their parent show.""" + + show: PlexPartialObject + episodes: list[PlexPartialObject] + + +@dataclass(slots=True) +class SampleBatch: + """Batch of pre-enriched items used by 
sample mode.""" + + items: list[AggregatedItem] + + +IngestBatch = Union[MovieBatch, EpisodeBatch, SampleBatch] + +__all__ = [ + "EpisodeBatch", + "IngestBatch", + "MovieBatch", + "SampleBatch", +] diff --git a/mcp_plex/loader/ingestion/utils.py b/mcp_plex/loader/ingestion/utils.py new file mode 100644 index 0000000..cb263ca --- /dev/null +++ b/mcp_plex/loader/ingestion/utils.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Iterable, Sequence, TypeVar + +from mcp_plex.common.types import ( + AggregatedItem, + IMDbTitle, + PlexGuid, + PlexItem, + PlexPerson, + TMDBMovie, + TMDBShow, +) + +from ..utils import require_positive + +T = TypeVar("T") + + +def chunk_sequence(items: Sequence[T], size: int) -> Iterable[Sequence[T]]: + """Yield ``items`` in chunks of at most ``size`` elements.""" + + size = require_positive(int(size), name="size") + for start in range(0, len(items), size): + yield items[start : start + size] + + +def load_from_sample(sample_dir: Path) -> list[AggregatedItem]: + """Load items from local sample JSON files.""" + + results: list[AggregatedItem] = [] + movie_dir = sample_dir / "movie" + episode_dir = sample_dir / "episode" + + with (movie_dir / "plex.json").open("r", encoding="utf-8") as f: + movie_data = json.load(f)["MediaContainer"]["Metadata"][0] + plex_movie = PlexItem( + rating_key=str(movie_data.get("ratingKey", "")), + guid=str(movie_data.get("guid", "")), + type=movie_data.get("type", "movie"), + title=movie_data.get("title", ""), + summary=movie_data.get("summary"), + year=movie_data.get("year"), + added_at=movie_data.get("addedAt"), + guids=[PlexGuid(id=g["id"]) for g in movie_data.get("Guid", [])], + thumb=movie_data.get("thumb"), + art=movie_data.get("art"), + tagline=movie_data.get("tagline"), + content_rating=movie_data.get("contentRating"), + directors=[ + PlexPerson(id=d.get("id", 0), tag=d.get("tag", ""), thumb=d.get("thumb")) + for d in movie_data.get("Director", []) + ], + writers=[ + PlexPerson(id=w.get("id", 0), tag=w.get("tag", ""), thumb=w.get("thumb")) + for w in movie_data.get("Writer", []) + ], + actors=[ + PlexPerson( + id=a.get("id", 0), + tag=a.get("tag", ""), + role=a.get("role"), + thumb=a.get("thumb"), + ) + for a in movie_data.get("Role", []) + ], + genres=[g.get("tag", "") for g in movie_data.get("Genre", []) if g.get("tag")], + collections=[ + c.get("tag", "") + for key in ("Collection", "Collections") + for c in movie_data.get(key, []) or [] + if c.get("tag") + ], + ) + with (movie_dir / "imdb.json").open("r", encoding="utf-8") as f: + imdb_movie = IMDbTitle.model_validate(json.load(f)) + with (movie_dir / "tmdb.json").open("r", encoding="utf-8") as f: + tmdb_movie = TMDBMovie.model_validate(json.load(f)) + results.append(AggregatedItem(plex=plex_movie, imdb=imdb_movie, tmdb=tmdb_movie)) + + with (episode_dir / "plex.tv.json").open("r", encoding="utf-8") as f: + episode_data = json.load(f)["MediaContainer"]["Metadata"][0] + plex_episode = PlexItem( + rating_key=str(episode_data.get("ratingKey", "")), + guid=str(episode_data.get("guid", "")), + type=episode_data.get("type", "episode"), + title=episode_data.get("title", ""), + show_title=episode_data.get("grandparentTitle"), + season_title=episode_data.get("parentTitle"), + season_number=episode_data.get("parentIndex"), + episode_number=episode_data.get("index"), + summary=episode_data.get("summary"), + year=episode_data.get("year"), + added_at=episode_data.get("addedAt"), + guids=[PlexGuid(id=g["id"]) for g in 
episode_data.get("Guid", [])], + thumb=episode_data.get("thumb"), + art=episode_data.get("art"), + tagline=episode_data.get("tagline"), + content_rating=episode_data.get("contentRating"), + directors=[ + PlexPerson(id=d.get("id", 0), tag=d.get("tag", ""), thumb=d.get("thumb")) + for d in episode_data.get("Director", []) + ], + writers=[ + PlexPerson(id=w.get("id", 0), tag=w.get("tag", ""), thumb=w.get("thumb")) + for w in episode_data.get("Writer", []) + ], + actors=[ + PlexPerson( + id=a.get("id", 0), + tag=a.get("tag", ""), + role=a.get("role"), + thumb=a.get("thumb"), + ) + for a in episode_data.get("Role", []) + ], + genres=[g.get("tag", "") for g in episode_data.get("Genre", []) if g.get("tag")], + collections=[ + c.get("tag", "") + for key in ("Collection", "Collections") + for c in episode_data.get(key, []) or [] + if c.get("tag") + ], + ) + with (episode_dir / "imdb.tv.json").open("r", encoding="utf-8") as f: + imdb_episode = IMDbTitle.model_validate(json.load(f)) + with (episode_dir / "tmdb.tv.json").open("r", encoding="utf-8") as f: + tmdb_show = TMDBShow.model_validate(json.load(f)) + results.append(AggregatedItem(plex=plex_episode, imdb=imdb_episode, tmdb=tmdb_show)) + + return results + + +__all__ = ["chunk_sequence", "load_from_sample"] diff --git a/mcp_plex/loader/storage/__init__.py b/mcp_plex/loader/storage/__init__.py new file mode 100644 index 0000000..55a8c5a --- /dev/null +++ b/mcp_plex/loader/storage/__init__.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +import asyncio +import logging +import time +from typing import Awaitable, Callable + +from qdrant_client import models +from qdrant_client.async_qdrant_client import AsyncQdrantClient + +from ..utils import require_positive +from .types import StorageBatch +from .utils import ( + build_point, + ensure_collection, + process_qdrant_retry_queue, + upsert_in_batches, +) + + +class StorageTask: + """Persist enriched points into Qdrant.""" + + def __init__( + self, + points_queue: asyncio.Queue[StorageBatch | None], + *, + client: AsyncQdrantClient, + collection_name: str, + dense_size: int, + dense_distance: models.Distance, + upsert_batch_size: int, + worker_count: int, + upsert_capacity: asyncio.Semaphore, + log_progress: Callable[[str, int, float, int], None], + logger: logging.Logger, + retry_attempts: int, + retry_backoff: float, + ensure_collection_fn: Callable[..., Awaitable[None]] = ensure_collection, + upsert_fn: Callable[..., Awaitable[None]] = upsert_in_batches, + ) -> None: + self._points_queue = points_queue + self._client = client + self._collection_name = collection_name + self._upsert_batch_size = require_positive(upsert_batch_size, name="upsert_batch_size") + self._worker_count = require_positive(worker_count, name="worker_count") + self._upsert_capacity = upsert_capacity + self._log_progress = log_progress + self._logger = logger + self._retry_attempts = retry_attempts + self._retry_backoff = retry_backoff + self._ensure_collection_fn = ensure_collection_fn + self._upsert_fn = upsert_fn + + self._upserted_points = 0 + self._upsert_start = time.perf_counter() + self._retry_queue: asyncio.Queue[list[models.PointStruct]] = asyncio.Queue() + self._dense_size = dense_size + self._dense_distance = dense_distance + + @property + def retry_queue(self) -> asyncio.Queue[list[models.PointStruct]]: + return self._retry_queue + + async def ensure_collection(self) -> None: + await self._ensure_collection_fn( + self._client, + self._collection_name, + dense_size=self._dense_size, + 
dense_distance=self._dense_distance,
+            logger=self._logger,
+        )
+
+    def start_workers(self) -> list[asyncio.Task[None]]:
+        return [
+            asyncio.create_task(self._worker(worker_id))
+            for worker_id in range(self._worker_count)
+        ]
+
+    async def _worker(self, worker_id: int) -> None:
+        while True:
+            batch = await self._points_queue.get()
+            if batch is None:
+                self._points_queue.task_done()
+                break
+            self._logger.info(
+                "Storage worker %d handling %d points (queue size=%d)",
+                worker_id,
+                len(batch),
+                self._points_queue.qsize(),
+            )
+            try:
+                await self._upsert_fn(
+                    self._client,
+                    self._collection_name,
+                    batch,
+                    batch_size=self._upsert_batch_size,
+                    retry_queue=self._retry_queue,
+                    logger=self._logger,
+                )
+                if self._upserted_points == 0:
+                    self._upsert_start = time.perf_counter()
+                self._upserted_points += len(batch)
+                self._log_progress(
+                    f"Storage worker {worker_id}",
+                    self._upserted_points,
+                    self._upsert_start,
+                    self._points_queue.qsize(),
+                )
+            finally:
+                self._points_queue.task_done()
+                self._upsert_capacity.release()
+
+    async def drain_retry_queue(self) -> None:
+        await process_qdrant_retry_queue(
+            self._client,
+            self._collection_name,
+            self._retry_queue,
+            self._logger,
+            max_attempts=self._retry_attempts,
+            backoff=self._retry_backoff,
+        )
+
+
+__all__ = ["StorageTask", "build_point", "ensure_collection", "process_qdrant_retry_queue"]
diff --git a/mcp_plex/loader/storage/types.py b/mcp_plex/loader/storage/types.py
new file mode 100644
index 0000000..a59b0ed
--- /dev/null
+++ b/mcp_plex/loader/storage/types.py
@@ -0,0 +1,9 @@
+from __future__ import annotations
+
+from typing import List
+
+from qdrant_client import models
+
+StorageBatch = List[models.PointStruct]
+
+__all__ = ["StorageBatch"]
diff --git a/mcp_plex/loader/storage/utils.py b/mcp_plex/loader/storage/utils.py
new file mode 100644
index 0000000..8b31c66
--- /dev/null
+++ b/mcp_plex/loader/storage/utils.py
@@ -0,0 +1,314 @@
+from __future__ import annotations
+
+import asyncio
+import logging
+import warnings
+from typing import Sequence
+
+from qdrant_client import models
+from qdrant_client.async_qdrant_client import AsyncQdrantClient
+
+from mcp_plex.common.types import AggregatedItem
+
+from ..utils import is_local_qdrant
+
+
+def format_primary_title(item: AggregatedItem) -> str:
+    """Format the primary title text for ``item``."""
+
+    primary_title = item.plex.title
+    if item.plex.type == "episode":
+        title_bits: list[str] = []
+        if item.plex.show_title:
+            title_bits.append(item.plex.show_title)
+        se_parts: list[str] = []
+        if item.plex.season_number is not None:
+            se_parts.append(f"S{item.plex.season_number:02d}")
+        if item.plex.episode_number is not None:
+            se_parts.append(f"E{item.plex.episode_number:02d}")
+        if se_parts:
+            title_bits.append("".join(se_parts))
+        if item.plex.title:
+            title_bits.append(item.plex.title)
+        if title_bits:
+            primary_title = " - ".join(title_bits)
+    return primary_title
+
+
+def build_point_text(item: AggregatedItem) -> str:
+    """Return the vector text for ``item``."""
+
+    parts = [
+        format_primary_title(item),
+        item.plex.summary or "",
+        item.tmdb.overview if item.tmdb and hasattr(item.tmdb, "overview") else "",
+        item.imdb.plot if item.imdb else "",
+    ]
+    directors_text = ", ".join(p.tag for p in item.plex.directors if p.tag)
+    writers_text = ", ".join(p.tag for p in item.plex.writers if p.tag)
+    actors_text = ", ".join(p.tag for p in item.plex.actors if p.tag)
+    if directors_text:
+        parts.append(f"Directed by {directors_text}")
+    if writers_text:
+        
parts.append(f"Written by {writers_text}") + if actors_text: + parts.append(f"Starring {actors_text}") + if item.plex.tagline: + parts.append(item.plex.tagline) + if item.tmdb and hasattr(item.tmdb, "tagline"): + tagline = getattr(item.tmdb, "tagline", None) + if tagline: + parts.append(tagline) + if item.tmdb and hasattr(item.tmdb, "reviews"): + parts.extend(r.get("content", "") for r in getattr(item.tmdb, "reviews", [])) + return "\n".join(p for p in parts if p) + + +def build_point_payload(item: AggregatedItem) -> dict[str, object]: + """Construct the Qdrant payload for ``item``.""" + + payload: dict[str, object] = { + "data": item.model_dump(mode="json"), + "title": item.plex.title, + "type": item.plex.type, + } + if item.plex.type == "episode": + if item.plex.show_title: + payload["show_title"] = item.plex.show_title + if item.plex.season_title: + payload["season_title"] = item.plex.season_title + if item.plex.season_number is not None: + payload["season_number"] = item.plex.season_number + if item.plex.episode_number is not None: + payload["episode_number"] = item.plex.episode_number + if item.plex.actors: + payload["actors"] = [p.tag for p in item.plex.actors if p.tag] + if item.plex.directors: + payload["directors"] = [p.tag for p in item.plex.directors if p.tag] + if item.plex.writers: + payload["writers"] = [p.tag for p in item.plex.writers if p.tag] + if item.plex.genres: + payload["genres"] = item.plex.genres + if item.plex.collections: + payload["collections"] = item.plex.collections + summary = item.plex.summary + if summary: + payload["summary"] = summary + overview = getattr(item.tmdb, "overview", None) if item.tmdb else None + if overview: + payload["overview"] = overview + plot = item.imdb.plot if item.imdb else None + if plot: + payload["plot"] = plot + taglines = [item.plex.tagline] + if item.tmdb and hasattr(item.tmdb, "tagline"): + taglines.append(getattr(item.tmdb, "tagline", None)) + taglines = [t for t in taglines if t] + if taglines: + payload["tagline"] = "\n".join(dict.fromkeys(taglines)) + if item.tmdb and hasattr(item.tmdb, "reviews"): + review_texts = [r.get("content", "") for r in getattr(item.tmdb, "reviews", [])] + review_texts = [r for r in review_texts if r] + if review_texts: + payload["reviews"] = review_texts + if item.plex.year is not None: + payload["year"] = item.plex.year + if item.plex.added_at is not None: + added = item.plex.added_at + if hasattr(added, "timestamp"): + payload["added_at"] = int(added.timestamp()) + return payload + + +def build_point( + item: AggregatedItem, + dense_model_name: str, + sparse_model_name: str, +) -> models.PointStruct: + """Build a Qdrant point for ``item`` using the configured model names.""" + + text = build_point_text(item) + payload = build_point_payload(item) + point_id: int | str = ( + int(item.plex.rating_key) + if item.plex.rating_key.isdigit() + else item.plex.rating_key + ) + return models.PointStruct( + id=point_id, + vector={ + "dense": models.Document(text=text, model=dense_model_name), + "sparse": models.Document(text=text, model=sparse_model_name), + }, + payload=payload, + ) + + +async def upsert_in_batches( + client: AsyncQdrantClient, + collection_name: str, + points: Sequence[models.PointStruct], + *, + batch_size: int, + retry_queue: asyncio.Queue[list[models.PointStruct]] | None = None, + logger: logging.Logger, +) -> None: + """Upsert points into Qdrant in batches, logging HTTP errors.""" + + total = len(points) + for i in range(0, total, batch_size): + batch = points[i : i + batch_size] + 
try: + await client.upsert(collection_name=collection_name, points=batch) + except Exception: + logger.exception( + "Failed to upsert batch %d-%d", i, i + len(batch) + ) + if retry_queue is not None: + await retry_queue.put(list(batch)) + else: + logger.info( + "Upserted %d/%d points", min(i + len(batch), total), total + ) + + +async def process_qdrant_retry_queue( + client: AsyncQdrantClient, + collection_name: str, + retry_queue: asyncio.Queue[list[models.PointStruct]], + logger: logging.Logger, + *, + max_attempts: int, + backoff: float, +) -> None: + """Retry failed Qdrant batches with exponential backoff.""" + + if retry_queue.empty(): + return + + pending = retry_queue.qsize() + logger.info("Retrying %d failed Qdrant batches", pending) + while not retry_queue.empty(): + batch = await retry_queue.get() + attempt = 1 + while attempt <= max_attempts: + try: + await client.upsert( + collection_name=collection_name, + points=batch, + ) + except Exception: + logger.exception( + "Retry %d/%d failed for Qdrant batch of %d points", + attempt, + max_attempts, + len(batch), + ) + attempt += 1 + if attempt > max_attempts: + logger.error( + "Giving up on Qdrant batch after %d attempts; %d points were not indexed", + max_attempts, + len(batch), + ) + break + await asyncio.sleep(backoff * attempt) + continue + else: + logger.info( + "Successfully retried Qdrant batch of %d points on attempt %d", + len(batch), + attempt, + ) + break + + +async def ensure_collection( + client: AsyncQdrantClient, + collection_name: str, + *, + dense_size: int, + dense_distance: models.Distance, + logger: logging.Logger, +) -> None: + """Create the collection and payload indexes if they do not already exist.""" + + created_collection = False + if not await client.collection_exists(collection_name): + await client.create_collection( + collection_name=collection_name, + vectors_config={"dense": models.VectorParams(size=dense_size, distance=dense_distance)}, + sparse_vectors_config={"sparse": models.SparseVectorParams()}, + ) + created_collection = True + + if not created_collection: + return + + suppress_payload_warning = is_local_qdrant(client) + + async def _create_index( + field_name: str, + field_schema: models.PayloadSchemaType | models.TextIndexParams, + ) -> None: + if suppress_payload_warning: + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", + message="Payload indexes have no effect in the local Qdrant.*", + category=UserWarning, + ) + await client.create_payload_index( + collection_name=collection_name, + field_name=field_name, + field_schema=field_schema, + ) + else: + await client.create_payload_index( + collection_name=collection_name, + field_name=field_name, + field_schema=field_schema, + ) + + text_index = models.TextIndexParams( + type=models.PayloadSchemaType.TEXT, + tokenizer=models.TokenizerType.WORD, + min_token_len=2, + lowercase=True, + ) + for field, schema in ( + ("title", text_index), + ("type", models.PayloadSchemaType.KEYWORD), + ("year", models.PayloadSchemaType.INTEGER), + ("added_at", models.PayloadSchemaType.INTEGER), + ("actors", models.PayloadSchemaType.KEYWORD), + ("directors", models.PayloadSchemaType.KEYWORD), + ("writers", models.PayloadSchemaType.KEYWORD), + ("genres", models.PayloadSchemaType.KEYWORD), + ("show_title", models.PayloadSchemaType.KEYWORD), + ("season_number", models.PayloadSchemaType.INTEGER), + ("episode_number", models.PayloadSchemaType.INTEGER), + ("collections", models.PayloadSchemaType.KEYWORD), + ("summary", text_index), + ("overview", 
text_index), + ("plot", text_index), + ("tagline", text_index), + ("reviews", text_index), + ("data.plex.rating_key", models.PayloadSchemaType.KEYWORD), + ("data.imdb.id", models.PayloadSchemaType.KEYWORD), + ("data.tmdb.id", models.PayloadSchemaType.INTEGER), + ): + await _create_index(field, schema) + + logger.info("Ensured collection %s exists", collection_name) + + +__all__ = [ + "build_point", + "build_point_payload", + "build_point_text", + "ensure_collection", + "format_primary_title", + "process_qdrant_retry_queue", + "upsert_in_batches", +] diff --git a/mcp_plex/loader/utils.py b/mcp_plex/loader/utils.py new file mode 100644 index 0000000..a9cf62d --- /dev/null +++ b/mcp_plex/loader/utils.py @@ -0,0 +1,89 @@ +"""Shared utilities for the loader package.""" +from __future__ import annotations + +import asyncio +import inspect +import logging +from typing import AsyncIterator, Awaitable, Iterable, List, Sequence, TypeVar + +from qdrant_client import models +from qdrant_client.async_qdrant_client import AsyncQdrantClient + +T = TypeVar("T") + + +_DENSE_MODEL_PARAMS: dict[str, tuple[int, models.Distance]] = { + "BAAI/bge-small-en-v1.5": (384, models.Distance.COSINE), + "BAAI/bge-base-en-v1.5": (768, models.Distance.COSINE), + "BAAI/bge-large-en-v1.5": (1024, models.Distance.COSINE), + "text-embedding-3-small": (1536, models.Distance.COSINE), + "text-embedding-3-large": (3072, models.Distance.COSINE), +} + + +def require_positive(value: int, *, name: str) -> int: + """Return *value* if positive, otherwise raise a ``ValueError``.""" + + if value <= 0: + raise ValueError(f"{name} must be positive") + return value + + +def close_coroutines(tasks: Sequence[Awaitable[object]]) -> None: + """Close coroutine objects to avoid unawaited warnings.""" + + for task in tasks: + if inspect.iscoroutine(task): + task.close() + + +logger = logging.getLogger("mcp_plex.loader") + + +async def iter_gather_in_batches( + tasks: Sequence[Awaitable[T]], batch_size: int +) -> AsyncIterator[T]: + """Yield results from awaitable tasks in fixed-size batches.""" + + try: + require_positive(batch_size, name="batch_size") + except ValueError: + close_coroutines(tasks) + raise + + total = len(tasks) + for i in range(0, total, batch_size): + batch = tasks[i : i + batch_size] + for result in await asyncio.gather(*batch): + yield result + logger.info("Processed %d/%d items", min(i + batch_size, total), total) + + +def resolve_dense_model_params(model_name: str) -> tuple[int, models.Distance]: + """Look up Qdrant vector parameters for a known dense embedding model.""" + + try: + return _DENSE_MODEL_PARAMS[model_name] + except KeyError as exc: + raise ValueError( + "Unknown dense embedding model " + f"'{model_name}'. Update _DENSE_MODEL_PARAMS with the model's size " + "and distance." 
+ ) from exc + + +def is_local_qdrant(client: AsyncQdrantClient) -> bool: + """Return ``True`` if *client* targets an in-process Qdrant instance.""" + + inner = getattr(client, "_client", None) + return bool(inner) and inner.__class__.__module__.startswith( + "qdrant_client.local" + ) + + +async def gather_in_batches( + tasks: Sequence[Awaitable[T]], batch_size: int +) -> List[T]: + """Gather awaitable tasks in fixed-size batches.""" + + return [result async for result in iter_gather_in_batches(tasks, batch_size)] diff --git a/pyproject.toml b/pyproject.toml index 59866ba..7f09cb5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "0.26.60" +version = "0.26.61" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_loader_logging.py b/tests/test_loader_logging.py index e3740e9..a37aa1d 100644 --- a/tests/test_loader_logging.py +++ b/tests/test_loader_logging.py @@ -34,7 +34,7 @@ def test_run_logs_upsert(monkeypatch, caplog): with caplog.at_level(logging.INFO, logger="mcp_plex.loader"): asyncio.run(loader.run(None, None, None, sample_dir, None, None)) assert "Loaded 2 items" in caplog.text - assert "Upsert worker" in caplog.text + assert "Storage worker" in caplog.text assert "handling 2 points" in caplog.text assert "processed 2 items" in caplog.text diff --git a/uv.lock b/uv.lock index d7b7214..5bfd57b 100644 --- a/uv.lock +++ b/uv.lock @@ -730,7 +730,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "0.26.60" +version = "0.26.61" source = { editable = "." } dependencies = [ { name = "fastapi" }, From 8015a5d9d7e1c41549eb71e0748693efc9ba4b54 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Sun, 5 Oct 2025 08:45:25 -0600 Subject: [PATCH 2/2] Remove unused import --- mcp_plex/loader/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mcp_plex/loader/utils.py b/mcp_plex/loader/utils.py index a9cf62d..2350184 100644 --- a/mcp_plex/loader/utils.py +++ b/mcp_plex/loader/utils.py @@ -4,7 +4,7 @@ import asyncio import inspect import logging -from typing import AsyncIterator, Awaitable, Iterable, List, Sequence, TypeVar +from typing import AsyncIterator, Awaitable, List, Sequence, TypeVar from qdrant_client import models from qdrant_client.async_qdrant_client import AsyncQdrantClient
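
A quick, self-contained sketch of the retry-queue round-trip added in
mcp_plex/loader/enrichment above (the IMDb IDs and the file location are
illustrative only, not part of the patch):

    import logging
    import tempfile
    from pathlib import Path

    from mcp_plex.loader.enrichment.types import IMDbRetryQueue
    from mcp_plex.loader.enrichment.utils import (
        load_imdb_retry_queue,
        persist_imdb_retry_queue,
    )

    # Seed a queue; the deque mirror keeps snapshot() cheap and consistent.
    queue = IMDbRetryQueue(["tt0111161", "tt0068646"])
    assert queue.snapshot() == ["tt0111161", "tt0068646"]

    # persist_imdb_retry_queue writes snapshot() out as a JSON list...
    path = Path(tempfile.gettempdir()) / "imdb_retry_queue.json"
    persist_imdb_retry_queue(path, queue)

    # ...and load_imdb_retry_queue rebuilds an equivalent FIFO queue.
    restored = load_imdb_retry_queue(path, logging.getLogger("example"))
    assert restored.qsize() == 2
    assert restored.get_nowait() == "tt0111161"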