Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/pyproject.deps.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "mcp-plex"
version = "2.0.18"
version = "2.0.19"
requires-python = ">=3.11,<3.13"
dependencies = [
"fastmcp>=2.11.2",
Expand Down
24 changes: 24 additions & 0 deletions docs/loader.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Loader Operations

The loader emits structured log records throughout ingestion so operators can
monitor progress without scraping stdout dumps.

## Qdrant retry summary

At the end of each run the loader drains the in-memory Qdrant retry queue and
reports the outcome via a structured ``INFO`` log:

```
Qdrant retry summary
```

The log record is tagged with ``event="qdrant_retry_summary"`` and includes two
integer attributes:

- ``succeeded_points`` – number of points that were reindexed successfully after
retrying.
- ``failed_points`` – number of points that still failed after exhausting all
retry attempts and therefore remain missing from the collection.

Filter on the ``qdrant_retry_summary`` event — in your logging aggregator in
production, or via ``caplog`` in tests — to confirm ingestion health.
10 changes: 9 additions & 1 deletion mcp_plex/loader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,12 +437,20 @@ async def run(
if not items:
logger.info("No points to upsert")

await _process_qdrant_retry_queue(
succeeded_points, failed_points = await _process_qdrant_retry_queue(
client,
collection_name,
qdrant_retry_queue,
config=qdrant_config,
)
logger.info(
"Qdrant retry summary",
extra={
"event": "qdrant_retry_summary",
"succeeded_points": succeeded_points,
"failed_points": failed_points,
},
)

if imdb_queue_path:
_persist_imdb_retry_queue(imdb_queue_path, imdb_config.retry_queue)
Expand Down
33 changes: 21 additions & 12 deletions mcp_plex/loader/qdrant.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,24 @@ async def _process_qdrant_retry_queue(
retry_queue: asyncio.Queue[list[models.PointStruct]],
*,
config: "QdrantRuntimeConfig",
) -> None:
"""Retry failed Qdrant batches with exponential backoff."""
) -> tuple[int, int]:
"""Retry failed Qdrant batches with exponential backoff.

Returns a tuple containing the number of points that were retried successfully
and the number that still failed after exhausting ``config.retry_attempts``.
"""

if retry_queue.empty():
return
return 0, 0

pending = retry_queue.qsize()
logger.info("Retrying %d failed Qdrant batches", pending)
succeeded_points = 0
failed_points = 0
while not retry_queue.empty():
batch = await retry_queue.get()
attempt = 1
while attempt <= config.retry_attempts:
batch_size = len(batch)
for attempt in range(1, config.retry_attempts + 1):
try:
await client.upsert(
collection_name=collection_name,
Expand All @@ -331,26 +337,29 @@ async def _process_qdrant_retry_queue(
"Retry %d/%d failed for Qdrant batch of %d points",
attempt,
config.retry_attempts,
len(batch),
batch_size,
)
attempt += 1
if attempt > config.retry_attempts:
if attempt == config.retry_attempts:
logger.error(
"Giving up on Qdrant batch after %d attempts; %d points were not indexed",
config.retry_attempts,
len(batch),
batch_size,
)
failed_points += batch_size
break
await asyncio.sleep(config.retry_backoff * attempt)
continue

next_attempt = attempt + 1
await asyncio.sleep(config.retry_backoff * next_attempt)
else:
logger.info(
"Successfully retried Qdrant batch of %d points on attempt %d",
len(batch),
batch_size,
attempt,
)
succeeded_points += batch_size
break

return succeeded_points, failed_points

__all__ = [
"_DENSE_MODEL_PARAMS",
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mcp-plex"
version = "2.0.18"
version = "2.0.19"

description = "Plex-Oriented Model Context Protocol Server"
requires-python = ">=3.11,<3.13"
Expand Down
24 changes: 24 additions & 0 deletions tests/test_loader_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,30 @@ def test_run_logs_no_points(monkeypatch, caplog):
assert "Ingestion stage finished" in caplog.text


def test_run_logs_qdrant_retry_summary(monkeypatch, caplog):
monkeypatch.setattr(loader, "AsyncQdrantClient", DummyClient)
sample_dir = Path(__file__).resolve().parents[1] / "sample-data"

async def fake_process_retry_queue(*args, **kwargs):
return 7, 3

monkeypatch.setattr(loader, "_process_qdrant_retry_queue", fake_process_retry_queue)

with caplog.at_level(logging.INFO):
asyncio.run(loader.run(None, None, None, sample_dir, None, None))

summary_records = [
record
for record in caplog.records
if record.levelno == logging.INFO
and getattr(record, "event", None) == "qdrant_retry_summary"
]
assert summary_records, "Expected a qdrant retry summary log record"
record = summary_records[-1]
assert getattr(record, "succeeded_points", None) == 7
assert getattr(record, "failed_points", None) == 3


def test_run_rejects_invalid_upsert_buffer_size(monkeypatch):
monkeypatch.setattr(loader, "AsyncQdrantClient", DummyClient)
sample_dir = Path(__file__).resolve().parents[1] / "sample-data"
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.