Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/pyproject.deps.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "mcp-plex"
version = "0.26.63"
version = "0.26.64"
requires-python = ">=3.11,<3.13"
dependencies = [
"fastmcp>=2.11.2",
Expand Down
78 changes: 76 additions & 2 deletions mcp_plex/loader/pipeline/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@
from typing import Sequence

from ...common.types import AggregatedItem
from .channels import IngestQueue, SampleBatch, chunk_sequence
from .channels import (
EpisodeBatch,
IngestQueue,
MovieBatch,
SampleBatch,
chunk_sequence,
_chunk_sequence,
)


class IngestionStage:
Expand Down Expand Up @@ -91,9 +98,76 @@ async def _run_plex_ingestion(self) -> None:
if self._plex_server is None:
self._logger.warning("Plex server unavailable; skipping ingestion.")
else:
self._logger.info("Plex ingestion pending implementation.")
await self._ingest_plex(
plex_server=self._plex_server,
movie_batch_size=self._movie_batch_size,
episode_batch_size=self._episode_batch_size,
output_queue=self._output_queue,
logger=self._logger,
)
await asyncio.sleep(0)

async def _ingest_plex(
self,
*,
plex_server: object,
movie_batch_size: int,
episode_batch_size: int,
output_queue: IngestQueue,
logger: logging.Logger,
) -> None:
"""Retrieve Plex media and place batches onto *output_queue*."""

movies_attr = getattr(plex_server, "movies", [])
movies_source = movies_attr() if callable(movies_attr) else movies_attr
movies = list(movies_source)
for batch_index, chunk in enumerate(
_chunk_sequence(movies, movie_batch_size), start=1
):
batch_movies = list(chunk)
if not batch_movies:
continue

batch = MovieBatch(movies=batch_movies)
await output_queue.put(batch)
self._items_ingested += len(batch_movies)
self._batches_ingested += 1
logger.info(
"Queued Plex movie batch %d with %d movies (total items=%d).",
batch_index,
len(batch_movies),
self._items_ingested,
)

shows_attr = getattr(plex_server, "shows", [])
shows_source = shows_attr() if callable(shows_attr) else shows_attr
shows = list(shows_source)
for show in shows:
show_title = getattr(show, "title", str(show))
episodes_attr = getattr(show, "episodes", [])
episodes_source = (
episodes_attr() if callable(episodes_attr) else episodes_attr
)
episodes = list(episodes_source)
for batch_index, chunk in enumerate(
_chunk_sequence(episodes, episode_batch_size), start=1
):
batch_episodes = list(chunk)
if not batch_episodes:
continue

batch = EpisodeBatch(show=show, episodes=batch_episodes)
await output_queue.put(batch)
self._items_ingested += len(batch_episodes)
self._batches_ingested += 1
logger.info(
"Queued Plex episode batch %d for %s with %d episodes (total items=%d).",
batch_index,
show_title,
len(batch_episodes),
self._items_ingested,
)

async def _enqueue_sample_batches(
self, items: Sequence[AggregatedItem]
) -> None:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mcp-plex"
version = "0.26.63"
version = "0.26.64"

description = "Plex-Oriented Model Context Protocol Server"
requires-python = ">=3.11,<3.13"
Expand Down
100 changes: 99 additions & 1 deletion tests/test_ingestion_stage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import asyncio
import logging

from mcp_plex.common.types import AggregatedItem, PlexItem
from mcp_plex.loader.pipeline.channels import INGEST_DONE, SampleBatch
from mcp_plex.loader.pipeline.channels import (
INGEST_DONE,
EpisodeBatch,
MovieBatch,
SampleBatch,
)
from mcp_plex.loader.pipeline.ingestion import IngestionStage


Expand Down Expand Up @@ -146,3 +152,95 @@ async def scenario() -> tuple[list[SampleBatch], object | None, object | None, i
assert second_token is sentinel
assert items_ingested == 2
assert batches_ingested == 2


def test_ingestion_stage_ingest_plex_batches_movies_and_episodes(caplog) -> None:
caplog.set_level(logging.INFO)

class FakeMovie:
def __init__(self, title: str) -> None:
self.title = title

class FakeEpisode:
def __init__(self, title: str) -> None:
self.title = title

class FakeShow:
def __init__(self, title: str, episode_titles: list[str]) -> None:
self.title = title
self._episodes = [FakeEpisode(ep_title) for ep_title in episode_titles]

def episodes(self) -> list[FakeEpisode]:
return list(self._episodes)

class FakePlex:
def __init__(self) -> None:
self._movies = [
FakeMovie("Movie 1"),
FakeMovie("Movie 2"),
FakeMovie("Movie 3"),
]
self._shows = [
FakeShow("Show A", ["S01E01", "S01E02", "S01E03"]),
FakeShow("Show B", ["S01E01", "S01E02"]),
]

def movies(self) -> list[FakeMovie]:
return list(self._movies)

def shows(self) -> list[FakeShow]:
return list(self._shows)

sentinel = object()

async def scenario() -> tuple[list[object], int, int]:
queue: asyncio.Queue = asyncio.Queue()
plex = FakePlex()
stage = IngestionStage(
plex_server=plex,
sample_items=None,
movie_batch_size=2,
episode_batch_size=2,
sample_batch_size=10,
output_queue=queue,
completion_sentinel=sentinel,
)

await stage._ingest_plex(
plex_server=plex,
movie_batch_size=2,
episode_batch_size=2,
output_queue=queue,
logger=stage.logger,
)

batches: list[object] = []
while not queue.empty():
batches.append(await queue.get())

return batches, stage.items_ingested, stage.batches_ingested

batches, items_ingested, batches_ingested = asyncio.run(scenario())

assert items_ingested == 8
assert batches_ingested == 5

assert len(batches) == 5
assert isinstance(batches[0], MovieBatch)
assert [movie.title for movie in batches[0].movies] == ["Movie 1", "Movie 2"]
assert isinstance(batches[1], MovieBatch)
assert [movie.title for movie in batches[1].movies] == ["Movie 3"]
assert isinstance(batches[2], EpisodeBatch)
assert [episode.title for episode in batches[2].episodes] == ["S01E01", "S01E02"]
assert isinstance(batches[3], EpisodeBatch)
assert [episode.title for episode in batches[3].episodes] == ["S01E03"]
assert isinstance(batches[4], EpisodeBatch)
assert [episode.title for episode in batches[4].episodes] == ["S01E01", "S01E02"]

assert caplog.messages == [
"Queued Plex movie batch 1 with 2 movies (total items=2).",
"Queued Plex movie batch 2 with 1 movies (total items=3).",
"Queued Plex episode batch 1 for Show A with 2 episodes (total items=5).",
"Queued Plex episode batch 2 for Show A with 1 episodes (total items=6).",
"Queued Plex episode batch 1 for Show B with 2 episodes (total items=8).",
]
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.