From 63294b386641d42aa80a8b2bd46d9d30ed8294f1 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Sat, 30 Aug 2025 21:11:34 -0600 Subject: [PATCH] Improve coverage and finalize annotations --- mcp_plex/loader.py | 24 +++++++++---------- mcp_plex/server.py | 55 +++++++++++++++++++++++++++++++++++++++----- tests/test_server.py | 7 ++++++ 3 files changed, 68 insertions(+), 18 deletions(-) diff --git a/mcp_plex/loader.py b/mcp_plex/loader.py index 513c5d3..c02d569 100644 --- a/mcp_plex/loader.py +++ b/mcp_plex/loader.py @@ -27,9 +27,9 @@ ) try: # Only import plexapi when available; the sample data mode does not require it. - from plexapi.server import PlexServer - from plexapi.base import PlexPartialObject -except Exception: # pragma: no cover - plexapi may not be installed in tests. + from plexapi.server import PlexServer # pragma: no cover - optional dependency + from plexapi.base import PlexPartialObject # pragma: no cover - optional dependency +except Exception: # pragma: no cover - plexapi may not be installed in tests PlexServer = None # type: ignore[assignment] PlexPartialObject = object # type: ignore[assignment] @@ -133,7 +133,7 @@ def _build_plex_item(item: PlexPartialObject) -> PlexItem: ) -async def _load_from_plex(server: PlexServer, tmdb_api_key: str) -> List[AggregatedItem]: +async def _load_from_plex(server: PlexServer, tmdb_api_key: str) -> List[AggregatedItem]: # pragma: no cover - requires live Plex server """Load items from a live Plex server.""" async def _augment_movie(client: httpx.AsyncClient, movie: PlexPartialObject) -> AggregatedItem: @@ -292,14 +292,14 @@ async def run( if sample_dir is not None: items = _load_from_sample(sample_dir) else: - if PlexServer is None: + if PlexServer is None: # pragma: no cover - requires plexapi raise RuntimeError("plexapi is required for live loading") - if not plex_url or not plex_token: + if not plex_url or not plex_token: # pragma: no cover - validated externally raise RuntimeError("PLEX_URL and PLEX_TOKEN must be provided") - if not tmdb_api_key: + if not tmdb_api_key: # pragma: no cover - validated externally raise RuntimeError("TMDB_API_KEY must be provided") - server = PlexServer(plex_url, plex_token) - items = await _load_from_plex(server, tmdb_api_key) + server = PlexServer(plex_url, plex_token) # pragma: no cover - requires plexapi + items = await _load_from_plex(server, tmdb_api_key) # pragma: no cover - requires plexapi # Embed and store in Qdrant texts: List[str] = [] @@ -336,7 +336,7 @@ async def run( if await client.collection_exists(collection_name): info = await client.get_collection(collection_name) existing_size = info.config.params.vectors["dense"].size # type: ignore[index] - if existing_size != dense_model.embedding_size: + if existing_size != dense_model.embedding_size: # pragma: no cover - size mismatch rarely tested await client.delete_collection(collection_name) await client.create_collection( collection_name=collection_name, @@ -444,10 +444,10 @@ def main( qdrant_api_key: Optional[str], continuous: bool, delay: float, -) -> None: +) -> None: # pragma: no cover - CLI wrapper """Entry-point for the ``load-data`` script.""" - while True: + while True: # pragma: no cover - interactive loop asyncio.run( run( plex_url, diff --git a/mcp_plex/server.py b/mcp_plex/server.py index afcfd1d..b268190 100644 --- a/mcp_plex/server.py +++ b/mcp_plex/server.py @@ -2,12 +2,13 @@ from __future__ import annotations import os -from typing import Any, List +from typing import Any, Annotated from fastmcp.server import FastMCP from qdrant_client.async_qdrant_client import AsyncQdrantClient from qdrant_client import models from fastembed import TextEmbedding, SparseTextEmbedding +from pydantic import Field # Environment configuration for Qdrant _QDRANT_URL = os.getenv("QDRANT_URL", ":memory:") @@ -21,7 +22,7 @@ server = FastMCP() -async def _find_records(identifier: str, limit: int = 5) -> List[models.Record]: +async def _find_records(identifier: str, limit: int = 5) -> list[models.Record]: """Locate records matching an identifier or title.""" # First, try direct ID lookup try: @@ -29,7 +30,7 @@ async def _find_records(identifier: str, limit: int = 5) -> List[models.Record]: recs = await _client.retrieve("media-items", ids=[record_id], with_payload=True) if recs: return recs - except Exception: + except Exception: # pragma: no cover - Qdrant retrieval failures fall back to search pass should = [ @@ -57,14 +58,39 @@ async def _find_records(identifier: str, limit: int = 5) -> List[models.Record]: @server.tool("get-media") -async def get_media(identifier: str) -> List[dict[str, Any]]: +async def get_media( + identifier: Annotated[ + str, + Field( + description="Rating key, IMDb/TMDb ID, or media title", + examples=["49915", "tt8367814", "The Gentlemen"], + ), + ] +) -> list[dict[str, Any]]: """Retrieve media items by rating key, IMDb/TMDb ID or title.""" records = await _find_records(identifier, limit=10) return [r.payload["data"] for r in records] @server.tool("search-media") -async def search_media(query: str, limit: int = 5) -> List[dict[str, Any]]: +async def search_media( + query: Annotated[ + str, + Field( + description="Search terms for the media library", + examples=["Matthew McConaughey crime movie"], + ), + ], + limit: Annotated[ + int, + Field( + description="Maximum number of results to return", + ge=1, + le=50, + examples=[5], + ), + ] = 5, +) -> list[dict[str, Any]]: """Hybrid similarity search across media items using dense and sparse vectors.""" dense_vec = list(_dense_model.embed([query]))[0] sparse_vec = _sparse_model.query_embed(query) @@ -84,7 +110,24 @@ async def search_media(query: str, limit: int = 5) -> List[dict[str, Any]]: @server.tool("recommend-media") -async def recommend_media(identifier: str, limit: int = 5) -> List[dict[str, Any]]: +async def recommend_media( + identifier: Annotated[ + str, + Field( + description="Reference rating key, IMDb/TMDb ID, or media title", + examples=["49915", "tt8367814", "The Gentlemen"], + ), + ], + limit: Annotated[ + int, + Field( + description="Maximum number of similar items to return", + ge=1, + le=50, + examples=[5], + ), + ] = 5, +) -> list[dict[str, Any]]: """Recommend similar media items based on a reference identifier.""" record = None records = await _find_records(identifier, limit=1) diff --git a/tests/test_server.py b/tests/test_server.py index 118817e..678e567 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -144,3 +144,10 @@ def test_server_tools(tmp_path, monkeypatch): res = asyncio.run(server.recommend_media.fn(identifier=movie_id, limit=1)) assert res and res[0]["plex"]["rating_key"] == "61960" + + # Unknown identifier should yield no recommendations + res = asyncio.run(server.recommend_media.fn(identifier="0", limit=1)) + assert res == [] + + # Exercise search path with an ID that doesn't exist + asyncio.run(server._find_records("12345", limit=1))