Add multi-worker serve mode for Sendspin Party #199
Conversation
asyncio.wait_for + run_in_executor with a blocking queue.get() leaves orphaned threads that consume messages and discard them when the future is cancelled. Use queue.get(timeout=0.5) instead so the thread exits cleanly on timeout.
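For illustration, a minimal sketch of that pattern, reusing the `_status_queue` and `_shutdown_requested` names that appear elsewhere in this PR (the method name and surrounding loop are assumptions):

```python
import asyncio
import queue as _queue


async def _get_status_message(self):
    """Fetch one status message without orphaning executor threads (sketch)."""
    loop = asyncio.get_running_loop()
    while not self._shutdown_requested:
        try:
            # The blocking get() times out on its own, so the executor thread
            # always returns instead of lingering (and swallowing a message)
            # after an outer asyncio.wait_for() cancels the future.
            return await loop.run_in_executor(
                None, lambda: self._status_queue.get(timeout=0.5)
            )
        except _queue.Empty:
            continue  # nothing arrived; re-check the shutdown flag
    return None
```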
The web player on a worker port (e.g. :9002) fetches the coordinator's /api/status (e.g. :9000) which is cross-origin. Add Access-Control-Allow-Origin: * to allow the request.
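A hedged sketch of what that could look like, assuming the status endpoint is an aiohttp handler (the tests in this PR use aiohttp only on the client side, so the server framework and the `total_clients` lookup shown here are assumptions):

```python
from aiohttp import web


async def handle_status(request: web.Request) -> web.Response:
    # Hypothetical payload shape; the real /api/status handler may differ.
    payload = {"total_clients": request.app.get("total_clients", 0)}
    return web.json_response(
        payload,
        # Lets the web player served from a worker port (e.g. :9002) fetch
        # the coordinator's status endpoint (e.g. :9000) cross-origin.
        headers={"Access-Control-Allow-Origin": "*"},
    )
```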
Cancel the running task on SIGINT so it breaks out of blocking audio decode. Ignore repeated Ctrl+C after the first one.
When the last client disconnects from a worker, the stream is stopped. Catch StreamStoppedError and clear the stream reference so the worker skips audio chunks until a new client connects and creates a fresh stream.
- Add worker crash detection: coordinator checks process liveness every 30s and removes dead workers from the redirect pool. Shuts down if all workers crash.
- Validate --workers >= 1 in CLI
- Add CORS header to single-worker /api/status for consistency
- Use _queue.Empty instead of bare Exception in _drain_status_queue
- Remove unused TYPE_CHECKING import and empty pass block
- Redirect handler now uses live _active_worker_ports (reflects crash removal)
- Frontend: hide listener count when stopped, show min 1 when listening
Track already-reported crashes so _check_worker_health doesn't print the same crash message every 30s for workers that are already dead.
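Taken together with the crash-detection change above, the health check could look roughly like this (a sketch: `self._workers` as an id → (Process, port) mapping and `_request_shutdown()` are assumed names; `_active_worker_ports` and the 30s interval come from the change list):

```python
import asyncio
import logging


async def _check_worker_health(self) -> None:
    """Drop crashed workers from the redirect pool, reporting each crash once."""
    reported_crashes: set[int] = set()
    while not self._shutdown_requested:
        await asyncio.sleep(30)  # liveness check every 30 s
        for worker_id, (process, port) in self._workers.items():
            if process.is_alive() or worker_id in reported_crashes:
                continue
            reported_crashes.add(worker_id)  # log each crash only once
            logging.warning("Worker %d (port %d) crashed", worker_id, port)
            self._active_worker_ports.discard(port)
        if not self._active_worker_ports:
            logging.error("All workers crashed; shutting down coordinator")
            self._request_shutdown()
            return
```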
Clear _active_group alongside _stream on StreamStoppedError so that the next connecting client creates a fresh group and stream instead of being added to a stale group with no active stream.
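The two fixes above combine into something like the following on the worker side (a sketch; the import path for `StreamStoppedError` and the `write()` call are assumptions, while `_stream` and `_active_group` are the attributes named above):

```python
from aiosendspin import StreamStoppedError  # import path is an assumption


def _handle_audio_chunk(self, chunk) -> None:
    """Sketch of the disconnect cleanup described above."""
    if self._stream is None:
        # No active stream: skip chunks until a new client connects and a
        # fresh group/stream pair is created.
        return
    try:
        self._stream.write(chunk.pcm_bytes)  # hypothetical write call
    except StreamStoppedError:
        # aiosendspin stopped the stream when the last client disconnected;
        # clear the stale group too so the next client doesn't join it.
        self._stream = None
        self._active_group = None
```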
The coordinator was pacing itself to real-time (sleeping when ahead > 0), resulting in only ~250ms of buffer on workers. Allow up to 5s of buffer ahead, matching single-worker mode's max_buffer_us=5_000_000.
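In rough terms the pacing change amounts to the sketch below, which would sit inside the coordinator's decode loop (`now_us()` is a hypothetical clock helper; only `play_start_us` and the 5_000_000 µs limit come from the PR):

```python
import asyncio

MAX_BUFFER_US = 5_000_000  # match single-worker mode's max_buffer_us


async def _pace(self, play_start_us: int) -> None:
    ahead_us = play_start_us - now_us()  # now_us(): hypothetical clock helper
    if ahead_us > MAX_BUFFER_US:
        # Only throttle once workers already hold ~5 s of audio, instead of
        # sleeping whenever the coordinator is ahead of real time at all.
        await asyncio.sleep((ahead_us - MAX_BUFFER_US) / 1_000_000)
```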
Workers now use consecutive ports starting at --port instead of --port+1.
Prevents reconnect race where a client joins a dead group whose stream was already stopped by aiosendspin.
Multi-worker mode does not support outbound client connections. Previously the --client flag was silently ignored.
Track successful vs failed workers separately. Exit if all workers fail. Remove dead audio queues for failed workers.
Previously only shut down when all workers crashed.
Coordinator now loops between waiting for clients and streaming, instead of continuously decoding when nobody is listening.
Pull request overview
Adds a multi-worker execution mode to sendspin serve, introducing a coordinator/worker model where the coordinator decodes once and fans out timestamped PCM to multiple worker HTTP/WebSocket servers, and the embedded web UI can display total listener count across workers.
Changes:
- Add `--workers` to `sendspin serve` and route to a new multi-worker coordinator (`run_server_multi`).
- Introduce coordinator/worker subprocess implementation plus IPC message types for audio and status reporting.
- Add `/api/status` endpoint and update embedded web UI to poll and display listener count; add tests for IPC/coordinator/worker behavior.
Reviewed changes
Copilot reviewed 14 out of 15 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `sendspin/cli.py` | Adds --workers flag and dispatches to multi-worker serve mode; rejects --client when --workers > 1. |
| `sendspin/serve/__init__.py` | Adds run_server_multi() entrypoint to run the coordinator. |
| `sendspin/serve/coordinator.py` | New coordinator that spawns workers, decodes audio, fans out PCM chunks, aggregates listener counts, and handles shutdown/health. |
| `sendspin/serve/worker.py` | New worker subprocess that runs a server on an assigned port, manages client groups/streams, and consumes audio IPC. |
| `sendspin/serve/ipc.py` | New picklable dataclasses for coordinator↔worker IPC. |
| `sendspin/serve/server.py` | Adds /api/status endpoint and optional shared total listener counter support. |
| `sendspin/serve/web/index.html` | Adds listener count UI element. |
| `sendspin/serve/web/styles.css` | Styles listener count element and visibility transitions. |
| `sendspin/serve/web/app.js` | Polls /api/status while listening and updates/hides the listener count UI. |
| `tests/serve/test_ipc.py` | Validates IPC message types are picklable. |
| `tests/serve/test_coordinator.py` | Unit tests for coordinator listener aggregation and worker startup/crash handling. |
| `tests/serve/test_worker.py` | Unit tests for worker startup signaling, chunk processing, and disconnect cleanup behavior. |
| `tests/serve/test_integration.py` | Spawns workers and validates /api/status responds on each worker port. |
| `README.md` | Documents multi-worker serve mode and reverse-proxy/load-balancer guidance. |
| async def _wait_for_workers_listening(self) -> int: | ||
| """Wait for all workers to report status. Returns count of healthy workers.""" | ||
| loop = asyncio.get_running_loop() | ||
| listening_count = 0 | ||
| error_count = 0 | ||
| failed_workers: set[int] = set() | ||
|
|
||
| while (listening_count + error_count) < self.workers: | ||
| msg = await loop.run_in_executor(None, self._status_queue.get) | ||
|
|
_wait_for_workers_listening() waits on status_queue.get() with no timeout and no worker liveness checks. If a worker dies before it can post WorkerListening/WorkerError (e.g., import error, hard crash), the coordinator can hang indefinitely (and tests that call this can hang as well). Consider adding a startup timeout and/or periodically checking Process.is_alive() to fail fast and mark that worker as failed.
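One way to implement that suggestion (a sketch; `STARTUP_TIMEOUT_S`, `self._processes`, and the message handling shown here are assumptions — `_status_queue`, `WorkerListening`, and `self.workers` appear in the PR):

```python
import asyncio
import queue as _queue

from sendspin.serve.ipc import WorkerListening  # assumed to live in ipc.py

STARTUP_TIMEOUT_S = 30.0  # assumed value


async def _wait_for_workers_listening(self) -> int:
    """Wait for worker status with a startup timeout and liveness checks."""
    loop = asyncio.get_running_loop()
    listening_count = 0
    reported = 0
    deadline = loop.time() + STARTUP_TIMEOUT_S
    while reported < self.workers:
        if loop.time() >= deadline:
            raise TimeoutError("timed out waiting for workers to report status")
        try:
            # Poll with a short timeout so liveness can be re-checked.
            msg = await loop.run_in_executor(
                None, lambda: self._status_queue.get(timeout=0.5)
            )
        except _queue.Empty:
            if all(not p.is_alive() for p in self._processes):
                break  # every worker died before reporting — fail fast
            continue
        reported += 1
        if isinstance(msg, WorkerListening):
            listening_count += 1
    return listening_count
```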
| """Entry point for the worker subprocess.""" | ||
| logging.basicConfig( | ||
| level=getattr(logging, log_level.upper(), logging.INFO), | ||
| format=f"%(asctime)s %(levelname)s [W{worker_id}] %(message)s", | ||
| ) | ||
| worker = ServeWorker( |
Multi-worker mode doesn’t apply the Windows ConnectionResetError suppression that single-worker run_server() uses (sendspin/serve/__init__.py sets a custom exception handler on win32). Since workers use asyncio.run() with the default event loop, client disconnects may reintroduce noisy WinError 10054 logs in multi-worker mode. Consider setting the same exception handler (or equivalent) in worker_main on Windows before running the worker loop.
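A sketch of applying the same suppression in `worker_main` (the exact filtering done by the single-worker handler is not shown in this PR excerpt, so the handler body below is an assumption):

```python
import asyncio
import sys


def _suppress_win32_connection_resets(loop: asyncio.AbstractEventLoop) -> None:
    """Hide noisy WinError 10054 logs from abrupt client disconnects."""
    if sys.platform != "win32":
        return

    def handler(loop: asyncio.AbstractEventLoop, context: dict) -> None:
        if isinstance(context.get("exception"), ConnectionResetError):
            return  # swallow the disconnect instead of logging it
        loop.default_exception_handler(context)

    loop.set_exception_handler(handler)
```

Since the worker runs under asyncio.run(), the call would have to happen inside the worker coroutine (via asyncio.get_running_loop()) before it starts serving.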
| if (resp.ok) { | ||
| const data = await resp.json(); | ||
| const count = Math.max(data.total_clients ?? 0, 1); | ||
| elements.listenerCountValue.textContent = String(count); | ||
| elements.listenerCount.setAttribute("aria-hidden", "false"); | ||
| } | ||
| } catch { | ||
| // Silently ignore - server may be unavailable |
updateListenerCount() forces the displayed total to be at least 1 (Math.max(..., 1)). This can over-report listeners when the API returns 0 (e.g., brief race before coordinator updates counts, or if the socket disconnects but polling continues). Consider displaying the API value as-is (allow 0) and handling the “not yet available” case by hiding the element until a valid count is received.
| if (resp.ok) { | |
| const data = await resp.json(); | |
| const count = Math.max(data.total_clients ?? 0, 1); | |
| elements.listenerCountValue.textContent = String(count); | |
| elements.listenerCount.setAttribute("aria-hidden", "false"); | |
| } | |
| } catch { | |
| // Silently ignore - server may be unavailable | |
| if (!resp.ok) { | |
| elements.listenerCount.setAttribute("aria-hidden", "true"); | |
| return; | |
| } | |
| const data = await resp.json(); | |
| const rawCount = data.total_clients; | |
| const count = | |
| typeof rawCount === "number" && Number.isFinite(rawCount) && rawCount >= 0 | |
| ? rawCount | |
| : null; | |
| if (count === null) { | |
| elements.listenerCount.setAttribute("aria-hidden", "true"); | |
| return; | |
| } | |
| elements.listenerCountValue.textContent = String(count); | |
| elements.listenerCount.setAttribute("aria-hidden", "false"); | |
| } catch { | |
| // Silently ignore - server may be unavailable | |
| elements.listenerCount.setAttribute("aria-hidden", "true"); |
| import pytest | ||
| from aiohttp import ClientSession | ||
|
|
||
| from sendspin.serve.coordinator import ServeCoordinator | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_multi_worker_starts_and_serves_status() -> None: | ||
| """Start coordinator with 2 workers, verify worker /api/status returns shared count.""" | ||
| coordinator = ServeCoordinator( | ||
| source="http://retro.dancewave.online/retrodance.mp3", | ||
| source_format=None, | ||
| port=19800, | ||
| name="Integration Test", | ||
| workers=2, | ||
| log_level="WARNING", | ||
| ) | ||
|
|
||
| coordinator._spawn_workers() | ||
|
|
||
| try: | ||
| await coordinator._wait_for_workers_listening() |
This integration test binds to hard-coded ports (19800/19801). In CI or when running tests in parallel, these ports can already be in use, causing worker startup failures and potentially hanging indefinitely because _wait_for_workers_listening() has no timeout. Consider selecting free ports at runtime (e.g., bind a temporary socket to port 0 to pick an available base port) and/or wrapping the wait in asyncio.wait_for to ensure the test fails fast instead of hanging.
| import pytest | |
| from aiohttp import ClientSession | |
| from sendspin.serve.coordinator import ServeCoordinator | |
| @pytest.mark.asyncio | |
| async def test_multi_worker_starts_and_serves_status() -> None: | |
| """Start coordinator with 2 workers, verify worker /api/status returns shared count.""" | |
| coordinator = ServeCoordinator( | |
| source="http://retro.dancewave.online/retrodance.mp3", | |
| source_format=None, | |
| port=19800, | |
| name="Integration Test", | |
| workers=2, | |
| log_level="WARNING", | |
| ) | |
| coordinator._spawn_workers() | |
| try: | |
| await coordinator._wait_for_workers_listening() | |
| import asyncio | |
| import socket | |
| import pytest | |
| from aiohttp import ClientSession | |
| from sendspin.serve.coordinator import ServeCoordinator | |
| def _find_free_port_range(count: int) -> int: | |
| """Return the first port in a consecutive localhost port range that is currently free.""" | |
| while True: | |
| probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| probe.bind(("127.0.0.1", 0)) | |
| start_port = probe.getsockname()[1] | |
| probe.close() | |
| sockets: list[socket.socket] = [] | |
| try: | |
| for port in range(start_port, start_port + count): | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| sock.bind(("127.0.0.1", port)) | |
| sockets.append(sock) | |
| except OSError: | |
| for sock in sockets: | |
| sock.close() | |
| continue | |
| for sock in sockets: | |
| sock.close() | |
| return start_port | |
| @pytest.mark.asyncio | |
| async def test_multi_worker_starts_and_serves_status() -> None: | |
| """Start coordinator with 2 workers, verify worker /api/status returns shared count.""" | |
| workers = 2 | |
| coordinator = ServeCoordinator( | |
| source="http://retro.dancewave.online/retrodance.mp3", | |
| source_format=None, | |
| port=_find_free_port_range(workers + 1), | |
| name="Integration Test", | |
| workers=workers, | |
| log_level="WARNING", | |
| ) | |
| coordinator._spawn_workers() | |
| try: | |
| await asyncio.wait_for(coordinator._wait_for_workers_listening(), timeout=10) |
| chunk_msg = AudioChunk( | ||
| pcm_bytes=pcm_chunk, | ||
| sample_rate=fmt.sample_rate, | ||
| bit_depth=fmt.bit_depth, | ||
| channels=fmt.channels, | ||
| play_start_us=play_start_us, | ||
| ) | ||
| for queue in self._audio_queues: | ||
| queue.put(chunk_msg) | ||
|
|
Inside the async decode loop, multiprocessing.Queue.put() is called directly for each worker. Queue.put() can block (pipe backpressure, pickling cost), which will block the event loop and can delay status handling / shutdown responsiveness. Consider offloading fan-out to a thread (asyncio.to_thread/executor) or using a non-blocking/backpressured design (e.g., put_nowait with drop/slowdown) to keep the coordinator loop responsive.
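Two possible shapes for that suggestion, wrapped in a single sketch (`_fan_out` is a hypothetical helper called from the decode loop; whether to drop chunks or block is a design choice, and the drop variant only works if the queues are created with a `maxsize`):

```python
import asyncio
import logging
import queue as _queue


async def _fan_out(self, chunk_msg) -> None:
    """Fan a decoded chunk out to all workers without blocking the event loop."""
    # Variant 1: non-blocking put — drop the chunk for a slow worker rather
    # than stalling the decode loop (late PCM is useless for live playback).
    # Assumes the multiprocessing queues were created with a maxsize, since
    # an unbounded Queue never raises Full.
    for audio_queue in self._audio_queues:
        try:
            audio_queue.put_nowait(chunk_msg)
        except _queue.Full:
            logging.debug("Audio queue full; dropping chunk for one worker")

    # Variant 2: keep blocking semantics but move the fan-out off the event
    # loop so status handling and shutdown stay responsive.
    # await asyncio.to_thread(
    #     lambda: [q.put(chunk_msg) for q in self._audio_queues]
    # )
```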
|
|
||
| def _handle_sigint(self) -> None: | ||
| if self._shutdown_requested: | ||
| # Second Ctrl+C — force exit |
In _handle_sigint, the comment says a second Ctrl+C will “force exit”, but the code just returns and keeps the custom SIGINT handler installed. This makes repeated Ctrl+C a no-op and can leave the process stuck if shutdown hangs. Consider either implementing an actual forced exit on the second SIGINT (e.g., raise KeyboardInterrupt/SystemExit or os._exit) or adjust the comment/behavior so the second signal is not ignored.
| # Second Ctrl+C — force exit | |
| # Second Ctrl+C — force exit | |
| signal.signal(signal.SIGINT, signal.default_int_handler) | |
| signal.raise_signal(signal.SIGINT) |
Summary
This PR adds multi-worker support to `sendspin serve` so multiple worker processes can accept Sendspin Party listeners while sharing the same decoded audio timeline.
It introduces a coordinator/worker architecture where:
- the coordinator decodes the source once and fans out timestamped PCM chunks to every worker over IPC, and
- each worker runs its own HTTP/WebSocket server on an assigned port and serves the listeners connected to it.
Changes
- Add `--workers` to `sendspin serve`
- Add `/api/status` endpoint for total listener count
- Reject `--client` when used with multi-worker mode
- Document multi-worker serve mode in `README.md`

Notes

- `--client` remains supported in single-worker serve mode only