Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions mcp_plex/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import json
import logging
import sys
from collections import deque
from pathlib import Path
from typing import Awaitable, List, Optional, Sequence, TypeVar
from typing import Awaitable, Iterable, List, Optional, Sequence, TypeVar

import click
import httpx
Expand Down Expand Up @@ -43,10 +44,42 @@
_imdb_cache: IMDbCache | None = None
_imdb_max_retries: int = 3
_imdb_backoff: float = 1.0
_imdb_retry_queue: asyncio.Queue[str] | None = None
_imdb_retry_queue: "_IMDbRetryQueue" | None = None
_imdb_batch_limit: int = 5
_qdrant_batch_size: int = 1000


class _IMDbRetryQueue(asyncio.Queue[str]):
"""Queue that tracks items in a deque for safe serialization."""

def __init__(self, initial: Iterable[str] | None = None):
super().__init__()
self._items: deque[str] = deque()
if initial:
for imdb_id in initial:
imdb_id_str = str(imdb_id)
super().put_nowait(imdb_id_str)
self._items.append(imdb_id_str)

def put_nowait(self, item: str) -> None: # type: ignore[override]
super().put_nowait(item)
self._items.append(item)

def get_nowait(self) -> str: # type: ignore[override]
if not self._items:
raise RuntimeError("Desynchronization: Queue is not empty but self._items is empty.")
try:
item = super().get_nowait()
except asyncio.QueueEmpty:
raise RuntimeError("Desynchronization: self._items is not empty but asyncio.Queue is empty.")
self._items.popleft()
return item

def snapshot(self) -> list[str]:
"""Return a list of the current queue contents."""

return list(self._items)

# Known Qdrant-managed dense embedding models with their dimensionality and
# similarity metric. To support a new server-side embedding model, add an entry
# here with the appropriate vector size and `models.Distance` value.
Expand Down Expand Up @@ -182,14 +215,20 @@ def _load_imdb_retry_queue(path: Path) -> None:
"""Populate the retry queue from a JSON file if it exists."""

global _imdb_retry_queue
_imdb_retry_queue = asyncio.Queue()
ids: list[str] = []
if path.exists():
try:
ids = json.loads(path.read_text())
for imdb_id in ids:
_imdb_retry_queue.put_nowait(str(imdb_id))
data = json.loads(path.read_text())
if isinstance(data, list):
ids = [str(imdb_id) for imdb_id in data]
else:
logger.warning(
"IMDb retry queue file %s did not contain a list; ignoring its contents",
path,
)
except Exception:
logger.exception("Failed to load IMDb retry queue from %s", path)
_imdb_retry_queue = _IMDbRetryQueue(ids)


async def _process_imdb_retry_queue(client: httpx.AsyncClient) -> None:
Expand All @@ -210,8 +249,7 @@ def _persist_imdb_retry_queue(path: Path) -> None:

if _imdb_retry_queue is None:
return
ids = list(_imdb_retry_queue._queue) # type: ignore[attr-defined]
path.write_text(json.dumps(ids))
path.write_text(json.dumps(_imdb_retry_queue.snapshot()))


async def _upsert_in_batches(
Expand Down Expand Up @@ -598,7 +636,7 @@ async def run(
async with httpx.AsyncClient(timeout=30) as client:
await _process_imdb_retry_queue(client)
else:
_imdb_retry_queue = asyncio.Queue()
_imdb_retry_queue = _IMDbRetryQueue()

items: List[AggregatedItem]
if sample_dir is not None:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mcp-plex"
version = "0.26.28"
version = "0.26.29"

description = "Plex-Oriented Model Context Protocol Server"
requires-python = ">=3.11,<3.13"
Expand Down
7 changes: 5 additions & 2 deletions tests/test_loader_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ async def first_run():

async def second_run():
_load_imdb_retry_queue(queue_path)
assert loader._imdb_retry_queue is not None
assert loader._imdb_retry_queue.qsize() == 1
assert loader._imdb_retry_queue.snapshot() == ["tt0111161"]
async with httpx.AsyncClient(transport=httpx.MockTransport(second_transport)) as client:
await _process_imdb_retry_queue(client)
_persist_imdb_retry_queue(queue_path)
Expand All @@ -388,8 +391,7 @@ def test_load_imdb_retry_queue_invalid_json(tmp_path):


def test_process_imdb_retry_queue_requeues(monkeypatch):
queue: asyncio.Queue[str] = asyncio.Queue()
queue.put_nowait("tt0111161")
queue = loader._IMDbRetryQueue(["tt0111161"])
monkeypatch.setattr(loader, "_imdb_retry_queue", queue)

async def fake_fetch(client, imdb_id):
Expand All @@ -403,6 +405,7 @@ async def run_test():

asyncio.run(run_test())
assert queue.qsize() == 1
assert queue.snapshot() == ["tt0111161"]


def test_resolve_tmdb_season_number_matches_name():
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.