From 6c6030bfe3f8709b0152d601d4e773e62b7693d6 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Sun, 5 Oct 2025 09:23:46 -0600 Subject: [PATCH] Revert "Refactor loader tasks into modular subpackages (#84)" This reverts commit 9d786a1745dcbb76fc7400d7faee50029f0fdb21. --- docker/pyproject.deps.toml | 2 +- mcp_plex/loader/__init__.py | 1472 +++++++++++++++++++----- mcp_plex/loader/enrichment/__init__.py | 204 ---- mcp_plex/loader/enrichment/types.py | 42 - mcp_plex/loader/enrichment/utils.py | 552 --------- mcp_plex/loader/ingestion/__init__.py | 122 -- mcp_plex/loader/ingestion/types.py | 43 - mcp_plex/loader/ingestion/utils.py | 136 --- mcp_plex/loader/storage/__init__.py | 125 -- mcp_plex/loader/storage/types.py | 9 - mcp_plex/loader/storage/utils.py | 314 ----- mcp_plex/loader/utils.py | 89 -- pyproject.toml | 2 +- tests/test_loader_logging.py | 2 +- uv.lock | 2 +- 15 files changed, 1183 insertions(+), 1933 deletions(-) delete mode 100644 mcp_plex/loader/enrichment/__init__.py delete mode 100644 mcp_plex/loader/enrichment/types.py delete mode 100644 mcp_plex/loader/enrichment/utils.py delete mode 100644 mcp_plex/loader/ingestion/__init__.py delete mode 100644 mcp_plex/loader/ingestion/types.py delete mode 100644 mcp_plex/loader/ingestion/utils.py delete mode 100644 mcp_plex/loader/storage/__init__.py delete mode 100644 mcp_plex/loader/storage/types.py delete mode 100644 mcp_plex/loader/storage/utils.py delete mode 100644 mcp_plex/loader/utils.py diff --git a/docker/pyproject.deps.toml b/docker/pyproject.deps.toml index 07e25a2..c98e32b 100644 --- a/docker/pyproject.deps.toml +++ b/docker/pyproject.deps.toml @@ -1,6 +1,6 @@ [project] name = "mcp-plex" -version = "0.26.61" +version = "0.26.60" requires-python = ">=3.11,<3.13" dependencies = [ "fastmcp>=2.11.2", diff --git a/mcp_plex/loader/__init__.py b/mcp_plex/loader/__init__.py index c26c6c1..d88bb19 100644 --- a/mcp_plex/loader/__init__.py +++ b/mcp_plex/loader/__init__.py @@ -2,13 +2,24 @@ from __future__ import annotations import asyncio +import inspect import json import logging import sys import time import warnings +from collections import deque +from dataclasses import dataclass from pathlib import Path -from typing import AsyncIterator, List, Optional, Sequence +from typing import ( + AsyncIterator, + Awaitable, + Iterable, + List, + Optional, + Sequence, + TypeVar, +) import click import httpx @@ -20,42 +31,14 @@ AggregatedItem, ExternalIDs, IMDbTitle, + PlexGuid, PlexItem, + PlexPerson, TMDBEpisode, TMDBItem, TMDBMovie, TMDBShow, ) -from .utils import ( - close_coroutines as _close_coroutines, - gather_in_batches as _gather_in_batches, - iter_gather_in_batches as _iter_gather_in_batches, - require_positive as _require_positive, - resolve_dense_model_params as _resolve_dense_model_params_impl, -) -from .ingestion import IngestionTask, IngestBatch -from .ingestion.utils import chunk_sequence as _chunk_sequence, load_from_sample as _load_from_sample_impl -from .enrichment import EnrichmentTask, IMDbRetryQueue -from .enrichment.utils import ( - build_plex_item as _build_plex_item_impl, - extract_external_ids as _extract_external_ids_impl, - fetch_imdb as _fetch_imdb_impl, - fetch_imdb_batch as _fetch_imdb_batch_impl, - fetch_tmdb_episode as _fetch_tmdb_episode_impl, - fetch_tmdb_movie as _fetch_tmdb_movie_impl, - fetch_tmdb_show as _fetch_tmdb_show_impl, - load_imdb_retry_queue as _load_imdb_retry_queue_impl, - persist_imdb_retry_queue as _persist_imdb_retry_queue_impl, - process_imdb_retry_queue as _process_imdb_retry_queue_impl, 
- resolve_tmdb_season_number, -) -from .storage import StorageTask -from .storage.utils import ( - build_point, - ensure_collection as _ensure_collection_impl, - process_qdrant_retry_queue as _process_qdrant_retry_queue_impl, - upsert_in_batches as _upsert_in_batches_impl, -) try: # Only import plexapi when available; the sample data mode does not require it. from plexapi.base import PlexPartialObject @@ -70,16 +53,16 @@ warnings.filterwarnings( "ignore", - message=r".*'mcp_plex\.loader' found in sys.modules after import of package 'mcp_plex'.*", + message=".*'mcp_plex\\.loader' found in sys.modules after import of package 'mcp_plex'.*", category=RuntimeWarning, ) -_IMDbRetryQueue = IMDbRetryQueue +T = TypeVar("T") _imdb_cache: IMDbCache | None = None _imdb_max_retries: int = 3 _imdb_backoff: float = 1.0 -_imdb_retry_queue: IMDbRetryQueue | None = None +_imdb_retry_queue: "_IMDbRetryQueue" | None = None _imdb_batch_limit: int = 5 _qdrant_batch_size: int = 1000 _qdrant_upsert_buffer_size: int = 200 @@ -88,84 +71,750 @@ _qdrant_retry_backoff: float = 1.0 -async def _fetch_imdb( - client: httpx.AsyncClient, - imdb_id: str, - *, - cache: IMDbCache | None = None, - max_retries: int | None = None, - backoff: float | None = None, - retry_queue: IMDbRetryQueue | None = None, - logger_override: logging.Logger | None = None, -) -> Optional[IMDbTitle]: - effective_logger = logger_override or logger - return await _fetch_imdb_impl( - client, - imdb_id, - cache=cache if cache is not None else _imdb_cache, - max_retries=max_retries if max_retries is not None else _imdb_max_retries, - backoff=backoff if backoff is not None else _imdb_backoff, - retry_queue=retry_queue if retry_queue is not None else _imdb_retry_queue, - logger=effective_logger, +@dataclass(slots=True) +class _MovieBatch: + """Batch of Plex movie items pending metadata enrichment.""" + + movies: list[PlexPartialObject] + + +@dataclass(slots=True) +class _EpisodeBatch: + """Batch of Plex episodes along with their parent show.""" + + show: PlexPartialObject + episodes: list[PlexPartialObject] + + +@dataclass(slots=True) +class _SampleBatch: + """Batch of pre-enriched items used by sample mode.""" + + items: list[AggregatedItem] + + +_IngestBatch = _MovieBatch | _EpisodeBatch | _SampleBatch + + +def _require_positive(value: int, *, name: str) -> int: + """Return *value* if positive, otherwise raise a ``ValueError``.""" + + if value <= 0: + raise ValueError(f"{name} must be positive") + return value + + +def _chunk_sequence(items: Sequence[T], size: int) -> Iterable[Sequence[T]]: + """Yield ``items`` in chunks of at most ``size`` elements.""" + + size = _require_positive(int(size), name="size") + for start in range(0, len(items), size): + yield items[start : start + size] + + +def _is_local_qdrant(client: AsyncQdrantClient) -> bool: + """Return ``True`` if *client* targets an in-process Qdrant instance.""" + + inner = getattr(client, "_client", None) + return bool(inner) and inner.__class__.__module__.startswith( + "qdrant_client.local" ) +class _IMDbRetryQueue(asyncio.Queue[str]): + """Queue that tracks items in a deque for safe serialization.""" + + def __init__(self, initial: Iterable[str] | None = None): + super().__init__() + self._items: deque[str] = deque() + if initial: + for imdb_id in initial: + imdb_id_str = str(imdb_id) + super().put_nowait(imdb_id_str) + self._items.append(imdb_id_str) + + def put_nowait(self, item: str) -> None: # type: ignore[override] + super().put_nowait(item) + self._items.append(item) + + def 
get_nowait(self) -> str: # type: ignore[override] + if not self._items: + raise RuntimeError("Desynchronization: Queue is not empty but self._items is empty.") + try: + item = super().get_nowait() + except asyncio.QueueEmpty: + raise RuntimeError("Desynchronization: self._items is not empty but asyncio.Queue is empty.") + self._items.popleft() + return item + + def snapshot(self) -> list[str]: + """Return a list of the current queue contents.""" + + return list(self._items) + +# Known Qdrant-managed dense embedding models with their dimensionality and +# similarity metric. To support a new server-side embedding model, add an entry +# here with the appropriate vector size and `models.Distance` value. +_DENSE_MODEL_PARAMS: dict[str, tuple[int, models.Distance]] = { + "BAAI/bge-small-en-v1.5": (384, models.Distance.COSINE), + "BAAI/bge-base-en-v1.5": (768, models.Distance.COSINE), + "BAAI/bge-large-en-v1.5": (1024, models.Distance.COSINE), + "text-embedding-3-small": (1536, models.Distance.COSINE), + "text-embedding-3-large": (3072, models.Distance.COSINE), +} + + +def _close_coroutines(tasks: Sequence[Awaitable[object]]) -> None: + """Close coroutine objects to avoid unawaited warnings.""" + + for task in tasks: + if inspect.iscoroutine(task): + task.close() + + +async def _iter_gather_in_batches( + tasks: Sequence[Awaitable[T]], batch_size: int +) -> AsyncIterator[T]: + """Yield results from awaitable tasks in fixed-size batches.""" + + try: + _require_positive(batch_size, name="batch_size") + except ValueError: + _close_coroutines(tasks) + raise + + total = len(tasks) + for i in range(0, total, batch_size): + batch = tasks[i : i + batch_size] + for result in await asyncio.gather(*batch): + yield result + logger.info("Processed %d/%d items", min(i + batch_size, total), total) + + +async def _gather_in_batches( + tasks: Sequence[Awaitable[T]], batch_size: int +) -> List[T]: + """Gather awaitable tasks in fixed-size batches.""" + + return [result async for result in _iter_gather_in_batches(tasks, batch_size)] + + +def _resolve_dense_model_params(model_name: str) -> tuple[int, models.Distance]: + """Look up Qdrant vector parameters for a known dense embedding model.""" + + try: + return _DENSE_MODEL_PARAMS[model_name] + except KeyError as exc: + raise ValueError( + f"Unknown dense embedding model '{model_name}'. Update _DENSE_MODEL_PARAMS with the model's size and distance." 
+ ) from exc + + +async def _fetch_imdb(client: httpx.AsyncClient, imdb_id: str) -> Optional[IMDbTitle]: + """Fetch metadata for an IMDb ID with caching and retry logic.""" + + if _imdb_cache: + cached = _imdb_cache.get(imdb_id) + if cached: + return IMDbTitle.model_validate(cached) + + url = f"https://api.imdbapi.dev/titles/{imdb_id}" + delay = _imdb_backoff + for attempt in range(_imdb_max_retries + 1): + try: + resp = await client.get(url) + except httpx.HTTPError: + logger.exception("HTTP error fetching IMDb ID %s", imdb_id) + return None + if resp.status_code == 429: + if attempt == _imdb_max_retries: + if _imdb_retry_queue is not None: + await _imdb_retry_queue.put(imdb_id) + return None + await asyncio.sleep(delay) + delay *= 2 + continue + if resp.is_success: + data = resp.json() + if _imdb_cache: + _imdb_cache.set(imdb_id, data) + return IMDbTitle.model_validate(data) + return None + return None + + async def _fetch_imdb_batch( client: httpx.AsyncClient, imdb_ids: Sequence[str] ) -> dict[str, Optional[IMDbTitle]]: - return await _fetch_imdb_batch_impl( - client, - imdb_ids, - cache=_imdb_cache, - batch_limit=_imdb_batch_limit, - max_retries=_imdb_max_retries, - backoff=_imdb_backoff, - retry_queue=_imdb_retry_queue, - logger=logger, - ) + """Fetch metadata for multiple IMDb IDs, batching requests.""" + + results: dict[str, Optional[IMDbTitle]] = {} + ids_to_fetch: list[str] = [] + for imdb_id in imdb_ids: + if _imdb_cache: + cached = _imdb_cache.get(imdb_id) + if cached: + results[imdb_id] = IMDbTitle.model_validate(cached) + continue + ids_to_fetch.append(imdb_id) + + if not ids_to_fetch: + return results + + url = "https://api.imdbapi.dev/titles:batchGet" + for i in range(0, len(ids_to_fetch), _imdb_batch_limit): + chunk = ids_to_fetch[i : i + _imdb_batch_limit] + params = [("titleIds", imdb_id) for imdb_id in chunk] + delay = _imdb_backoff + for attempt in range(_imdb_max_retries + 1): + try: + resp = await client.get(url, params=params) + except httpx.HTTPError: + logger.exception("HTTP error fetching IMDb IDs %s", ",".join(chunk)) + for imdb_id in chunk: + results[imdb_id] = None + break + if resp.status_code == 429: + if attempt == _imdb_max_retries: + if _imdb_retry_queue is not None: + for imdb_id in chunk: + await _imdb_retry_queue.put(imdb_id) + for imdb_id in chunk: + results[imdb_id] = None + break + await asyncio.sleep(delay) + delay *= 2 + continue + if resp.is_success: + data = resp.json() + found: set[str] = set() + for title_data in data.get("titles", []): + imdb_title = IMDbTitle.model_validate(title_data) + results[imdb_title.id] = imdb_title + found.add(imdb_title.id) + if _imdb_cache: + _imdb_cache.set(imdb_title.id, title_data) + for missing in set(chunk) - found: + results[missing] = None + break + for imdb_id in chunk: + results[imdb_id] = None + break + + return results def _load_imdb_retry_queue(path: Path) -> None: + """Populate the retry queue from a JSON file if it exists.""" + global _imdb_retry_queue - _imdb_retry_queue = _load_imdb_retry_queue_impl(path, logger) + ids: list[str] = [] + if path.exists(): + try: + data = json.loads(path.read_text()) + if isinstance(data, list): + ids = [str(imdb_id) for imdb_id in data] + else: + logger.warning( + "IMDb retry queue file %s did not contain a list; ignoring its contents", + path, + ) + except Exception: + logger.exception("Failed to load IMDb retry queue from %s", path) + _imdb_retry_queue = _IMDbRetryQueue(ids) async def _process_imdb_retry_queue(client: httpx.AsyncClient) -> None: - if 
_imdb_retry_queue is None: + """Attempt to fetch queued IMDb IDs, re-queueing failures.""" + + if _imdb_retry_queue is None or _imdb_retry_queue.empty(): return - async def _retry_fetch(client: httpx.AsyncClient, imdb_id: str, **_: object) -> Optional[IMDbTitle]: - return await _fetch_imdb(client, imdb_id) - await _process_imdb_retry_queue_impl( - client, - _imdb_retry_queue, - cache=_imdb_cache, - max_retries=_imdb_max_retries, - backoff=_imdb_backoff, - logger=logger, - fetch_fn=_retry_fetch, - ) + size = _imdb_retry_queue.qsize() + for _ in range(size): + imdb_id = await _imdb_retry_queue.get() + title = await _fetch_imdb(client, imdb_id) + if title is None: + await _imdb_retry_queue.put(imdb_id) def _persist_imdb_retry_queue(path: Path) -> None: + """Persist the retry queue to disk.""" + if _imdb_retry_queue is None: return - _persist_imdb_retry_queue_impl(path, _imdb_retry_queue) + path.write_text(json.dumps(_imdb_retry_queue.snapshot())) + + +async def _upsert_in_batches( + client: AsyncQdrantClient, + collection_name: str, + points: Sequence[models.PointStruct], + *, + retry_queue: asyncio.Queue[list[models.PointStruct]] | None = None, +) -> None: + """Upsert points into Qdrant in batches, logging HTTP errors.""" + + total = len(points) + for i in range(0, total, _qdrant_batch_size): + batch = points[i : i + _qdrant_batch_size] + try: + await client.upsert(collection_name=collection_name, points=batch) + except Exception: + logger.exception( + "Failed to upsert batch %d-%d", i, i + len(batch) + ) + if retry_queue is not None: + await retry_queue.put(list(batch)) + else: + logger.info( + "Upserted %d/%d points", min(i + len(batch), total), total + ) + + +async def _process_qdrant_retry_queue( + client: AsyncQdrantClient, + collection_name: str, + retry_queue: asyncio.Queue[list[models.PointStruct]], +) -> None: + """Retry failed Qdrant batches with exponential backoff.""" + + if retry_queue.empty(): + return + + pending = retry_queue.qsize() + logger.info("Retrying %d failed Qdrant batches", pending) + while not retry_queue.empty(): + batch = await retry_queue.get() + attempt = 1 + while attempt <= _qdrant_retry_attempts: + try: + await client.upsert( + collection_name=collection_name, + points=batch, + ) + except Exception: + logger.exception( + "Retry %d/%d failed for Qdrant batch of %d points", + attempt, + _qdrant_retry_attempts, + len(batch), + ) + attempt += 1 + if attempt > _qdrant_retry_attempts: + logger.error( + "Giving up on Qdrant batch after %d attempts; %d points were not indexed", + _qdrant_retry_attempts, + len(batch), + ) + break + await asyncio.sleep(_qdrant_retry_backoff * attempt) + continue + else: + logger.info( + "Successfully retried Qdrant batch of %d points on attempt %d", + len(batch), + attempt, + ) + break + + +async def _ensure_collection( + client: AsyncQdrantClient, + collection_name: str, + *, + dense_size: int, + dense_distance: models.Distance, +) -> None: + """Create the collection and payload indexes if they do not already exist.""" + + created_collection = False + if not await client.collection_exists(collection_name): + await client.create_collection( + collection_name=collection_name, + vectors_config={"dense": models.VectorParams(size=dense_size, distance=dense_distance)}, + sparse_vectors_config={"sparse": models.SparseVectorParams()}, + ) + created_collection = True + + if not created_collection: + return + + suppress_payload_warning = _is_local_qdrant(client) + + async def _create_index( + field_name: str, + field_schema: 
models.PayloadSchemaType | models.TextIndexParams, + ) -> None: + if suppress_payload_warning: + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", + message="Payload indexes have no effect in the local Qdrant.*", + category=UserWarning, + ) + await client.create_payload_index( + collection_name=collection_name, + field_name=field_name, + field_schema=field_schema, + ) + else: + await client.create_payload_index( + collection_name=collection_name, + field_name=field_name, + field_schema=field_schema, + ) + + text_index = models.TextIndexParams( + type=models.PayloadSchemaType.TEXT, + tokenizer=models.TokenizerType.WORD, + min_token_len=2, + lowercase=True, + ) + await _create_index("title", text_index) + await _create_index("type", models.PayloadSchemaType.KEYWORD) + await _create_index("year", models.PayloadSchemaType.INTEGER) + await _create_index("added_at", models.PayloadSchemaType.INTEGER) + await _create_index("actors", models.PayloadSchemaType.KEYWORD) + await _create_index("directors", models.PayloadSchemaType.KEYWORD) + await _create_index("writers", models.PayloadSchemaType.KEYWORD) + await _create_index("genres", models.PayloadSchemaType.KEYWORD) + await _create_index("show_title", models.PayloadSchemaType.KEYWORD) + await _create_index("season_number", models.PayloadSchemaType.INTEGER) + await _create_index("episode_number", models.PayloadSchemaType.INTEGER) + await _create_index("collections", models.PayloadSchemaType.KEYWORD) + await _create_index("summary", text_index) + await _create_index("overview", text_index) + await _create_index("plot", text_index) + await _create_index("tagline", text_index) + await _create_index("reviews", text_index) + await _create_index("data.plex.rating_key", models.PayloadSchemaType.KEYWORD) + await _create_index("data.imdb.id", models.PayloadSchemaType.KEYWORD) + await _create_index("data.tmdb.id", models.PayloadSchemaType.INTEGER) + + +async def _fetch_tmdb_movie( + client: httpx.AsyncClient, tmdb_id: str, api_key: str +) -> Optional[TMDBMovie]: + url = ( + f"https://api.themoviedb.org/3/movie/{tmdb_id}?append_to_response=reviews" + ) + try: + resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) + except httpx.HTTPError: + logger.exception("HTTP error fetching TMDb movie %s", tmdb_id) + return None + if resp.is_success: + return TMDBMovie.model_validate(resp.json()) + return None + + +async def _fetch_tmdb_show( + client: httpx.AsyncClient, tmdb_id: str, api_key: str +) -> Optional[TMDBShow]: + url = ( + f"https://api.themoviedb.org/3/tv/{tmdb_id}?append_to_response=reviews" + ) + try: + resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) + except httpx.HTTPError: + logger.exception("HTTP error fetching TMDb show %s", tmdb_id) + return None + if resp.is_success: + return TMDBShow.model_validate(resp.json()) + return None + +async def _fetch_tmdb_episode( + client: httpx.AsyncClient, + show_id: int, + season_number: int, + episode_number: int, + api_key: str, +) -> Optional[TMDBEpisode]: + """Fetch TMDb data for a TV episode.""" + url = ( + f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}/episode/{episode_number}" + ) + try: + resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) + except httpx.HTTPError: + logger.exception( + "HTTP error fetching TMDb episode %s S%sE%s", + show_id, + season_number, + episode_number, + ) + return None + if resp.is_success: + return TMDBEpisode.model_validate(resp.json()) + return None + + +def 
resolve_tmdb_season_number( + show_tmdb: Optional[TMDBShow], episode: PlexPartialObject +) -> Optional[int]: + """Map a Plex episode to the appropriate TMDb season number. + + This resolves cases where Plex uses year-based season indices that do not + match TMDb's sequential ``season_number`` values. + """ + + parent_index = getattr(episode, "parentIndex", None) + parent_title = getattr(episode, "parentTitle", None) + parent_year = getattr(episode, "parentYear", None) + if parent_year is None: + parent_year = getattr(episode, "year", None) + + seasons = getattr(show_tmdb, "seasons", []) if show_tmdb else [] + + # direct numeric match + if parent_index is not None: + for season in seasons: + if season.season_number == parent_index: + return season.season_number + + # match by season name (e.g. "Season 2018" -> "2018") + title_norm: Optional[str] = None + if isinstance(parent_title, str): + title_norm = parent_title.lower().lstrip("season ").strip() + for season in seasons: + name_norm = (season.name or "").lower().lstrip("season ").strip() + if name_norm == title_norm: + return season.season_number + + # match by air date year when Plex uses year-based seasons + year: Optional[int] = None + if isinstance(parent_year, int): + year = parent_year + elif isinstance(parent_index, int): + year = parent_index + elif title_norm and title_norm.isdigit(): + year = int(title_norm) + + if year is not None: + for season in seasons: + air = getattr(season, "air_date", None) + if isinstance(air, str) and len(air) >= 4 and air[:4].isdigit(): + if int(air[:4]) == year: + return season.season_number + + if isinstance(parent_index, int): + return parent_index + if isinstance(parent_index, str) and parent_index.isdigit(): + return int(parent_index) + if isinstance(parent_title, str) and parent_title.isdigit(): + return int(parent_title) + return None def _extract_external_ids(item: PlexPartialObject) -> ExternalIDs: """Extract IMDb and TMDb IDs from a Plex object.""" - return _extract_external_ids_impl(item) + imdb_id: Optional[str] = None + tmdb_id: Optional[str] = None + for guid in getattr(item, "guids", []) or []: + gid = getattr(guid, "id", "") + if gid.startswith("imdb://"): + imdb_id = gid.split("imdb://", 1)[1] + elif gid.startswith("tmdb://"): + tmdb_id = gid.split("tmdb://", 1)[1] + return ExternalIDs(imdb=imdb_id, tmdb=tmdb_id) def _build_plex_item(item: PlexPartialObject) -> PlexItem: """Convert a Plex object into the internal :class:`PlexItem`.""" - return _build_plex_item_impl(item) + guids = [PlexGuid(id=g.id) for g in getattr(item, "guids", [])] + directors = [ + PlexPerson(id=getattr(d, "id", 0), tag=str(getattr(d, "tag", "")), thumb=getattr(d, "thumb", None)) + for d in getattr(item, "directors", []) or [] + ] + writers = [ + PlexPerson(id=getattr(w, "id", 0), tag=str(getattr(w, "tag", "")), thumb=getattr(w, "thumb", None)) + for w in getattr(item, "writers", []) or [] + ] + actors = [ + PlexPerson( + id=getattr(a, "id", 0), + tag=str(getattr(a, "tag", "")), + thumb=getattr(a, "thumb", None), + role=getattr(a, "role", None), + ) + for a in getattr(item, "actors", []) or getattr(item, "roles", []) or [] + ] + genres = [ + str(getattr(g, "tag", "")) + for g in getattr(item, "genres", []) or [] + if getattr(g, "tag", None) + ] + collections = [ + str(getattr(c, "tag", "")) + for c in getattr(item, "collections", []) or [] + if getattr(c, "tag", None) + ] + season_number = getattr(item, "parentIndex", None) + if isinstance(season_number, str): + season_number = int(season_number) if 
season_number.isdigit() else None + episode_number = getattr(item, "index", None) + if isinstance(episode_number, str): + episode_number = int(episode_number) if episode_number.isdigit() else None + + return PlexItem( + rating_key=str(getattr(item, "ratingKey", "")), + guid=str(getattr(item, "guid", "")), + type=str(getattr(item, "type", "")), + title=str(getattr(item, "title", "")), + show_title=getattr(item, "grandparentTitle", None), + season_title=getattr(item, "parentTitle", None), + season_number=season_number, + episode_number=episode_number, + summary=getattr(item, "summary", None), + year=getattr(item, "year", None), + added_at=getattr(item, "addedAt", None), + guids=guids, + thumb=getattr(item, "thumb", None), + art=getattr(item, "art", None), + tagline=getattr(item, "tagline", None), + content_rating=getattr(item, "contentRating", None), + directors=directors, + writers=writers, + actors=actors, + genres=genres, + collections=collections, + ) +def _format_primary_title(item: AggregatedItem) -> str: + """Format the primary title text for ``item``.""" + + primary_title = item.plex.title + if item.plex.type == "episode": + title_bits: list[str] = [] + if item.plex.show_title: + title_bits.append(item.plex.show_title) + se_parts: list[str] = [] + if item.plex.season_number is not None: + se_parts.append(f"S{item.plex.season_number:02d}") + if item.plex.episode_number is not None: + se_parts.append(f"E{item.plex.episode_number:02d}") + if se_parts: + title_bits.append("".join(se_parts)) + if item.plex.title: + title_bits.append(item.plex.title) + if title_bits: + primary_title = " - ".join(title_bits) + return primary_title + + +def _build_point_text(item: AggregatedItem) -> str: + """Return the vector text for ``item``.""" + + parts = [ + _format_primary_title(item), + item.plex.summary or "", + item.tmdb.overview if item.tmdb and hasattr(item.tmdb, "overview") else "", + item.imdb.plot if item.imdb else "", + ] + directors_text = ", ".join(p.tag for p in item.plex.directors if p.tag) + writers_text = ", ".join(p.tag for p in item.plex.writers if p.tag) + actors_text = ", ".join(p.tag for p in item.plex.actors if p.tag) + if directors_text: + parts.append(f"Directed by {directors_text}") + if writers_text: + parts.append(f"Written by {writers_text}") + if actors_text: + parts.append(f"Starring {actors_text}") + if item.plex.tagline: + parts.append(item.plex.tagline) + if item.tmdb and hasattr(item.tmdb, "tagline"): + tagline = getattr(item.tmdb, "tagline", None) + if tagline: + parts.append(tagline) + if item.tmdb and hasattr(item.tmdb, "reviews"): + parts.extend(r.get("content", "") for r in getattr(item.tmdb, "reviews", [])) + return "\n".join(p for p in parts if p) + + +def _build_point_payload(item: AggregatedItem) -> dict[str, object]: + """Construct the Qdrant payload for ``item``.""" + + payload: dict[str, object] = { + "data": item.model_dump(mode="json"), + "title": item.plex.title, + "type": item.plex.type, + } + if item.plex.type == "episode": + if item.plex.show_title: + payload["show_title"] = item.plex.show_title + if item.plex.season_title: + payload["season_title"] = item.plex.season_title + if item.plex.season_number is not None: + payload["season_number"] = item.plex.season_number + if item.plex.episode_number is not None: + payload["episode_number"] = item.plex.episode_number + if item.plex.actors: + payload["actors"] = [p.tag for p in item.plex.actors if p.tag] + if item.plex.directors: + payload["directors"] = [p.tag for p in item.plex.directors if p.tag] + if 
item.plex.writers: + payload["writers"] = [p.tag for p in item.plex.writers if p.tag] + if item.plex.genres: + payload["genres"] = item.plex.genres + if item.plex.collections: + payload["collections"] = item.plex.collections + summary = item.plex.summary + if summary: + payload["summary"] = summary + overview = getattr(item.tmdb, "overview", None) if item.tmdb else None + if overview: + payload["overview"] = overview + plot = item.imdb.plot if item.imdb else None + if plot: + payload["plot"] = plot + taglines = [item.plex.tagline] + if item.tmdb and hasattr(item.tmdb, "tagline"): + taglines.append(getattr(item.tmdb, "tagline", None)) + taglines = [t for t in taglines if t] + if taglines: + payload["tagline"] = "\n".join(dict.fromkeys(taglines)) + if item.tmdb and hasattr(item.tmdb, "reviews"): + review_texts = [r.get("content", "") for r in getattr(item.tmdb, "reviews", [])] + review_texts = [r for r in review_texts if r] + if review_texts: + payload["reviews"] = review_texts + if item.plex.year is not None: + payload["year"] = item.plex.year + if item.plex.added_at is not None: + added = item.plex.added_at + if hasattr(added, "timestamp"): + payload["added_at"] = int(added.timestamp()) + return payload + + +def build_point( + item: AggregatedItem, + dense_model_name: str, + sparse_model_name: str, +) -> models.PointStruct: + """Build a Qdrant point for ``item`` using the configured model names.""" + + text = _build_point_text(item) + payload = _build_point_payload(item) + point_id: int | str = ( + int(item.plex.rating_key) + if item.plex.rating_key.isdigit() + else item.plex.rating_key + ) + return models.PointStruct( + id=point_id, + vector={ + "dense": models.Document(text=text, model=dense_model_name), + "sparse": models.Document(text=text, model=sparse_model_name), + }, + payload=payload, + ) + async def _iter_from_plex( server: PlexServer, tmdb_api_key: str, *, batch_size: int = 50 @@ -247,91 +896,124 @@ async def _load_from_plex( return [item async for item in _iter_from_plex(server, tmdb_api_key, batch_size=batch_size)] -async def _fetch_tmdb_movie( - client: httpx.AsyncClient, tmdb_id: str, api_key: str -) -> Optional[TMDBMovie]: - return await _fetch_tmdb_movie_impl(client, tmdb_id, api_key, logger) - - -async def _fetch_tmdb_show( - client: httpx.AsyncClient, tmdb_id: str, api_key: str -) -> Optional[TMDBShow]: - return await _fetch_tmdb_show_impl(client, tmdb_id, api_key, logger) - - -async def _fetch_tmdb_episode( - client: httpx.AsyncClient, - show_id: int, - season_number: int, - episode_number: int, - api_key: str, -) -> Optional[TMDBEpisode]: - return await _fetch_tmdb_episode_impl( - client, show_id, season_number, episode_number, api_key, logger - ) - def _load_from_sample(sample_dir: Path) -> List[AggregatedItem]: - return _load_from_sample_impl(sample_dir) - - -async def _ensure_collection( - client: AsyncQdrantClient, - collection_name: str, - *, - dense_size: int, - dense_distance: models.Distance, - logger_override: logging.Logger | None = None, -) -> None: - await _ensure_collection_impl( - client, - collection_name, - dense_size=dense_size, - dense_distance=dense_distance, - logger=logger_override or logger, + """Load items from local sample JSON files.""" + + results: List[AggregatedItem] = [] + movie_dir = sample_dir / "movie" + episode_dir = sample_dir / "episode" + + # Movie sample + with (movie_dir / "plex.json").open("r", encoding="utf-8") as f: + movie_data = json.load(f)["MediaContainer"]["Metadata"][0] + plex_movie = PlexItem( + 
rating_key=str(movie_data.get("ratingKey", "")), + guid=str(movie_data.get("guid", "")), + type=movie_data.get("type", "movie"), + title=movie_data.get("title", ""), + summary=movie_data.get("summary"), + year=movie_data.get("year"), + added_at=movie_data.get("addedAt"), + guids=[PlexGuid(id=g["id"]) for g in movie_data.get("Guid", [])], + thumb=movie_data.get("thumb"), + art=movie_data.get("art"), + tagline=movie_data.get("tagline"), + content_rating=movie_data.get("contentRating"), + directors=[ + PlexPerson(id=d.get("id", 0), tag=d.get("tag", ""), thumb=d.get("thumb")) + for d in movie_data.get("Director", []) + ], + writers=[ + PlexPerson(id=w.get("id", 0), tag=w.get("tag", ""), thumb=w.get("thumb")) + for w in movie_data.get("Writer", []) + ], + actors=[ + PlexPerson( + id=a.get("id", 0), + tag=a.get("tag", ""), + role=a.get("role"), + thumb=a.get("thumb"), + ) + for a in movie_data.get("Role", []) + ], + genres=[g.get("tag", "") for g in movie_data.get("Genre", []) if g.get("tag")], + collections=[ + c.get("tag", "") + for key in ("Collection", "Collections") + for c in movie_data.get(key, []) or [] + if c.get("tag") + ], ) - - -async def _upsert_in_batches( - client: AsyncQdrantClient, - collection_name: str, - points: Sequence[models.PointStruct], - *, - batch_size: int | None = None, - retry_queue: asyncio.Queue[list[models.PointStruct]] | None = None, - logger_override: logging.Logger | None = None, -) -> None: - await _upsert_in_batches_impl( - client, - collection_name, - points, - batch_size=batch_size if batch_size is not None else _qdrant_batch_size, - retry_queue=retry_queue, - logger=logger_override or logger, + with (movie_dir / "imdb.json").open("r", encoding="utf-8") as f: + imdb_movie = IMDbTitle.model_validate(json.load(f)) + with (movie_dir / "tmdb.json").open("r", encoding="utf-8") as f: + tmdb_movie = TMDBMovie.model_validate(json.load(f)) + results.append(AggregatedItem(plex=plex_movie, imdb=imdb_movie, tmdb=tmdb_movie)) + + # Episode sample + with (episode_dir / "plex.tv.json").open("r", encoding="utf-8") as f: + episode_data = json.load(f)["MediaContainer"]["Metadata"][0] + plex_episode = PlexItem( + rating_key=str(episode_data.get("ratingKey", "")), + guid=str(episode_data.get("guid", "")), + type=episode_data.get("type", "episode"), + title=episode_data.get("title", ""), + show_title=episode_data.get("grandparentTitle"), + season_title=episode_data.get("parentTitle"), + season_number=episode_data.get("parentIndex"), + episode_number=episode_data.get("index"), + summary=episode_data.get("summary"), + year=episode_data.get("year"), + added_at=episode_data.get("addedAt"), + guids=[PlexGuid(id=g["id"]) for g in episode_data.get("Guid", [])], + thumb=episode_data.get("thumb"), + art=episode_data.get("art"), + tagline=episode_data.get("tagline"), + content_rating=episode_data.get("contentRating"), + directors=[ + PlexPerson(id=d.get("id", 0), tag=d.get("tag", ""), thumb=d.get("thumb")) + for d in episode_data.get("Director", []) + ], + writers=[ + PlexPerson(id=w.get("id", 0), tag=w.get("tag", ""), thumb=w.get("thumb")) + for w in episode_data.get("Writer", []) + ], + actors=[ + PlexPerson( + id=a.get("id", 0), + tag=a.get("tag", ""), + role=a.get("role"), + thumb=a.get("thumb"), + ) + for a in episode_data.get("Role", []) + ], + genres=[g.get("tag", "") for g in episode_data.get("Genre", []) if g.get("tag")], + collections=[ + c.get("tag", "") + for key in ("Collection", "Collections") + for c in episode_data.get(key, []) or [] + if c.get("tag") + ], ) + with 
(episode_dir / "imdb.tv.json").open("r", encoding="utf-8") as f: + imdb_episode = IMDbTitle.model_validate(json.load(f)) + with (episode_dir / "tmdb.tv.json").open("r", encoding="utf-8") as f: + tmdb_show = TMDBShow.model_validate(json.load(f)) + results.append(AggregatedItem(plex=plex_episode, imdb=imdb_episode, tmdb=tmdb_show)) + return results -async def _process_qdrant_retry_queue( - client: AsyncQdrantClient, - collection_name: str, - retry_queue: asyncio.Queue[list[models.PointStruct]], -) -> None: - await _process_qdrant_retry_queue_impl( - client, - collection_name, - retry_queue, - logger, - max_attempts=_qdrant_retry_attempts, - backoff=_qdrant_retry_backoff, - ) +async def _iter_from_sample(sample_dir: Path) -> AsyncIterator[AggregatedItem]: + """Yield sample data items for streaming pipelines.""" -def _resolve_dense_model_params(model_name: str) -> tuple[int, models.Distance]: - return _resolve_dense_model_params_impl(model_name) + for item in _load_from_sample(sample_dir): + yield item class LoaderPipeline: - """Coordinate ingestion, enrichment, and Qdrant storage.""" + """Coordinate ingestion, enrichment, and Qdrant upserts.""" def __init__( self, @@ -375,7 +1057,7 @@ def __init__( if self._sample_items is None and not self._tmdb_api_key: raise RuntimeError("TMDB API key required for live ingestion") - self._ingest_queue: asyncio.Queue[IngestBatch | None] = asyncio.Queue( + self._ingest_queue: asyncio.Queue[_IngestBatch | None] = asyncio.Queue( maxsize=self._enrichment_workers * 2 ) self._points_queue: asyncio.Queue[list[models.PointStruct] | None] = ( @@ -383,110 +1065,45 @@ def __init__( ) self._upsert_capacity = asyncio.Semaphore(self._max_concurrent_upserts) self._items: list[AggregatedItem] = [] - self._storage_task: StorageTask | None = None - self._enrichment_task: EnrichmentTask | None = None - self._dense_params: tuple[int, models.Distance] | None = None - self._collection_ensured = False + self._qdrant_retry_queue: asyncio.Queue[list[models.PointStruct]] = ( + asyncio.Queue() + ) + self._show_tmdb_cache: dict[str, TMDBShow | None] = {} + + self._ingested_count = 0 + self._enriched_count = 0 + self._upserted_points = 0 + now = time.perf_counter() + self._ingest_start = now + self._enrich_start = now + self._upsert_start = now @property def qdrant_retry_queue(self) -> asyncio.Queue[list[models.PointStruct]]: - if self._storage_task is None: - raise RuntimeError("Pipeline has not been executed") - return self._storage_task.retry_queue + """Expose the Qdrant retry queue for post-processing.""" + + return self._qdrant_retry_queue @property def items(self) -> list[AggregatedItem]: - return self._items + """Return the aggregated items processed by the pipeline.""" - async def ensure_collection(self) -> None: - if self._dense_params is None: - self._dense_params = _resolve_dense_model_params(self._dense_model_name) - dense_size, dense_distance = self._dense_params - if not hasattr(self._client, "collection_exists"): - self._collection_ensured = True - return - await _ensure_collection( - self._client, - self._collection_name, - dense_size=dense_size, - dense_distance=dense_distance, - ) - self._collection_ensured = True - - def _log_progress(self, stage: str, count: int, start: float, queue_size: int) -> None: - elapsed = time.perf_counter() - start - rate = count / elapsed if elapsed > 0 else 0.0 - logger.info( - "%s processed %d items (%.2f items/sec, queue size=%d)", - stage, - count, - rate, - queue_size, - ) + return self._items async def execute(self) -> None: - async 
with httpx.AsyncClient(timeout=30) as client: - imdb_queue = _imdb_retry_queue or IMDbRetryQueue() - if _imdb_retry_queue is None: - globals()['_imdb_retry_queue'] = imdb_queue - ingestion = IngestionTask( - self._ingest_queue, - sample_items=self._sample_items, - plex_server=self._server, - plex_chunk_size=self._plex_chunk_size, - enrichment_batch_size=self._enrichment_batch_size, - enrichment_workers=self._enrichment_workers, - log_progress=self._log_progress, - ) - enrichment = EnrichmentTask( - self._ingest_queue, - self._points_queue, - http_client=client, - tmdb_api_key=self._tmdb_api_key, - imdb_cache=_imdb_cache, - imdb_retry_queue=imdb_queue, - imdb_batch_limit=_imdb_batch_limit, - imdb_max_retries=_imdb_max_retries, - imdb_backoff=_imdb_backoff, - dense_model_name=self._dense_model_name, - sparse_model_name=self._sparse_model_name, - enrichment_batch_size=self._enrichment_batch_size, - worker_count=self._enrichment_workers, - upsert_buffer_size=self._upsert_buffer_size, - upsert_capacity=self._upsert_capacity, - log_progress=self._log_progress, - logger=logger, - ) - if self._dense_params is None: - self._dense_params = _resolve_dense_model_params( - self._dense_model_name - ) - dense_size, dense_distance = self._dense_params - storage = StorageTask( - self._points_queue, - client=self._client, - collection_name=self._collection_name, - dense_size=dense_size, - dense_distance=dense_distance, - upsert_batch_size=_qdrant_batch_size, - worker_count=self._max_concurrent_upserts, - upsert_capacity=self._upsert_capacity, - log_progress=self._log_progress, - logger=logger, - retry_attempts=_qdrant_retry_attempts, - retry_backoff=_qdrant_retry_backoff, - ensure_collection_fn=_ensure_collection, - upsert_fn=_upsert_in_batches, - ) - if not self._collection_ensured: - if hasattr(self._client, "collection_exists"): - await storage.ensure_collection() - self._collection_ensured = True - - ingest_task = asyncio.create_task(ingestion.run()) - enrichment_tasks = enrichment.start_workers() - storage_tasks = storage.start_workers() + """Run the full ingestion/enrichment/upsert pipeline.""" + async with httpx.AsyncClient(timeout=30) as client: + self._http_client = client + ingest_task = asyncio.create_task(self._ingest()) + enrichment_tasks = [ + asyncio.create_task(self._enrichment_worker(worker_id)) + for worker_id in range(self._enrichment_workers) + ] + upsert_tasks = [ + asyncio.create_task(self._upsert_worker(worker_id)) + for worker_id in range(self._max_concurrent_upserts) + ] error: BaseException | None = None try: await ingest_task @@ -498,10 +1115,10 @@ async def execute(self) -> None: finally: for _ in range(self._max_concurrent_upserts): await self._points_queue.put(None) - storage_results = await asyncio.gather( - *storage_tasks, return_exceptions=True + upsert_results = await asyncio.gather( + *upsert_tasks, return_exceptions=True ) - for result in storage_results: + for result in upsert_results: if isinstance(result, BaseException) and not isinstance( result, asyncio.CancelledError ): @@ -518,16 +1135,302 @@ async def execute(self) -> None: if error is not None: raise error - self._items = enrichment.items - self._storage_task = storage - self._enrichment_task = enrichment + def _log_progress( + self, stage: str, count: int, start: float, queue_size: int + ) -> None: + elapsed = time.perf_counter() - start + rate = count / elapsed if elapsed > 0 else 0.0 + logger.info( + "%s processed %d items (%.2f items/sec, queue size=%d)", + stage, + count, + rate, + queue_size, + ) - async 
def drain_retry_queue(self) -> None: - if self._storage_task is None: - return - await self._storage_task.drain_retry_queue() + async def _ingest(self) -> None: + start = time.perf_counter() + self._ingest_start = start + try: + if self._sample_items is not None: + await self._ingest_sample() + else: + await self._ingest_from_plex() + finally: + for _ in range(self._enrichment_workers): + await self._ingest_queue.put(None) + self._log_progress("Ingestion", self._ingested_count, start, self._ingest_queue.qsize()) + + async def _ingest_sample(self) -> None: + for chunk in _chunk_sequence(self._sample_items or [], self._enrichment_batch_size): + batch = _SampleBatch(items=list(chunk)) + if not batch.items: + continue + await self._ingest_queue.put(batch) + self._ingested_count += len(batch.items) + self._log_progress( + "Ingestion", + self._ingested_count, + self._ingest_start, + self._ingest_queue.qsize(), + ) + async def _ingest_from_plex(self) -> None: + if self._server is None: + raise RuntimeError("Plex server unavailable for ingestion") + movie_section = self._server.library.section("Movies") + movie_keys = [int(m.ratingKey) for m in movie_section.all()] + for key_chunk in _chunk_sequence(movie_keys, self._plex_chunk_size): + key_list = list(key_chunk) + movies = list(self._server.fetchItems(key_list)) if key_list else [] + if not movies: + continue + await self._ingest_queue.put(_MovieBatch(movies=movies)) + self._ingested_count += len(movies) + self._log_progress( + "Ingestion", + self._ingested_count, + self._ingest_start, + self._ingest_queue.qsize(), + ) + show_section = self._server.library.section("TV Shows") + show_keys = [int(s.ratingKey) for s in show_section.all()] + for show_chunk in _chunk_sequence(show_keys, self._plex_chunk_size): + shows = list(self._server.fetchItems(list(show_chunk))) + for show in shows: + episode_keys = [int(e.ratingKey) for e in show.episodes()] + for episode_chunk in _chunk_sequence(episode_keys, self._plex_chunk_size): + keys = list(episode_chunk) + episodes = list(self._server.fetchItems(keys)) if keys else [] + if not episodes: + continue + await self._ingest_queue.put( + _EpisodeBatch(show=show, episodes=episodes) + ) + self._ingested_count += len(episodes) + self._log_progress( + "Ingestion", + self._ingested_count, + self._ingest_start, + self._ingest_queue.qsize(), + ) + async def _enrichment_worker(self, worker_id: int) -> None: + while True: + batch = await self._ingest_queue.get() + if batch is None: + self._ingest_queue.task_done() + break + try: + if isinstance(batch, _MovieBatch): + await self._process_movie_batch(batch) + elif isinstance(batch, _EpisodeBatch): + await self._process_episode_batch(batch) + else: + await self._process_sample_batch(batch) + finally: + self._ingest_queue.task_done() + + async def _process_movie_batch(self, batch: _MovieBatch) -> None: + for chunk in _chunk_sequence(batch.movies, self._enrichment_batch_size): + movies = list(chunk) + if not movies: + continue + if self._enriched_count == 0: + self._enrich_start = time.perf_counter() + aggregated = await self._enrich_movies(movies) + self._enriched_count += len(aggregated) + self._log_progress( + "Enrichment", + self._enriched_count, + self._enrich_start, + self._points_queue.qsize(), + ) + await self._emit_points(aggregated) + + async def _process_episode_batch(self, batch: _EpisodeBatch) -> None: + for chunk in _chunk_sequence(batch.episodes, self._enrichment_batch_size): + episodes = list(chunk) + if not episodes: + continue + if self._enriched_count == 
0: + self._enrich_start = time.perf_counter() + aggregated = await self._enrich_episodes(batch.show, episodes) + self._enriched_count += len(aggregated) + self._log_progress( + "Enrichment", + self._enriched_count, + self._enrich_start, + self._points_queue.qsize(), + ) + await self._emit_points(aggregated) + + async def _process_sample_batch(self, batch: _SampleBatch) -> None: + for chunk in _chunk_sequence(batch.items, self._enrichment_batch_size): + aggregated = list(chunk) + if not aggregated: + continue + if self._enriched_count == 0: + self._enrich_start = time.perf_counter() + self._enriched_count += len(aggregated) + self._log_progress( + "Enrichment", + self._enriched_count, + self._enrich_start, + self._points_queue.qsize(), + ) + await self._emit_points(aggregated) + + async def _enrich_movies( + self, movies: Sequence[PlexPartialObject] + ) -> list[AggregatedItem]: + movie_ids = [_extract_external_ids(movie) for movie in movies] + imdb_ids = [ids.imdb for ids in movie_ids if ids.imdb] + imdb_map = ( + await _fetch_imdb_batch(self._http_client, imdb_ids) + if imdb_ids + else {} + ) + + api_key = self._tmdb_api_key + tmdb_results: list[TMDBMovie | None] = [] + if api_key: + tmdb_tasks = [ + _fetch_tmdb_movie(self._http_client, ids.tmdb, api_key) + for ids in movie_ids + if ids.tmdb + ] + if tmdb_tasks: + tmdb_results = await asyncio.gather(*tmdb_tasks) + tmdb_iter = iter(tmdb_results) + + aggregated: list[AggregatedItem] = [] + for movie, ids in zip(movies, movie_ids): + tmdb = next(tmdb_iter, None) if ids.tmdb else None + imdb = imdb_map.get(ids.imdb) if ids.imdb else None + aggregated.append( + AggregatedItem( + plex=_build_plex_item(movie), + imdb=imdb, + tmdb=tmdb, + ) + ) + return aggregated + + async def _enrich_episodes( + self, show: PlexPartialObject, episodes: Sequence[PlexPartialObject] + ) -> list[AggregatedItem]: + show_ids = _extract_external_ids(show) + show_tmdb: TMDBShow | None = None + if show_ids.tmdb: + show_tmdb = await self._get_tmdb_show(show_ids.tmdb) + episode_ids = [_extract_external_ids(ep) for ep in episodes] + imdb_ids = [ids.imdb for ids in episode_ids if ids.imdb] + imdb_map = ( + await _fetch_imdb_batch(self._http_client, imdb_ids) + if imdb_ids + else {} + ) + + tmdb_results: list[TMDBEpisode | None] = [] + if show_tmdb: + episode_tasks = [ + self._lookup_tmdb_episode(show_tmdb, ep) + for ep in episodes + ] + if episode_tasks: + tmdb_results = await asyncio.gather(*episode_tasks) + tmdb_iter = iter(tmdb_results) + aggregated: list[AggregatedItem] = [] + for ep, ids in zip(episodes, episode_ids): + tmdb_episode = next(tmdb_iter, None) if show_tmdb else None + imdb = imdb_map.get(ids.imdb) if ids.imdb else None + tmdb_item: TMDBItem | None = tmdb_episode or show_tmdb + aggregated.append( + AggregatedItem( + plex=_build_plex_item(ep), + imdb=imdb, + tmdb=tmdb_item, + ) + ) + return aggregated + + async def _get_tmdb_show(self, tmdb_id: str) -> TMDBShow | None: + if tmdb_id in self._show_tmdb_cache: + return self._show_tmdb_cache[tmdb_id] + show = await _fetch_tmdb_show(self._http_client, tmdb_id, self._tmdb_api_key or "") + self._show_tmdb_cache[tmdb_id] = show + return show + + async def _lookup_tmdb_episode( + self, show_tmdb: TMDBShow | None, episode: PlexPartialObject + ) -> TMDBEpisode | None: + if not show_tmdb or not self._tmdb_api_key: + return None + season = resolve_tmdb_season_number(show_tmdb, episode) + ep_num = getattr(episode, "index", None) + if isinstance(ep_num, str) and ep_num.isdigit(): + ep_num = int(ep_num) + if season is None 
or ep_num is None: + return None + return await _fetch_tmdb_episode( + self._http_client, + show_tmdb.id, + season, + ep_num, + self._tmdb_api_key, + ) + + async def _emit_points(self, aggregated: Sequence[AggregatedItem]) -> None: + if not aggregated: + return + self._items.extend(aggregated) + points = [ + build_point(item, self._dense_model_name, self._sparse_model_name) + for item in aggregated + ] + for chunk in _chunk_sequence(points, self._upsert_buffer_size): + batch = list(chunk) + if not batch: + continue + await self._upsert_capacity.acquire() + try: + await self._points_queue.put(batch) + except BaseException: + self._upsert_capacity.release() + raise + + async def _upsert_worker(self, worker_id: int) -> None: + while True: + batch = await self._points_queue.get() + if batch is None: + self._points_queue.task_done() + break + logger.info( + "Upsert worker %d handling %d points (queue size=%d)", + worker_id, + len(batch), + self._points_queue.qsize(), + ) + try: + if self._upserted_points == 0: + self._upsert_start = time.perf_counter() + await _upsert_in_batches( + self._client, + self._collection_name, + batch, + retry_queue=self._qdrant_retry_queue, + ) + self._upserted_points += len(batch) + self._log_progress( + f"Upsert worker {worker_id}", + self._upserted_points, + self._upsert_start, + self._points_queue.qsize(), + ) + finally: + self._points_queue.task_done() + self._upsert_capacity.release() async def run( plex_url: Optional[str], plex_token: Optional[str], @@ -551,6 +1454,8 @@ async def run( enrichment_batch_size: int = 100, enrichment_workers: int = 4, ) -> None: + """Core execution logic for the CLI.""" + global _imdb_cache, _imdb_max_retries, _imdb_backoff, _imdb_retry_queue _imdb_cache = IMDbCache(imdb_cache_path) if imdb_cache_path else None _imdb_max_retries = imdb_max_retries @@ -560,13 +1465,14 @@ async def run( async with httpx.AsyncClient(timeout=30) as client: await _process_imdb_retry_queue(client) else: - _imdb_retry_queue = IMDbRetryQueue() + _imdb_retry_queue = _IMDbRetryQueue() _require_positive(upsert_buffer_size, name="upsert_buffer_size") _require_positive(plex_chunk_size, name="plex_chunk_size") _require_positive(enrichment_batch_size, name="enrichment_batch_size") _require_positive(enrichment_workers, name="enrichment_workers") + dense_size, dense_distance = _resolve_dense_model_params(dense_model_name) if qdrant_url is None and qdrant_host is None: qdrant_url = ":memory:" client = AsyncQdrantClient( @@ -579,6 +1485,12 @@ async def run( prefer_grpc=qdrant_prefer_grpc, ) collection_name = "media-items" + await _ensure_collection( + client, + collection_name, + dense_size=dense_size, + dense_distance=dense_distance, + ) items: List[AggregatedItem] if sample_dir is not None: @@ -622,14 +1534,15 @@ async def run( max_concurrent_upserts=_qdrant_max_concurrent_upserts, ) - await pipeline.ensure_collection() await pipeline.execute() items = pipeline.items logger.info("Loaded %d items", len(items)) if not items: logger.info("No points to upsert") - await pipeline.drain_retry_queue() + await _process_qdrant_retry_queue( + client, collection_name, pipeline.qdrant_retry_queue + ) if imdb_queue_path: _persist_imdb_retry_queue(imdb_queue_path) @@ -728,7 +1641,7 @@ async def run( type=int, default=_qdrant_upsert_buffer_size, show_default=True, - help="Number of media items to buffer before scheduling an async storage write", + help="Number of media items to buffer before scheduling an async upsert", ) @click.option( "--plex-chunk-size", @@ -850,6 +1763,8 @@ 
def main( imdb_backoff: float, imdb_queue: Path, ) -> None: + """Entry-point for the ``load-data`` script.""" + asyncio.run( load_media( plex_url, @@ -904,6 +1819,8 @@ async def load_media( enrichment_batch_size: int, enrichment_workers: int, ) -> None: + """Orchestrate one or more runs of :func:`run`.""" + while True: await run( plex_url, @@ -936,34 +1853,3 @@ async def load_media( if __name__ == "__main__": main() - - -__all__ = [ - "LoaderPipeline", - "_build_plex_item", - "_chunk_sequence", - "_close_coroutines", - "_ensure_collection", - "_IMDbRetryQueue", - "_extract_external_ids", - "_fetch_imdb", - "_fetch_imdb_batch", - "_fetch_tmdb_episode", - "_fetch_tmdb_movie", - "_fetch_tmdb_show", - "_gather_in_batches", - "_iter_gather_in_batches", - "_load_from_sample", - "_load_imdb_retry_queue", - "_persist_imdb_retry_queue", - "_process_imdb_retry_queue", - "_process_qdrant_retry_queue", - "_resolve_dense_model_params", - "_require_positive", - "_upsert_in_batches", - "build_point", - "load_media", - "main", - "resolve_tmdb_season_number", - "run", -] diff --git a/mcp_plex/loader/enrichment/__init__.py b/mcp_plex/loader/enrichment/__init__.py deleted file mode 100644 index c0414e4..0000000 --- a/mcp_plex/loader/enrichment/__init__.py +++ /dev/null @@ -1,204 +0,0 @@ -from __future__ import annotations - -import asyncio -import logging -import time -from typing import Callable, Sequence - -import httpx -from qdrant_client import models - -from mcp_plex.common.types import AggregatedItem, TMDBShow - -from ..imdb_cache import IMDbCache -from ..ingestion.utils import chunk_sequence -from ..utils import require_positive -from ..storage.utils import build_point -from .types import IMDbRetryQueue -from .utils import ( - enrich_episodes, - enrich_movies, -) - -try: - from plexapi.base import PlexPartialObject -except Exception: # pragma: no cover - fall back when plexapi unavailable - PlexPartialObject = object # type: ignore[assignment] - - -class EnrichmentTask: - """Enrich Plex metadata and emit Qdrant points for storage.""" - - def __init__( - self, - ingest_queue: asyncio.Queue[object | None], - points_queue: asyncio.Queue[list[models.PointStruct] | None], - *, - http_client: httpx.AsyncClient, - tmdb_api_key: str | None, - imdb_cache: IMDbCache | None, - imdb_retry_queue: IMDbRetryQueue, - imdb_batch_limit: int, - imdb_max_retries: int, - imdb_backoff: float, - dense_model_name: str, - sparse_model_name: str, - enrichment_batch_size: int, - worker_count: int, - upsert_buffer_size: int, - upsert_capacity: asyncio.Semaphore, - log_progress: Callable[[str, int, float, int], None], - logger: logging.Logger, - ) -> None: - self._ingest_queue = ingest_queue - self._points_queue = points_queue - self._http_client = http_client - self._tmdb_api_key = tmdb_api_key - self._imdb_cache = imdb_cache - self._imdb_retry_queue = imdb_retry_queue - self._imdb_batch_limit = imdb_batch_limit - self._imdb_max_retries = imdb_max_retries - self._imdb_backoff = imdb_backoff - self._dense_model_name = dense_model_name - self._sparse_model_name = sparse_model_name - self._enrichment_batch_size = require_positive( - enrichment_batch_size, name="enrichment_batch_size" - ) - self._worker_count = require_positive(worker_count, name="worker_count") - self._upsert_buffer_size = require_positive( - upsert_buffer_size, name="upsert_buffer_size" - ) - self._upsert_capacity = upsert_capacity - self._log_progress = log_progress - self._logger = logger - - self._items: list[AggregatedItem] = [] - self._show_tmdb_cache: 
dict[str, TMDBShow | None] = {} - self._enriched_count = 0 - self._enrich_start = time.perf_counter() - - @property - def items(self) -> list[AggregatedItem]: - return self._items - - def start_workers(self) -> list[asyncio.Task[None]]: - return [ - asyncio.create_task(self._worker(worker_id)) - for worker_id in range(self._worker_count) - ] - - async def _worker(self, worker_id: int) -> None: - while True: - batch = await self._ingest_queue.get() - if batch is None: - self._ingest_queue.task_done() - break - try: - await self._process_batch(batch) - finally: - self._ingest_queue.task_done() - - async def _process_batch(self, batch: object) -> None: - from ..ingestion.types import EpisodeBatch, MovieBatch, SampleBatch - - if isinstance(batch, MovieBatch): - await self._process_movie_batch(batch) - elif isinstance(batch, EpisodeBatch): - await self._process_episode_batch(batch) - elif isinstance(batch, SampleBatch): - await self._process_sample_batch(batch) - else: - raise TypeError(f"Unsupported batch type: {type(batch)!r}") - - async def _process_movie_batch(self, batch: object) -> None: - from ..ingestion.types import MovieBatch - - assert isinstance(batch, MovieBatch) - for chunk in chunk_sequence(batch.movies, self._enrichment_batch_size): - movies = list(chunk) - if not movies: - continue - await self._enrich_movies(movies) - - async def _process_episode_batch(self, batch: object) -> None: - from ..ingestion.types import EpisodeBatch - - assert isinstance(batch, EpisodeBatch) - for chunk in chunk_sequence(batch.episodes, self._enrichment_batch_size): - episodes = list(chunk) - if not episodes: - continue - await self._enrich_episodes(batch.show, episodes) - - async def _process_sample_batch(self, batch: object) -> None: - from ..ingestion.types import SampleBatch - - assert isinstance(batch, SampleBatch) - for chunk in chunk_sequence(batch.items, self._enrichment_batch_size): - aggregated = list(chunk) - if not aggregated: - continue - await self._emit_points(aggregated) - - async def _enrich_movies(self, movies: Sequence[PlexPartialObject]) -> None: - aggregated = await enrich_movies( - self._http_client, - movies, - tmdb_api_key=self._tmdb_api_key, - imdb_cache=self._imdb_cache, - imdb_batch_limit=self._imdb_batch_limit, - imdb_max_retries=self._imdb_max_retries, - imdb_backoff=self._imdb_backoff, - imdb_retry_queue=self._imdb_retry_queue, - logger=self._logger, - ) - await self._emit_points(aggregated) - - async def _enrich_episodes( - self, show: PlexPartialObject, episodes: Sequence[PlexPartialObject] - ) -> None: - aggregated = await enrich_episodes( - self._http_client, - show, - episodes, - tmdb_api_key=self._tmdb_api_key, - imdb_cache=self._imdb_cache, - imdb_batch_limit=self._imdb_batch_limit, - imdb_max_retries=self._imdb_max_retries, - imdb_backoff=self._imdb_backoff, - imdb_retry_queue=self._imdb_retry_queue, - show_tmdb_cache=self._show_tmdb_cache, - logger=self._logger, - ) - await self._emit_points(aggregated) - - async def _emit_points(self, aggregated: Sequence[AggregatedItem]) -> None: - if not aggregated: - return - if self._enriched_count == 0: - self._enrich_start = time.perf_counter() - self._items.extend(aggregated) - self._enriched_count += len(aggregated) - self._log_progress( - "Enrichment", - self._enriched_count, - self._enrich_start, - self._points_queue.qsize(), - ) - points = [ - build_point(item, self._dense_model_name, self._sparse_model_name) - for item in aggregated - ] - for chunk in chunk_sequence(points, self._upsert_buffer_size): - batch = 
list(chunk) - if not batch: - continue - await self._upsert_capacity.acquire() - try: - await self._points_queue.put(batch) - except BaseException: - self._upsert_capacity.release() - raise - - -__all__ = ["EnrichmentTask", "IMDbRetryQueue"] diff --git a/mcp_plex/loader/enrichment/types.py b/mcp_plex/loader/enrichment/types.py deleted file mode 100644 index c867ac4..0000000 --- a/mcp_plex/loader/enrichment/types.py +++ /dev/null @@ -1,42 +0,0 @@ -from __future__ import annotations - -import asyncio -from collections import deque -from typing import Iterable - - -class IMDbRetryQueue(asyncio.Queue[str]): - """Queue that tracks items in a deque for safe serialization.""" - - def __init__(self, initial: Iterable[str] | None = None): - super().__init__() - self._items: deque[str] = deque() - if initial: - for imdb_id in initial: - imdb_id_str = str(imdb_id) - super().put_nowait(imdb_id_str) - self._items.append(imdb_id_str) - - def put_nowait(self, item: str) -> None: # type: ignore[override] - super().put_nowait(item) - self._items.append(item) - - def get_nowait(self) -> str: # type: ignore[override] - if not self._items: - raise RuntimeError("Desynchronization: Queue is not empty but self._items is empty.") - try: - item = super().get_nowait() - except asyncio.QueueEmpty: - raise RuntimeError( - "Desynchronization: self._items is not empty but asyncio.Queue is empty." - ) - self._items.popleft() - return item - - def snapshot(self) -> list[str]: - """Return a list of the current queue contents.""" - - return list(self._items) - - -__all__ = ["IMDbRetryQueue"] diff --git a/mcp_plex/loader/enrichment/utils.py b/mcp_plex/loader/enrichment/utils.py deleted file mode 100644 index 72681b5..0000000 --- a/mcp_plex/loader/enrichment/utils.py +++ /dev/null @@ -1,552 +0,0 @@ -from __future__ import annotations - -import asyncio -import json -import logging -from pathlib import Path -from typing import Awaitable, Callable, Optional, Sequence - -import httpx - -from mcp_plex.common.types import ( - AggregatedItem, - ExternalIDs, - IMDbTitle, - PlexGuid, - PlexItem, - PlexPerson, - TMDBEpisode, - TMDBItem, - TMDBMovie, - TMDBShow, -) - -from ..imdb_cache import IMDbCache -from ..utils import gather_in_batches -from .types import IMDbRetryQueue - -try: - from plexapi.base import PlexPartialObject -except Exception: # pragma: no cover - fall back when plexapi unavailable - PlexPartialObject = object # type: ignore[assignment] - - -async def fetch_imdb( - client: httpx.AsyncClient, - imdb_id: str, - *, - cache: IMDbCache | None, - max_retries: int, - backoff: float, - retry_queue: IMDbRetryQueue | None, - logger: logging.Logger, -) -> Optional[IMDbTitle]: - """Fetch metadata for an IMDb ID with caching and retry logic.""" - - if cache: - cached = cache.get(imdb_id) - if cached: - return IMDbTitle.model_validate(cached) - - url = f"https://api.imdbapi.dev/titles/{imdb_id}" - delay = backoff - for attempt in range(max_retries + 1): - try: - resp = await client.get(url) - except httpx.HTTPError: - logger.exception("HTTP error fetching IMDb ID %s", imdb_id) - return None - if resp.status_code == 429: - if attempt == max_retries: - if retry_queue is not None: - await retry_queue.put(imdb_id) - return None - await asyncio.sleep(delay) - delay *= 2 - continue - if resp.is_success: - data = resp.json() - if cache: - cache.set(imdb_id, data) - return IMDbTitle.model_validate(data) - return None - return None - - -async def fetch_imdb_batch( - client: httpx.AsyncClient, - imdb_ids: Sequence[str], - *, - cache: 
IMDbCache | None, - batch_limit: int, - max_retries: int, - backoff: float, - retry_queue: IMDbRetryQueue | None, - logger: logging.Logger, -) -> dict[str, Optional[IMDbTitle]]: - """Fetch metadata for multiple IMDb IDs, batching requests.""" - - results: dict[str, Optional[IMDbTitle]] = {} - ids_to_fetch: list[str] = [] - for imdb_id in imdb_ids: - if cache: - cached = cache.get(imdb_id) - if cached: - results[imdb_id] = IMDbTitle.model_validate(cached) - continue - ids_to_fetch.append(imdb_id) - - if not ids_to_fetch: - return results - - url = "https://api.imdbapi.dev/titles:batchGet" - for i in range(0, len(ids_to_fetch), batch_limit): - chunk = ids_to_fetch[i : i + batch_limit] - params = [("titleIds", imdb_id) for imdb_id in chunk] - delay = backoff - for attempt in range(max_retries + 1): - try: - resp = await client.get(url, params=params) - except httpx.HTTPError: - logger.exception("HTTP error fetching IMDb IDs %s", ",".join(chunk)) - for imdb_id in chunk: - results[imdb_id] = None - break - if resp.status_code == 429: - if attempt == max_retries: - if retry_queue is not None: - for imdb_id in chunk: - await retry_queue.put(imdb_id) - for imdb_id in chunk: - results[imdb_id] = None - break - await asyncio.sleep(delay) - delay *= 2 - continue - if resp.is_success: - data = resp.json() - found: set[str] = set() - for title_data in data.get("titles", []): - imdb_title = IMDbTitle.model_validate(title_data) - results[imdb_title.id] = imdb_title - found.add(imdb_title.id) - if cache: - cache.set(imdb_title.id, title_data) - for missing in set(chunk) - found: - results[missing] = None - break - for imdb_id in chunk: - results[imdb_id] = None - break - - return results - - -def load_imdb_retry_queue(path: Path, logger: logging.Logger) -> IMDbRetryQueue: - """Populate the retry queue from a JSON file if it exists.""" - - ids: list[str] = [] - if path.exists(): - try: - data = json.loads(path.read_text()) - if isinstance(data, list): - ids = [str(imdb_id) for imdb_id in data] - else: - logger.warning( - "IMDb retry queue file %s did not contain a list; ignoring its contents", - path, - ) - except Exception: - logger.exception("Failed to load IMDb retry queue from %s", path) - return IMDbRetryQueue(ids) - - -async def process_imdb_retry_queue( - client: httpx.AsyncClient, - queue: IMDbRetryQueue, - *, - cache: IMDbCache | None, - max_retries: int, - backoff: float, - logger: logging.Logger, - fetch_fn: Callable[..., Awaitable[Optional[IMDbTitle]]] | None = None, -) -> None: - """Attempt to fetch queued IMDb IDs, re-queueing failures.""" - - if queue.empty(): - return - size = queue.qsize() - for _ in range(size): - imdb_id = await queue.get() - fetch = fetch_fn or fetch_imdb - title = await fetch( - client, - imdb_id, - cache=cache, - max_retries=max_retries, - backoff=backoff, - retry_queue=queue, - logger=logger, - ) - if title is None: - await queue.put(imdb_id) - - -def persist_imdb_retry_queue(path: Path, queue: IMDbRetryQueue) -> None: - """Persist the retry queue to disk.""" - - path.write_text(json.dumps(queue.snapshot())) - - -async def fetch_tmdb_movie( - client: httpx.AsyncClient, tmdb_id: str, api_key: str, logger: logging.Logger -) -> Optional[TMDBMovie]: - url = ( - f"https://api.themoviedb.org/3/movie/{tmdb_id}?append_to_response=reviews" - ) - try: - resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) - except httpx.HTTPError: - logger.exception("HTTP error fetching TMDb movie %s", tmdb_id) - return None - if resp.is_success: - return 
TMDBMovie.model_validate(resp.json()) - return None - - -async def fetch_tmdb_show( - client: httpx.AsyncClient, tmdb_id: str, api_key: str, logger: logging.Logger -) -> Optional[TMDBShow]: - url = ( - f"https://api.themoviedb.org/3/tv/{tmdb_id}?append_to_response=reviews" - ) - try: - resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) - except httpx.HTTPError: - logger.exception("HTTP error fetching TMDb show %s", tmdb_id) - return None - if resp.is_success: - return TMDBShow.model_validate(resp.json()) - return None - - -async def fetch_tmdb_episode( - client: httpx.AsyncClient, - show_id: int, - season_number: int, - episode_number: int, - api_key: str, - logger: logging.Logger, -) -> Optional[TMDBEpisode]: - """Fetch TMDb data for a TV episode.""" - - url = ( - f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}/episode/{episode_number}" - ) - try: - resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) - except httpx.HTTPError: - logger.exception( - "HTTP error fetching TMDb episode %s S%sE%s", - show_id, - season_number, - episode_number, - ) - return None - if resp.is_success: - return TMDBEpisode.model_validate(resp.json()) - return None - - -def extract_external_ids(item: PlexPartialObject) -> ExternalIDs: - """Extract IMDb and TMDb IDs from a Plex object.""" - - imdb_id: Optional[str] = None - tmdb_id: Optional[str] = None - for guid in getattr(item, "guids", []) or []: - gid = getattr(guid, "id", "") - if gid.startswith("imdb://"): - imdb_id = gid.split("imdb://", 1)[1] - elif gid.startswith("tmdb://"): - tmdb_id = gid.split("tmdb://", 1)[1] - return ExternalIDs(imdb=imdb_id, tmdb=tmdb_id) - - -def build_plex_item(item: PlexPartialObject) -> PlexItem: - """Convert a Plex object into the internal :class:`PlexItem`.""" - - guids = [PlexGuid(id=g.id) for g in getattr(item, "guids", [])] - directors = [ - PlexPerson( - id=getattr(d, "id", 0), - tag=str(getattr(d, "tag", "")), - thumb=getattr(d, "thumb", None), - ) - for d in getattr(item, "directors", []) or [] - ] - writers = [ - PlexPerson( - id=getattr(w, "id", 0), - tag=str(getattr(w, "tag", "")), - thumb=getattr(w, "thumb", None), - ) - for w in getattr(item, "writers", []) or [] - ] - actors = [ - PlexPerson( - id=getattr(a, "id", 0), - tag=str(getattr(a, "tag", "")), - thumb=getattr(a, "thumb", None), - role=getattr(a, "role", None), - ) - for a in getattr(item, "actors", []) or getattr(item, "roles", []) or [] - ] - genres = [ - str(getattr(g, "tag", "")) - for g in getattr(item, "genres", []) or [] - if getattr(g, "tag", None) - ] - collections = [ - str(getattr(c, "tag", "")) - for c in getattr(item, "collections", []) or [] - if getattr(c, "tag", None) - ] - season_number = getattr(item, "parentIndex", None) - if isinstance(season_number, str): - season_number = int(season_number) if season_number.isdigit() else None - episode_number = getattr(item, "index", None) - if isinstance(episode_number, str): - episode_number = int(episode_number) if episode_number.isdigit() else None - - return PlexItem( - rating_key=str(getattr(item, "ratingKey", "")), - guid=str(getattr(item, "guid", "")), - type=str(getattr(item, "type", "")), - title=str(getattr(item, "title", "")), - show_title=getattr(item, "grandparentTitle", None), - season_title=getattr(item, "parentTitle", None), - season_number=season_number, - episode_number=episode_number, - summary=getattr(item, "summary", None), - year=getattr(item, "year", None), - added_at=getattr(item, "addedAt", None), - 
guids=guids, - thumb=getattr(item, "thumb", None), - art=getattr(item, "art", None), - tagline=getattr(item, "tagline", None), - content_rating=getattr(item, "contentRating", None), - directors=directors, - writers=writers, - actors=actors, - genres=genres, - collections=collections, - ) - - -def resolve_tmdb_season_number( - show_tmdb: Optional[TMDBShow], episode: PlexPartialObject -) -> Optional[int]: - """Map a Plex episode to the appropriate TMDb season number.""" - - parent_index = getattr(episode, "parentIndex", None) - parent_title = getattr(episode, "parentTitle", None) - parent_year = getattr(episode, "parentYear", None) - if parent_year is None: - parent_year = getattr(episode, "year", None) - - seasons = getattr(show_tmdb, "seasons", []) if show_tmdb else [] - - if parent_index is not None: - for season in seasons: - if season.season_number == parent_index: - return season.season_number - - title_norm: Optional[str] = None - if isinstance(parent_title, str): - title_norm = parent_title.lower().lstrip("season ").strip() - for season in seasons: - name_norm = (season.name or "").lower().lstrip("season ").strip() - if name_norm == title_norm: - return season.season_number - - year: Optional[int] = None - if isinstance(parent_year, int): - year = parent_year - elif isinstance(parent_index, int): - year = parent_index - elif title_norm and title_norm.isdigit(): - year = int(title_norm) - - if year is not None: - for season in seasons: - air = getattr(season, "air_date", None) - if isinstance(air, str) and len(air) >= 4 and air[:4].isdigit(): - if int(air[:4]) == year: - return season.season_number - - if isinstance(parent_index, int): - return parent_index - if isinstance(parent_index, str) and parent_index.isdigit(): - return int(parent_index) - if isinstance(parent_title, str) and parent_title.isdigit(): - return int(parent_title) - return None - - -async def enrich_movies( - client: httpx.AsyncClient, - movies: Sequence[PlexPartialObject], - *, - tmdb_api_key: str | None, - imdb_cache: IMDbCache | None, - imdb_batch_limit: int, - imdb_max_retries: int, - imdb_backoff: float, - imdb_retry_queue: IMDbRetryQueue | None, - logger: logging.Logger, -) -> list[AggregatedItem]: - """Enrich Plex movie metadata with IMDb and TMDb details.""" - - movie_ids = [extract_external_ids(movie) for movie in movies] - imdb_ids = [ids.imdb for ids in movie_ids if ids.imdb] - imdb_map = ( - await fetch_imdb_batch( - client, - imdb_ids, - cache=imdb_cache, - batch_limit=imdb_batch_limit, - max_retries=imdb_max_retries, - backoff=imdb_backoff, - retry_queue=imdb_retry_queue, - logger=logger, - ) - if imdb_ids - else {} - ) - - tmdb_results: list[TMDBMovie | None] = [] - if tmdb_api_key: - tmdb_tasks = [ - fetch_tmdb_movie(client, ids.tmdb, tmdb_api_key, logger) - for ids in movie_ids - if ids.tmdb - ] - if tmdb_tasks: - tmdb_results = await gather_in_batches(tmdb_tasks, len(tmdb_tasks)) - tmdb_iter = iter(tmdb_results) - - aggregated: list[AggregatedItem] = [] - for movie, ids in zip(movies, movie_ids): - tmdb = next(tmdb_iter, None) if ids.tmdb else None - imdb = imdb_map.get(ids.imdb) if ids.imdb else None - aggregated.append( - AggregatedItem( - plex=build_plex_item(movie), - imdb=imdb, - tmdb=tmdb, - ) - ) - return aggregated - - -async def enrich_episodes( - client: httpx.AsyncClient, - show: PlexPartialObject, - episodes: Sequence[PlexPartialObject], - *, - tmdb_api_key: str | None, - imdb_cache: IMDbCache | None, - imdb_batch_limit: int, - imdb_max_retries: int, - imdb_backoff: float, - 
imdb_retry_queue: IMDbRetryQueue | None, - show_tmdb_cache: dict[str, TMDBShow | None], - logger: logging.Logger, -) -> list[AggregatedItem]: - """Enrich Plex episode metadata with IMDb and TMDb details.""" - - show_ids = extract_external_ids(show) - show_tmdb: TMDBShow | None = None - if show_ids.tmdb: - if show_ids.tmdb in show_tmdb_cache: - show_tmdb = show_tmdb_cache[show_ids.tmdb] - elif tmdb_api_key: - show_tmdb = await fetch_tmdb_show( - client, show_ids.tmdb, tmdb_api_key, logger - ) - show_tmdb_cache[show_ids.tmdb] = show_tmdb - - episode_ids = [extract_external_ids(ep) for ep in episodes] - imdb_ids = [ids.imdb for ids in episode_ids if ids.imdb] - imdb_map = ( - await fetch_imdb_batch( - client, - imdb_ids, - cache=imdb_cache, - batch_limit=imdb_batch_limit, - max_retries=imdb_max_retries, - backoff=imdb_backoff, - retry_queue=imdb_retry_queue, - logger=logger, - ) - if imdb_ids - else {} - ) - - tmdb_results: list[TMDBEpisode | None] = [None] * len(episodes) - if show_tmdb and tmdb_api_key: - episode_tasks: list[asyncio.Future[TMDBEpisode | None]] = [] - indices: list[int] = [] - for idx, ep in enumerate(episodes): - season = resolve_tmdb_season_number(show_tmdb, ep) - ep_num = getattr(ep, "index", None) - if isinstance(ep_num, str) and ep_num.isdigit(): - ep_num = int(ep_num) - if season is None or ep_num is None: - continue - indices.append(idx) - episode_tasks.append( - fetch_tmdb_episode( - client, - show_tmdb.id, - season, - ep_num, - tmdb_api_key, - logger, - ) - ) - if episode_tasks: - fetched = await gather_in_batches(episode_tasks, len(episode_tasks)) - for idx, value in zip(indices, fetched): - tmdb_results[idx] = value - - aggregated: list[AggregatedItem] = [] - for ep, ids, tmdb_episode in zip(episodes, episode_ids, tmdb_results): - imdb = imdb_map.get(ids.imdb) if ids.imdb else None - tmdb_item: TMDBItem | None = tmdb_episode or show_tmdb - aggregated.append( - AggregatedItem( - plex=build_plex_item(ep), - imdb=imdb, - tmdb=tmdb_item, - ) - ) - return aggregated - - -__all__ = [ - "IMDbRetryQueue", - "build_plex_item", - "enrich_episodes", - "enrich_movies", - "extract_external_ids", - "fetch_imdb", - "fetch_imdb_batch", - "fetch_tmdb_episode", - "fetch_tmdb_movie", - "fetch_tmdb_show", - "load_imdb_retry_queue", - "persist_imdb_retry_queue", - "process_imdb_retry_queue", - "resolve_tmdb_season_number", -] diff --git a/mcp_plex/loader/ingestion/__init__.py b/mcp_plex/loader/ingestion/__init__.py deleted file mode 100644 index 5ef4f16..0000000 --- a/mcp_plex/loader/ingestion/__init__.py +++ /dev/null @@ -1,122 +0,0 @@ -from __future__ import annotations - -import asyncio -import time -from typing import Callable, Sequence - -from mcp_plex.common.types import AggregatedItem - -from ..utils import require_positive -from .types import EpisodeBatch, IngestBatch, MovieBatch, SampleBatch -from .utils import chunk_sequence - -try: - from plexapi.server import PlexServer -except Exception: # pragma: no cover - fall back when plexapi unavailable - PlexServer = None # type: ignore[assignment] - - -class IngestionTask: - """Fetch Plex data and feed ingestion batches to the enrichment stage.""" - - def __init__( - self, - queue: asyncio.Queue[IngestBatch | None], - *, - sample_items: list[AggregatedItem] | None, - plex_server: PlexServer | None, - plex_chunk_size: int, - enrichment_batch_size: int, - enrichment_workers: int, - log_progress: Callable[[str, int, float, int], None], - ) -> None: - self._queue = queue - self._sample_items = sample_items - self._server = plex_server 
- self._plex_chunk_size = require_positive(plex_chunk_size, name="plex_chunk_size") - self._enrichment_batch_size = require_positive( - enrichment_batch_size, name="enrichment_batch_size" - ) - self._worker_count = require_positive( - enrichment_workers, name="enrichment_workers" - ) - self._log_progress = log_progress - self._count = 0 - self._start = time.perf_counter() - - @property - def count(self) -> int: - return self._count - - @property - def start_time(self) -> float: - return self._start - - async def run(self) -> None: - self._start = time.perf_counter() - try: - if self._sample_items is not None: - await self._ingest_sample(self._sample_items) - else: - await self._ingest_from_plex() - finally: - for _ in range(self._worker_count): - await self._queue.put(None) - self._log_progress("Ingestion", self._count, self._start, self._queue.qsize()) - - async def _ingest_sample(self, items: Sequence[AggregatedItem]) -> None: - for chunk in chunk_sequence(items, self._enrichment_batch_size): - batch = SampleBatch(items=list(chunk)) - if not batch.items: - continue - await self._queue.put(batch) - self._count += len(batch.items) - self._log_progress( - "Ingestion", - self._count, - self._start, - self._queue.qsize(), - ) - - async def _ingest_from_plex(self) -> None: - if self._server is None: - raise RuntimeError("Plex server unavailable for ingestion") - movie_section = self._server.library.section("Movies") - movie_keys = [int(m.ratingKey) for m in movie_section.all()] - for key_chunk in chunk_sequence(movie_keys, self._plex_chunk_size): - key_list = list(key_chunk) - movies = list(self._server.fetchItems(key_list)) if key_list else [] - if not movies: - continue - await self._queue.put(MovieBatch(movies=movies)) - self._count += len(movies) - self._log_progress( - "Ingestion", - self._count, - self._start, - self._queue.qsize(), - ) - show_section = self._server.library.section("TV Shows") - show_keys = [int(s.ratingKey) for s in show_section.all()] - for show_chunk in chunk_sequence(show_keys, self._plex_chunk_size): - shows = list(self._server.fetchItems(list(show_chunk))) - for show in shows: - episode_keys = [int(e.ratingKey) for e in show.episodes()] - for episode_chunk in chunk_sequence( - episode_keys, self._plex_chunk_size - ): - keys = list(episode_chunk) - episodes = list(self._server.fetchItems(keys)) if keys else [] - if not episodes: - continue - await self._queue.put(EpisodeBatch(show=show, episodes=episodes)) - self._count += len(episodes) - self._log_progress( - "Ingestion", - self._count, - self._start, - self._queue.qsize(), - ) - - -__all__ = ["IngestionTask", "EpisodeBatch", "IngestBatch", "MovieBatch", "SampleBatch"] diff --git a/mcp_plex/loader/ingestion/types.py b/mcp_plex/loader/ingestion/types.py deleted file mode 100644 index 04443ac..0000000 --- a/mcp_plex/loader/ingestion/types.py +++ /dev/null @@ -1,43 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from typing import Union - -try: - from plexapi.base import PlexPartialObject -except Exception: # pragma: no cover - fall back when plexapi unavailable - PlexPartialObject = object # type: ignore[assignment] - -from mcp_plex.common.types import AggregatedItem - - -@dataclass(slots=True) -class MovieBatch: - """Batch of Plex movie items pending metadata enrichment.""" - - movies: list[PlexPartialObject] - - -@dataclass(slots=True) -class EpisodeBatch: - """Batch of Plex episodes along with their parent show.""" - - show: PlexPartialObject - episodes: list[PlexPartialObject] - 
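
Reviewer aside (illustrative only, not part of the patch): MovieBatch, EpisodeBatch, and the SampleBatch below form the IngestBatch union that EnrichmentTask._process_batch dispatches on with plain isinstance checks. A minimal, self-contained sketch of that pattern, using toy string payloads as stand-ins for the real Plex objects:

    # Sketch of the isinstance dispatch in EnrichmentTask._process_batch.
    # The list[str] payloads are assumptions standing in for Plex objects.
    from dataclasses import dataclass
    from typing import Union

    @dataclass(slots=True)
    class MovieBatch:
        movies: list[str]

    @dataclass(slots=True)
    class EpisodeBatch:
        show: str
        episodes: list[str]

    @dataclass(slots=True)
    class SampleBatch:
        items: list[str]

    IngestBatch = Union[MovieBatch, EpisodeBatch, SampleBatch]

    def describe(batch: IngestBatch) -> str:
        # Unknown batch types fail loudly, mirroring the reverted code.
        if isinstance(batch, MovieBatch):
            return f"{len(batch.movies)} movies"
        if isinstance(batch, EpisodeBatch):
            return f"{len(batch.episodes)} episodes of {batch.show}"
        if isinstance(batch, SampleBatch):
            return f"{len(batch.items)} pre-enriched items"
        raise TypeError(f"Unsupported batch type: {type(batch)!r}")

    assert describe(EpisodeBatch(show="Andor", episodes=["E1", "E2"])) == "2 episodes of Andor"
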
- -@dataclass(slots=True) -class SampleBatch: - """Batch of pre-enriched items used by sample mode.""" - - items: list[AggregatedItem] - - -IngestBatch = Union[MovieBatch, EpisodeBatch, SampleBatch] - -__all__ = [ - "EpisodeBatch", - "IngestBatch", - "MovieBatch", - "SampleBatch", -] diff --git a/mcp_plex/loader/ingestion/utils.py b/mcp_plex/loader/ingestion/utils.py deleted file mode 100644 index cb263ca..0000000 --- a/mcp_plex/loader/ingestion/utils.py +++ /dev/null @@ -1,136 +0,0 @@ -from __future__ import annotations - -import json -from pathlib import Path -from typing import Iterable, Sequence, TypeVar - -from mcp_plex.common.types import ( - AggregatedItem, - IMDbTitle, - PlexGuid, - PlexItem, - PlexPerson, - TMDBMovie, - TMDBShow, -) - -from ..utils import require_positive - -T = TypeVar("T") - - -def chunk_sequence(items: Sequence[T], size: int) -> Iterable[Sequence[T]]: - """Yield ``items`` in chunks of at most ``size`` elements.""" - - size = require_positive(int(size), name="size") - for start in range(0, len(items), size): - yield items[start : start + size] - - -def load_from_sample(sample_dir: Path) -> list[AggregatedItem]: - """Load items from local sample JSON files.""" - - results: list[AggregatedItem] = [] - movie_dir = sample_dir / "movie" - episode_dir = sample_dir / "episode" - - with (movie_dir / "plex.json").open("r", encoding="utf-8") as f: - movie_data = json.load(f)["MediaContainer"]["Metadata"][0] - plex_movie = PlexItem( - rating_key=str(movie_data.get("ratingKey", "")), - guid=str(movie_data.get("guid", "")), - type=movie_data.get("type", "movie"), - title=movie_data.get("title", ""), - summary=movie_data.get("summary"), - year=movie_data.get("year"), - added_at=movie_data.get("addedAt"), - guids=[PlexGuid(id=g["id"]) for g in movie_data.get("Guid", [])], - thumb=movie_data.get("thumb"), - art=movie_data.get("art"), - tagline=movie_data.get("tagline"), - content_rating=movie_data.get("contentRating"), - directors=[ - PlexPerson(id=d.get("id", 0), tag=d.get("tag", ""), thumb=d.get("thumb")) - for d in movie_data.get("Director", []) - ], - writers=[ - PlexPerson(id=w.get("id", 0), tag=w.get("tag", ""), thumb=w.get("thumb")) - for w in movie_data.get("Writer", []) - ], - actors=[ - PlexPerson( - id=a.get("id", 0), - tag=a.get("tag", ""), - role=a.get("role"), - thumb=a.get("thumb"), - ) - for a in movie_data.get("Role", []) - ], - genres=[g.get("tag", "") for g in movie_data.get("Genre", []) if g.get("tag")], - collections=[ - c.get("tag", "") - for key in ("Collection", "Collections") - for c in movie_data.get(key, []) or [] - if c.get("tag") - ], - ) - with (movie_dir / "imdb.json").open("r", encoding="utf-8") as f: - imdb_movie = IMDbTitle.model_validate(json.load(f)) - with (movie_dir / "tmdb.json").open("r", encoding="utf-8") as f: - tmdb_movie = TMDBMovie.model_validate(json.load(f)) - results.append(AggregatedItem(plex=plex_movie, imdb=imdb_movie, tmdb=tmdb_movie)) - - with (episode_dir / "plex.tv.json").open("r", encoding="utf-8") as f: - episode_data = json.load(f)["MediaContainer"]["Metadata"][0] - plex_episode = PlexItem( - rating_key=str(episode_data.get("ratingKey", "")), - guid=str(episode_data.get("guid", "")), - type=episode_data.get("type", "episode"), - title=episode_data.get("title", ""), - show_title=episode_data.get("grandparentTitle"), - season_title=episode_data.get("parentTitle"), - season_number=episode_data.get("parentIndex"), - episode_number=episode_data.get("index"), - summary=episode_data.get("summary"), - 
year=episode_data.get("year"), - added_at=episode_data.get("addedAt"), - guids=[PlexGuid(id=g["id"]) for g in episode_data.get("Guid", [])], - thumb=episode_data.get("thumb"), - art=episode_data.get("art"), - tagline=episode_data.get("tagline"), - content_rating=episode_data.get("contentRating"), - directors=[ - PlexPerson(id=d.get("id", 0), tag=d.get("tag", ""), thumb=d.get("thumb")) - for d in episode_data.get("Director", []) - ], - writers=[ - PlexPerson(id=w.get("id", 0), tag=w.get("tag", ""), thumb=w.get("thumb")) - for w in episode_data.get("Writer", []) - ], - actors=[ - PlexPerson( - id=a.get("id", 0), - tag=a.get("tag", ""), - role=a.get("role"), - thumb=a.get("thumb"), - ) - for a in episode_data.get("Role", []) - ], - genres=[g.get("tag", "") for g in episode_data.get("Genre", []) if g.get("tag")], - collections=[ - c.get("tag", "") - for key in ("Collection", "Collections") - for c in episode_data.get(key, []) or [] - if c.get("tag") - ], - ) - with (episode_dir / "imdb.tv.json").open("r", encoding="utf-8") as f: - imdb_episode = IMDbTitle.model_validate(json.load(f)) - with (episode_dir / "tmdb.tv.json").open("r", encoding="utf-8") as f: - tmdb_show = TMDBShow.model_validate(json.load(f)) - results.append(AggregatedItem(plex=plex_episode, imdb=imdb_episode, tmdb=tmdb_show)) - - return results - - -__all__ = ["chunk_sequence", "load_from_sample"] diff --git a/mcp_plex/loader/storage/__init__.py b/mcp_plex/loader/storage/__init__.py deleted file mode 100644 index 55a8c5a..0000000 --- a/mcp_plex/loader/storage/__init__.py +++ /dev/null @@ -1,125 +0,0 @@ -from __future__ import annotations - -import asyncio -import logging -import time -from typing import Awaitable, Callable - -from qdrant_client import models -from qdrant_client.async_qdrant_client import AsyncQdrantClient - -from ..utils import require_positive -from .types import StorageBatch -from .utils import ( - build_point, - ensure_collection, - process_qdrant_retry_queue, - upsert_in_batches, -) - - -class StorageTask: - """Persist enriched points into Qdrant.""" - - def __init__( - self, - points_queue: asyncio.Queue[StorageBatch | None], - *, - client: AsyncQdrantClient, - collection_name: str, - dense_size: int, - dense_distance: models.Distance, - upsert_batch_size: int, - worker_count: int, - upsert_capacity: asyncio.Semaphore, - log_progress: Callable[[str, int, float, int], None], - logger: logging.Logger, - retry_attempts: int, - retry_backoff: float, - ensure_collection_fn: Callable[..., Awaitable[None]] = ensure_collection, - upsert_fn: Callable[..., Awaitable[None]] = upsert_in_batches, - ) -> None: - self._points_queue = points_queue - self._client = client - self._collection_name = collection_name - self._upsert_batch_size = require_positive(upsert_batch_size, name="upsert_batch_size") - self._worker_count = require_positive(worker_count, name="worker_count") - self._upsert_capacity = upsert_capacity - self._log_progress = log_progress - self._logger = logger - self._retry_attempts = retry_attempts - self._retry_backoff = retry_backoff - self._ensure_collection_fn = ensure_collection_fn - self._upsert_fn = upsert_fn - - self._upserted_points = 0 - self._upsert_start = time.perf_counter() - self._retry_queue: asyncio.Queue[list[models.PointStruct]] = asyncio.Queue() - self._dense_size = dense_size - self._dense_distance = dense_distance - - @property - def retry_queue(self) -> asyncio.Queue[list[models.PointStruct]]: - return self._retry_queue - - async def ensure_collection(self) -> None: - await 
self._ensure_collection_fn( - self._client, - self._collection_name, - dense_size=self._dense_size, - dense_distance=self._dense_distance, - logger_override=self._logger, - ) - - def start_workers(self) -> list[asyncio.Task[None]]: - return [ - asyncio.create_task(self._worker(worker_id)) - for worker_id in range(self._worker_count) - ] - - async def _worker(self, worker_id: int) -> None: - while True: - batch = await self._points_queue.get() - if batch is None: - self._points_queue.task_done() - break - self._logger.info( - "Storage worker %d handling %d points (queue size=%d)", - worker_id, - len(batch), - self._points_queue.qsize(), - ) - try: - await self._upsert_fn( - self._client, - self._collection_name, - batch, - batch_size=self._upsert_batch_size, - retry_queue=self._retry_queue, - logger_override=self._logger, - ) - if self._upserted_points == 0: - self._upsert_start = time.perf_counter() - self._upserted_points += len(batch) - self._log_progress( - f"Storage worker {worker_id}", - self._upserted_points, - self._upsert_start, - self._points_queue.qsize(), - ) - finally: - self._points_queue.task_done() - self._upsert_capacity.release() - - async def drain_retry_queue(self) -> None: - await process_qdrant_retry_queue( - self._client, - self._collection_name, - self._retry_queue, - self._logger, - max_attempts=self._retry_attempts, - backoff=self._retry_backoff, - ) - - -__all__ = ["StorageTask", "build_point", "ensure_collection", "process_qdrant_retry_queue"] diff --git a/mcp_plex/loader/storage/types.py b/mcp_plex/loader/storage/types.py deleted file mode 100644 index a59b0ed..0000000 --- a/mcp_plex/loader/storage/types.py +++ /dev/null @@ -1,9 +0,0 @@ -from __future__ import annotations - -from typing import List - -from qdrant_client import models - -StorageBatch = List[models.PointStruct] - -__all__ = ["StorageBatch"] diff --git a/mcp_plex/loader/storage/utils.py b/mcp_plex/loader/storage/utils.py deleted file mode 100644 index 8b31c66..0000000 --- a/mcp_plex/loader/storage/utils.py +++ /dev/null @@ -1,314 +0,0 @@ -from __future__ import annotations - -import asyncio -import logging -import warnings -from typing import Sequence - -from qdrant_client import models -from qdrant_client.async_qdrant_client import AsyncQdrantClient - -from mcp_plex.common.types import AggregatedItem - -from ..utils import is_local_qdrant - - -def format_primary_title(item: AggregatedItem) -> str: - """Format the primary title text for ``item``.""" - - primary_title = item.plex.title - if item.plex.type == "episode": - title_bits: list[str] = [] - if item.plex.show_title: - title_bits.append(item.plex.show_title) - se_parts: list[str] = [] - if item.plex.season_number is not None: - se_parts.append(f"S{item.plex.season_number:02d}") - if item.plex.episode_number is not None: - se_parts.append(f"E{item.plex.episode_number:02d}") - if se_parts: - title_bits.append("".join(se_parts)) - if item.plex.title: - title_bits.append(item.plex.title) - if title_bits: - primary_title = " - ".join(title_bits) - return primary_title - - -def build_point_text(item: AggregatedItem) -> str: - """Return the vector text for ``item``.""" - - parts = [ - format_primary_title(item), - item.plex.summary or "", - item.tmdb.overview if item.tmdb and hasattr(item.tmdb, "overview") else "", - item.imdb.plot if item.imdb else "", - ] - directors_text = ", ".join(p.tag for p in item.plex.directors if p.tag) - writers_text = ", ".join(p.tag for p in item.plex.writers if p.tag) - actors_text = ", ".join(p.tag for p in 
item.plex.actors if p.tag) - if directors_text: - parts.append(f"Directed by {directors_text}") - if writers_text: - parts.append(f"Written by {writers_text}") - if actors_text: - parts.append(f"Starring {actors_text}") - if item.plex.tagline: - parts.append(item.plex.tagline) - if item.tmdb and hasattr(item.tmdb, "tagline"): - tagline = getattr(item.tmdb, "tagline", None) - if tagline: - parts.append(tagline) - if item.tmdb and hasattr(item.tmdb, "reviews"): - parts.extend(r.get("content", "") for r in getattr(item.tmdb, "reviews", [])) - return "\n".join(p for p in parts if p) - - -def build_point_payload(item: AggregatedItem) -> dict[str, object]: - """Construct the Qdrant payload for ``item``.""" - - payload: dict[str, object] = { - "data": item.model_dump(mode="json"), - "title": item.plex.title, - "type": item.plex.type, - } - if item.plex.type == "episode": - if item.plex.show_title: - payload["show_title"] = item.plex.show_title - if item.plex.season_title: - payload["season_title"] = item.plex.season_title - if item.plex.season_number is not None: - payload["season_number"] = item.plex.season_number - if item.plex.episode_number is not None: - payload["episode_number"] = item.plex.episode_number - if item.plex.actors: - payload["actors"] = [p.tag for p in item.plex.actors if p.tag] - if item.plex.directors: - payload["directors"] = [p.tag for p in item.plex.directors if p.tag] - if item.plex.writers: - payload["writers"] = [p.tag for p in item.plex.writers if p.tag] - if item.plex.genres: - payload["genres"] = item.plex.genres - if item.plex.collections: - payload["collections"] = item.plex.collections - summary = item.plex.summary - if summary: - payload["summary"] = summary - overview = getattr(item.tmdb, "overview", None) if item.tmdb else None - if overview: - payload["overview"] = overview - plot = item.imdb.plot if item.imdb else None - if plot: - payload["plot"] = plot - taglines = [item.plex.tagline] - if item.tmdb and hasattr(item.tmdb, "tagline"): - taglines.append(getattr(item.tmdb, "tagline", None)) - taglines = [t for t in taglines if t] - if taglines: - payload["tagline"] = "\n".join(dict.fromkeys(taglines)) - if item.tmdb and hasattr(item.tmdb, "reviews"): - review_texts = [r.get("content", "") for r in getattr(item.tmdb, "reviews", [])] - review_texts = [r for r in review_texts if r] - if review_texts: - payload["reviews"] = review_texts - if item.plex.year is not None: - payload["year"] = item.plex.year - if item.plex.added_at is not None: - added = item.plex.added_at - if hasattr(added, "timestamp"): - payload["added_at"] = int(added.timestamp()) - return payload - - -def build_point( - item: AggregatedItem, - dense_model_name: str, - sparse_model_name: str, -) -> models.PointStruct: - """Build a Qdrant point for ``item`` using the configured model names.""" - - text = build_point_text(item) - payload = build_point_payload(item) - point_id: int | str = ( - int(item.plex.rating_key) - if item.plex.rating_key.isdigit() - else item.plex.rating_key - ) - return models.PointStruct( - id=point_id, - vector={ - "dense": models.Document(text=text, model=dense_model_name), - "sparse": models.Document(text=text, model=sparse_model_name), - }, - payload=payload, - ) - - -async def upsert_in_batches( - client: AsyncQdrantClient, - collection_name: str, - points: Sequence[models.PointStruct], - *, - batch_size: int, - retry_queue: asyncio.Queue[list[models.PointStruct]] | None = None, - logger: logging.Logger, -) -> None: - """Upsert points into Qdrant in batches, logging 
HTTP errors.""" - - total = len(points) - for i in range(0, total, batch_size): - batch = points[i : i + batch_size] - try: - await client.upsert(collection_name=collection_name, points=batch) - except Exception: - logger.exception( - "Failed to upsert batch %d-%d", i, i + len(batch) - ) - if retry_queue is not None: - await retry_queue.put(list(batch)) - else: - logger.info( - "Upserted %d/%d points", min(i + len(batch), total), total - ) - - -async def process_qdrant_retry_queue( - client: AsyncQdrantClient, - collection_name: str, - retry_queue: asyncio.Queue[list[models.PointStruct]], - logger: logging.Logger, - *, - max_attempts: int, - backoff: float, -) -> None: - """Retry failed Qdrant batches with exponential backoff.""" - - if retry_queue.empty(): - return - - pending = retry_queue.qsize() - logger.info("Retrying %d failed Qdrant batches", pending) - while not retry_queue.empty(): - batch = await retry_queue.get() - attempt = 1 - while attempt <= max_attempts: - try: - await client.upsert( - collection_name=collection_name, - points=batch, - ) - except Exception: - logger.exception( - "Retry %d/%d failed for Qdrant batch of %d points", - attempt, - max_attempts, - len(batch), - ) - attempt += 1 - if attempt > max_attempts: - logger.error( - "Giving up on Qdrant batch after %d attempts; %d points were not indexed", - max_attempts, - len(batch), - ) - break - await asyncio.sleep(backoff * attempt) - continue - else: - logger.info( - "Successfully retried Qdrant batch of %d points on attempt %d", - len(batch), - attempt, - ) - break - - -async def ensure_collection( - client: AsyncQdrantClient, - collection_name: str, - *, - dense_size: int, - dense_distance: models.Distance, - logger: logging.Logger, -) -> None: - """Create the collection and payload indexes if they do not already exist.""" - - created_collection = False - if not await client.collection_exists(collection_name): - await client.create_collection( - collection_name=collection_name, - vectors_config={"dense": models.VectorParams(size=dense_size, distance=dense_distance)}, - sparse_vectors_config={"sparse": models.SparseVectorParams()}, - ) - created_collection = True - - if not created_collection: - return - - suppress_payload_warning = is_local_qdrant(client) - - async def _create_index( - field_name: str, - field_schema: models.PayloadSchemaType | models.TextIndexParams, - ) -> None: - if suppress_payload_warning: - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", - message="Payload indexes have no effect in the local Qdrant.*", - category=UserWarning, - ) - await client.create_payload_index( - collection_name=collection_name, - field_name=field_name, - field_schema=field_schema, - ) - else: - await client.create_payload_index( - collection_name=collection_name, - field_name=field_name, - field_schema=field_schema, - ) - - text_index = models.TextIndexParams( - type=models.PayloadSchemaType.TEXT, - tokenizer=models.TokenizerType.WORD, - min_token_len=2, - lowercase=True, - ) - for field, schema in ( - ("title", text_index), - ("type", models.PayloadSchemaType.KEYWORD), - ("year", models.PayloadSchemaType.INTEGER), - ("added_at", models.PayloadSchemaType.INTEGER), - ("actors", models.PayloadSchemaType.KEYWORD), - ("directors", models.PayloadSchemaType.KEYWORD), - ("writers", models.PayloadSchemaType.KEYWORD), - ("genres", models.PayloadSchemaType.KEYWORD), - ("show_title", models.PayloadSchemaType.KEYWORD), - ("season_number", models.PayloadSchemaType.INTEGER), - ("episode_number", 
models.PayloadSchemaType.INTEGER), - ("collections", models.PayloadSchemaType.KEYWORD), - ("summary", text_index), - ("overview", text_index), - ("plot", text_index), - ("tagline", text_index), - ("reviews", text_index), - ("data.plex.rating_key", models.PayloadSchemaType.KEYWORD), - ("data.imdb.id", models.PayloadSchemaType.KEYWORD), - ("data.tmdb.id", models.PayloadSchemaType.INTEGER), - ): - await _create_index(field, schema) - - logger.info("Ensured collection %s exists", collection_name) - - -__all__ = [ - "build_point", - "build_point_payload", - "build_point_text", - "ensure_collection", - "format_primary_title", - "process_qdrant_retry_queue", - "upsert_in_batches", -] diff --git a/mcp_plex/loader/utils.py b/mcp_plex/loader/utils.py deleted file mode 100644 index 2350184..0000000 --- a/mcp_plex/loader/utils.py +++ /dev/null @@ -1,89 +0,0 @@ -"""Shared utilities for the loader package.""" -from __future__ import annotations - -import asyncio -import inspect -import logging -from typing import AsyncIterator, Awaitable, List, Sequence, TypeVar - -from qdrant_client import models -from qdrant_client.async_qdrant_client import AsyncQdrantClient - -T = TypeVar("T") - - -_DENSE_MODEL_PARAMS: dict[str, tuple[int, models.Distance]] = { - "BAAI/bge-small-en-v1.5": (384, models.Distance.COSINE), - "BAAI/bge-base-en-v1.5": (768, models.Distance.COSINE), - "BAAI/bge-large-en-v1.5": (1024, models.Distance.COSINE), - "text-embedding-3-small": (1536, models.Distance.COSINE), - "text-embedding-3-large": (3072, models.Distance.COSINE), -} - - -def require_positive(value: int, *, name: str) -> int: - """Return *value* if positive, otherwise raise a ``ValueError``.""" - - if value <= 0: - raise ValueError(f"{name} must be positive") - return value - - -def close_coroutines(tasks: Sequence[Awaitable[object]]) -> None: - """Close coroutine objects to avoid unawaited warnings.""" - - for task in tasks: - if inspect.iscoroutine(task): - task.close() - - -logger = logging.getLogger("mcp_plex.loader") - - -async def iter_gather_in_batches( - tasks: Sequence[Awaitable[T]], batch_size: int -) -> AsyncIterator[T]: - """Yield results from awaitable tasks in fixed-size batches.""" - - try: - require_positive(batch_size, name="batch_size") - except ValueError: - close_coroutines(tasks) - raise - - total = len(tasks) - for i in range(0, total, batch_size): - batch = tasks[i : i + batch_size] - for result in await asyncio.gather(*batch): - yield result - logger.info("Processed %d/%d items", min(i + batch_size, total), total) - - -def resolve_dense_model_params(model_name: str) -> tuple[int, models.Distance]: - """Look up Qdrant vector parameters for a known dense embedding model.""" - - try: - return _DENSE_MODEL_PARAMS[model_name] - except KeyError as exc: - raise ValueError( - "Unknown dense embedding model " - f"'{model_name}'. Update _DENSE_MODEL_PARAMS with the model's size " - "and distance." 
-    ) from exc
-
-
-def is_local_qdrant(client: AsyncQdrantClient) -> bool:
-    """Return ``True`` if *client* targets an in-process Qdrant instance."""
-
-    inner = getattr(client, "_client", None)
-    return bool(inner) and inner.__class__.__module__.startswith(
-        "qdrant_client.local"
-    )
-
-
-async def gather_in_batches(
-    tasks: Sequence[Awaitable[T]], batch_size: int
-) -> List[T]:
-    """Gather awaitable tasks in fixed-size batches."""
-
-    return [result async for result in iter_gather_in_batches(tasks, batch_size)]
diff --git a/pyproject.toml b/pyproject.toml
index 7f09cb5..59866ba 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "mcp-plex"
-version = "0.26.61"
+version = "0.26.60"
 description = "Plex-Oriented Model Context Protocol Server"
 requires-python = ">=3.11,<3.13"
 
diff --git a/tests/test_loader_logging.py b/tests/test_loader_logging.py
index a37aa1d..e3740e9 100644
--- a/tests/test_loader_logging.py
+++ b/tests/test_loader_logging.py
@@ -34,7 +34,7 @@ def test_run_logs_upsert(monkeypatch, caplog):
     with caplog.at_level(logging.INFO, logger="mcp_plex.loader"):
         asyncio.run(loader.run(None, None, None, sample_dir, None, None))
     assert "Loaded 2 items" in caplog.text
-    assert "Storage worker" in caplog.text
+    assert "Upsert worker" in caplog.text
     assert "handling 2 points" in caplog.text
     assert "processed 2 items" in caplog.text
 
diff --git a/uv.lock b/uv.lock
index 5bfd57b..d7b7214 100644
--- a/uv.lock
+++ b/uv.lock
@@ -730,7 +730,7 @@ wheels = [
 
 [[package]]
 name = "mcp-plex"
-version = "0.26.61"
+version = "0.26.60"
 source = { editable = "." }
 dependencies = [
     { name = "fastapi" },
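
Post-script for reviewers: the throttling behaviour this revert folds back into mcp_plex/loader/__init__.py is the capped exponential backoff that fetch_imdb applies to HTTP 429 responses, with exhausted IDs pushed onto the IMDb retry queue. A minimal sketch of that pattern (illustrative only; attempt_once is a hypothetical stand-in for the real HTTP call):

    # Sketch of fetch_imdb's 429 handling: double the delay between
    # attempts and give up after max_retries, letting the caller re-queue.
    import asyncio
    from typing import Awaitable, Callable, Optional, TypeVar

    T = TypeVar("T")

    async def with_backoff(
        attempt_once: Callable[[], Awaitable[Optional[T]]],
        *,
        max_retries: int = 3,
        backoff: float = 1.0,
    ) -> Optional[T]:
        delay = backoff
        for attempt in range(max_retries + 1):
            result = await attempt_once()
            if result is not None:
                return result
            if attempt == max_retries:
                return None  # in the loader, the IMDb ID is re-queued here
            await asyncio.sleep(delay)
            delay *= 2
        return None

The Qdrant retry drain (process_qdrant_retry_queue) uses the same bounded-retry shape, though with a linearly growing sleep (backoff * attempt) and a hard give-up that logs how many points were dropped.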