Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/pyproject.deps.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "mcp-plex"
version = "0.26.66"
version = "0.26.68"
requires-python = ">=3.11,<3.13"
dependencies = [
"fastmcp>=2.11.2",
Expand Down
165 changes: 83 additions & 82 deletions mcp_plex/loader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import warnings
from pathlib import Path
from typing import (
Any,
AsyncIterator,
Awaitable,
List,
Expand Down Expand Up @@ -73,6 +74,9 @@
_imdb_backoff: float = 1.0
_imdb_retry_queue: "_IMDbRetryQueue" | None = None
_imdb_batch_limit: int = 5
_imdb_requests_per_window: int | None = None
_imdb_window_seconds: float = 1.0
_imdb_throttle: Any = None
_qdrant_batch_size: int = 1000
_qdrant_upsert_buffer_size: int = 200
_qdrant_max_concurrent_upserts: int = 4
Expand Down Expand Up @@ -159,98 +163,55 @@ def _resolve_dense_model_params(model_name: str) -> tuple[int, models.Distance]:
) from exc


def _get_imdb_throttle() -> Any:
"""Return the shared IMDb rate limiter instance if configured."""

global _imdb_throttle
if _imdb_requests_per_window is None:
return None
from .pipeline import enrichment as enrichment_mod

if _imdb_throttle is None:
_imdb_throttle = enrichment_mod._RequestThrottler(
limit=_imdb_requests_per_window,
interval=float(_imdb_window_seconds),
)
return _imdb_throttle


async def _fetch_imdb(client: httpx.AsyncClient, imdb_id: str) -> Optional[IMDbTitle]:
"""Fetch metadata for an IMDb ID with caching and retry logic."""
"""Fetch metadata for an IMDb ID with caching, retry, and throttling."""

if _imdb_cache:
cached = _imdb_cache.get(imdb_id)
if cached:
return IMDbTitle.model_validate(cached)
from .pipeline import enrichment as enrichment_mod

url = f"https://api.imdbapi.dev/titles/{imdb_id}"
delay = _imdb_backoff
for attempt in range(_imdb_max_retries + 1):
try:
resp = await client.get(url)
except httpx.HTTPError:
logger.exception("HTTP error fetching IMDb ID %s", imdb_id)
return None
if resp.status_code == 429:
if attempt == _imdb_max_retries:
if _imdb_retry_queue is not None:
await _imdb_retry_queue.put(imdb_id)
return None
await asyncio.sleep(delay)
delay *= 2
continue
if resp.is_success:
data = resp.json()
if _imdb_cache:
_imdb_cache.set(imdb_id, data)
return IMDbTitle.model_validate(data)
return None
return None
return await enrichment_mod._fetch_imdb(
client,
imdb_id,
cache=_imdb_cache,
throttle=_get_imdb_throttle(),
max_retries=_imdb_max_retries,
backoff=_imdb_backoff,
retry_queue=_imdb_retry_queue,
)


async def _fetch_imdb_batch(
client: httpx.AsyncClient, imdb_ids: Sequence[str]
) -> dict[str, Optional[IMDbTitle]]:
"""Fetch metadata for multiple IMDb IDs, batching requests."""

results: dict[str, Optional[IMDbTitle]] = {}
ids_to_fetch: list[str] = []
for imdb_id in imdb_ids:
if _imdb_cache:
cached = _imdb_cache.get(imdb_id)
if cached:
results[imdb_id] = IMDbTitle.model_validate(cached)
continue
ids_to_fetch.append(imdb_id)

if not ids_to_fetch:
return results
from .pipeline import enrichment as enrichment_mod

url = "https://api.imdbapi.dev/titles:batchGet"
for i in range(0, len(ids_to_fetch), _imdb_batch_limit):
chunk = ids_to_fetch[i : i + _imdb_batch_limit]
params = [("titleIds", imdb_id) for imdb_id in chunk]
delay = _imdb_backoff
for attempt in range(_imdb_max_retries + 1):
try:
resp = await client.get(url, params=params)
except httpx.HTTPError:
logger.exception("HTTP error fetching IMDb IDs %s", ",".join(chunk))
for imdb_id in chunk:
results[imdb_id] = None
break
if resp.status_code == 429:
if attempt == _imdb_max_retries:
if _imdb_retry_queue is not None:
for imdb_id in chunk:
await _imdb_retry_queue.put(imdb_id)
for imdb_id in chunk:
results[imdb_id] = None
break
await asyncio.sleep(delay)
delay *= 2
continue
if resp.is_success:
data = resp.json()
found: set[str] = set()
for title_data in data.get("titles", []):
imdb_title = IMDbTitle.model_validate(title_data)
results[imdb_title.id] = imdb_title
found.add(imdb_title.id)
if _imdb_cache:
_imdb_cache.set(imdb_title.id, title_data)
for missing in set(chunk) - found:
results[missing] = None
break
for imdb_id in chunk:
results[imdb_id] = None
break

return results
return await enrichment_mod._fetch_imdb_batch(
client,
imdb_ids,
cache=_imdb_cache,
throttle=_get_imdb_throttle(),
max_retries=_imdb_max_retries,
backoff=_imdb_backoff,
retry_queue=_imdb_retry_queue,
batch_limit=_imdb_batch_limit,
)


def _load_imdb_retry_queue(path: Path) -> None:
Expand Down Expand Up @@ -1396,17 +1357,32 @@ async def run(
imdb_max_retries: int = 3,
imdb_backoff: float = 1.0,
imdb_queue_path: Path | None = None,
imdb_requests_per_window: int | None = None,
imdb_window_seconds: float = 1.0,
upsert_buffer_size: int = _qdrant_upsert_buffer_size,
plex_chunk_size: int = 200,
enrichment_batch_size: int = 100,
enrichment_workers: int = 4,
) -> None:
"""Core execution logic for the CLI."""

global _imdb_cache, _imdb_max_retries, _imdb_backoff, _imdb_retry_queue
global _imdb_cache
global _imdb_max_retries
global _imdb_backoff
global _imdb_retry_queue
global _imdb_requests_per_window
global _imdb_window_seconds
global _imdb_throttle
_imdb_cache = IMDbCache(imdb_cache_path) if imdb_cache_path else None
_imdb_max_retries = imdb_max_retries
_imdb_backoff = imdb_backoff
if imdb_requests_per_window is not None:
_require_positive(imdb_requests_per_window, name="imdb_requests_per_window")
if imdb_window_seconds <= 0:
raise ValueError("imdb_window_seconds must be positive")
_imdb_requests_per_window = imdb_requests_per_window
_imdb_window_seconds = imdb_window_seconds
_imdb_throttle = None
if imdb_queue_path:
_load_imdb_retry_queue(imdb_queue_path)
async with httpx.AsyncClient(timeout=30) as client:
Expand Down Expand Up @@ -1676,6 +1652,23 @@ async def run(
show_default=True,
help="Initial backoff delay in seconds for IMDb retries",
)
@click.option(
"--imdb-requests-per-window",
envvar="IMDB_REQUESTS_PER_WINDOW",
show_envvar=True,
type=int,
default=None,
help="Maximum IMDb requests per rate-limit window (set to disable)",
)
@click.option(
"--imdb-window-seconds",
envvar="IMDB_WINDOW_SECONDS",
show_envvar=True,
type=float,
default=1.0,
show_default=True,
help="Duration in seconds for the IMDb rate-limit window",
)
@click.option(
"--imdb-queue",
envvar="IMDB_QUEUE",
Expand Down Expand Up @@ -1708,6 +1701,8 @@ def main(
imdb_cache: Path,
imdb_max_retries: int,
imdb_backoff: float,
imdb_requests_per_window: Optional[int],
imdb_window_seconds: float,
imdb_queue: Path,
) -> None:
"""Entry-point for the ``load-data`` script."""
Expand All @@ -1732,6 +1727,8 @@ def main(
imdb_cache,
imdb_max_retries,
imdb_backoff,
imdb_requests_per_window,
imdb_window_seconds,
imdb_queue,
upsert_buffer_size,
plex_chunk_size,
Expand Down Expand Up @@ -1760,6 +1757,8 @@ async def load_media(
imdb_cache: Path,
imdb_max_retries: int,
imdb_backoff: float,
imdb_requests_per_window: Optional[int],
imdb_window_seconds: float,
imdb_queue: Path,
upsert_buffer_size: int,
plex_chunk_size: int,
Expand All @@ -1786,6 +1785,8 @@ async def load_media(
imdb_cache,
imdb_max_retries,
imdb_backoff,
imdb_requests_per_window,
imdb_window_seconds,
imdb_queue,
upsert_buffer_size,
plex_chunk_size,
Expand Down
Loading