diff --git a/app/config.py b/app/config.py index 63620df..9251ed1 100644 --- a/app/config.py +++ b/app/config.py @@ -87,6 +87,38 @@ class Settings(BaseSettings): AUTHZ_HOOK_URL: str = "" AUTHZ_HOOK_SECRET: str = "" + # ─── Dock-readiness packaging knobs (PR-5d) ───────────────────── + # + # Self-host integrators (Dock first; future others) often want to + # ship cueapi-core as a SUBSET of the full surface — for example, + # the messaging primitive without the cron / cues primitive, or + # without the email-driven device-code signup flow, or with quotas + # disabled because the integrator enforces them at their own + # billing layer. These flags are the supported way to do that. + # + # All default to False so default behavior matches the full + # cueapi-core experience. Self-hosters override via env var. + + # Strips ``app/routers/cues.py`` + executions + workers + alert routes + # from the FastAPI app at startup. Cron loop (poller) is also disabled + # if running in the same process. Use when integrating cueapi-core as + # a messaging-only substrate (Dock Connect's case). + DISABLE_CUE_PRIMITIVE: bool = False + + # Bypasses quota enforcement on: + # - POST /v1/cues (active_cue_limit, monthly_execution_limit) + # - POST /v1/messages (monthly_message_limit, priority rate-limits) + # Use when the integrator's billing/plan layer already gates these + # one level up (Dock enforces per-tier limits in src/lib/plan.ts). + DISABLE_QUOTA_ENFORCEMENT: bool = False + + # Strips ``app/routers/device_code.py`` (the email-magic-link signup + # flow) from the FastAPI app at startup. Use when the integrator has + # its own user identity system and writes to the ``users`` table + # directly (Dock mirrors its User table into Cue's via shared + # Postgres or via PUT /v1/internal/users/{user_id} in PR-5c). + DISABLE_DEVICE_CODE: bool = False + @property def async_database_url(self) -> str: """Convert postgresql:// to postgresql+asyncpg://.""" diff --git a/app/main.py b/app/main.py index 7c0d1b2..f682f5c 100644 --- a/app/main.py +++ b/app/main.py @@ -149,16 +149,24 @@ async def generic_error_handler(request: Request, exc: Exception): ) +# ─── Router registration ──────────────────────────────────────────── +# +# Three packaging-mode flags from app/config.py (PR-5d) gate which +# routers mount at startup. Defaults preserve full cueapi-core +# behavior. Self-host integrators flip flags via env var. +# +# DISABLE_DEVICE_CODE — strips email-magic-link signup flow +# DISABLE_CUE_PRIMITIVE — strips cron / cues / executions / workers +# (DISABLE_QUOTA_ENFORCEMENT lives in service-layer checks, not here) +# +# Always-on routers come first: health, auth (token validation), agents, +# messages, alerts, webhook_secret, usage. These are essential whether +# the deployment is full / messaging-only / cron-only. + app.include_router(health.router) app.include_router(auth_routes.router) -app.include_router(device_code.router) -app.include_router(device_code.page_router) -app.include_router(cues.router) -app.include_router(executions.router) app.include_router(usage.router) app.include_router(echo.router) -app.include_router(workers.router) -app.include_router(workers.workers_list_router) app.include_router(webhook_secret.router) app.include_router(alerts.router) app.include_router(agents.router) @@ -169,3 +177,18 @@ async def generic_error_handler(request: Request, exc: Exception): # auth model. Default deployments don't expose ``/v1/internal/*``. if settings.EXTERNAL_AUTH_BACKEND: app.include_router(internal_users.router) + +# ─── PR-5d: packaging-mode flags ────────────────────────────────── +if not settings.DISABLE_DEVICE_CODE: + # Email-magic-link signup flow. Disable when the integrator has + # its own user identity system. + app.include_router(device_code.router) + app.include_router(device_code.page_router) + +if not settings.DISABLE_CUE_PRIMITIVE: + # Cue / cron / executions / workers surface. Disable when running + # as a messaging-only substrate (Dock Connect's case). + app.include_router(cues.router) + app.include_router(executions.router) + app.include_router(workers.router) + app.include_router(workers.workers_list_router) diff --git a/app/services/cue_service.py b/app/services/cue_service.py index 9fd4b4f..24f0e58 100644 --- a/app/services/cue_service.py +++ b/app/services/cue_service.py @@ -159,16 +159,19 @@ async def create_cue(db: AsyncSession, user: AuthenticatedUser, data: CueCreate) } # Check cue limit - count_result = await db.execute( - select(func.count()) - .select_from(Cue) - .where(Cue.user_id == user.id, Cue.status.in_(["active", "paused"])) - ) - active_count = count_result.scalar() - if active_count >= user.active_cue_limit: - return { - "error": {"code": "cue_limit_exceeded", "message": f"Active cue limit of {user.active_cue_limit} reached", "status": 403} - } + # Skipped when DISABLE_QUOTA_ENFORCEMENT=true (PR-5d) — integrators + # like Dock enforce per-tier limits at their own billing layer. + if not settings.DISABLE_QUOTA_ENFORCEMENT: + count_result = await db.execute( + select(func.count()) + .select_from(Cue) + .where(Cue.user_id == user.id, Cue.status.in_(["active", "paused"])) + ) + active_count = count_result.scalar() + if active_count >= user.active_cue_limit: + return { + "error": {"code": "cue_limit_exceeded", "message": f"Active cue limit of {user.active_cue_limit} reached", "status": 403} + } # Validate callback URL (SSRF protection) — skip for worker transport transport = data.transport or "webhook" diff --git a/app/services/message_service.py b/app/services/message_service.py index 0588a99..31d421e 100644 --- a/app/services/message_service.py +++ b/app/services/message_service.py @@ -51,6 +51,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.auth import AuthenticatedUser +from app.config import settings from app.models import Agent, DispatchOutbox, Message from app.redis import get_redis from app.services.agent_service import resolve_address @@ -226,22 +227,29 @@ async def create_message( # BEFORE the idempotency check — a dedup-hit on a key inside a # rate-limited window should still be allowed (it's not a new # message, it's the cached representation of an old one). + # + # When DISABLE_QUOTA_ENFORCEMENT=true (PR-5d), all three checks + # (per-minute rate, priority anti-abuse, monthly quota below) are + # bypassed. Integrators like Dock enforce per-tier limits at their + # own billing layer. redis = await get_redis() plan, monthly_limit = await get_user_plan_and_msg_limit(db, user.id) - - # Per-minute rate limit (sliding window, plan-tiered). - await check_per_minute_rate_limit(user.id, plan, redis) - - # Priority-high anti-abuse — may downgrade priority to 3 silently - # for over-pair, or 429 for over-sender. - effective_priority, priority_downgraded = await check_priority_high_limits( - user_id=user.id, - from_agent_id=from_agent.id, - to_agent_id=to_agent.id, - priority=priority, - redis=redis, - ) - priority = effective_priority + priority_downgraded = False + + if not settings.DISABLE_QUOTA_ENFORCEMENT: + # Per-minute rate limit (sliding window, plan-tiered). + await check_per_minute_rate_limit(user.id, plan, redis) + + # Priority-high anti-abuse — may downgrade priority to 3 silently + # for over-pair, or 429 for over-sender. + effective_priority, priority_downgraded = await check_priority_high_limits( + user_id=user.id, + from_agent_id=from_agent.id, + to_agent_id=to_agent.id, + priority=priority, + redis=redis, + ) + priority = effective_priority # 5. Idempotency check. fingerprint = _compute_fingerprint( @@ -271,7 +279,9 @@ async def create_message( # 5.5. Monthly quota — checked here, AFTER idempotency. A dedup-hit # is not a new message; the quota check applies only to new sends. - await check_message_quota(db, user.id, monthly_limit, redis) + # Bypassed under DISABLE_QUOTA_ENFORCEMENT (PR-5d). + if not settings.DISABLE_QUOTA_ENFORCEMENT: + await check_message_quota(db, user.id, monthly_limit, redis) # 6. Resolve thread_id + reply_to chain. inherited_thread_id, parent_msg_id = await _resolve_reply_to( diff --git a/tests/test_dock_readiness_packaging_knobs.py b/tests/test_dock_readiness_packaging_knobs.py new file mode 100644 index 0000000..f44e92f --- /dev/null +++ b/tests/test_dock_readiness_packaging_knobs.py @@ -0,0 +1,274 @@ +"""PR-5d (Dock-readiness): packaging-mode env-var flags. + +Verifies the three packaging-mode flags introduced for self-host +integrators (Dock Connect first; future others). All three default to +False so default behavior matches the full cueapi-core experience — +these tests pin both the default-off behavior AND the on-behavior. + +The flags: + +* ``DISABLE_CUE_PRIMITIVE`` — strips cues/executions/workers routers +* ``DISABLE_DEVICE_CODE`` — strips email-magic-link signup +* ``DISABLE_QUOTA_ENFORCEMENT``— bypasses cue + message quota checks + +Strategy +-------- +The flag wiring lives in two places: + +1. **Router registration** (``app/main.py``) — the FastAPI app object + only mounts the gated routers when the corresponding flag is False. + Tested via the OpenAPI route inventory after re-importing the app + with the flag patched. +2. **Service-layer guards** (``app/services/cue_service.py``, + ``app/services/message_service.py``) — quota checks short-circuit + when ``DISABLE_QUOTA_ENFORCEMENT`` is True. Tested by exercising + the create-path with a user already at quota. + +Both mechanisms must work for self-hosters to ship a properly +slimmed deployment. +""" +from __future__ import annotations + +import importlib +import sys +import uuid +from contextlib import contextmanager +from datetime import datetime, timedelta, timezone + +import pytest +import pytest_asyncio + +from app.config import settings +from app.models import Cue, Message, User +from app.utils.ids import ( + generate_api_key, + generate_webhook_secret, + get_api_key_prefix, + hash_api_key, +) + + +# ─── Helpers ─────────────────────────────────────────────────────── + + +@contextmanager +def _patch_settings(**overrides): + """Patch app.config.settings + reload modules that read settings + at import time (cue_service, message_service, main). Restores on + exit so other tests aren't affected. + """ + original = {} + for k, v in overrides.items(): + original[k] = getattr(settings, k) + setattr(settings, k, v) + try: + yield + finally: + for k, v in original.items(): + setattr(settings, k, v) + + +def _reimport_main(): + """Reload app.main so router registration re-evaluates against the + currently-patched settings. Returns the fresh app object.""" + if "app.main" in sys.modules: + del sys.modules["app.main"] + import app.main # noqa: F401 + return sys.modules["app.main"].app + + +def _route_paths(app) -> set[str]: + return {r.path for r in app.routes if hasattr(r, "path")} + + +# ─── DISABLE_CUE_PRIMITIVE: routers stripped at startup ─────────── + + +class TestDisableCuePrimitive: + def test_default_off_cue_routes_present(self): + """Default deployment includes all cue + executions + workers + routes. Tested by inspecting the FastAPI app's route inventory + with no flags overridden.""" + with _patch_settings(DISABLE_CUE_PRIMITIVE=False): + app = _reimport_main() + paths = _route_paths(app) + # A representative subset — full path set is large. + assert "/v1/cues" in paths or any(p.startswith("/v1/cues") for p in paths) + assert any(p.startswith("/v1/executions") for p in paths) + assert any(p.startswith("/v1/workers") for p in paths) + + def test_flag_on_cue_routes_stripped(self): + """Flipping ``DISABLE_CUE_PRIMITIVE=true`` removes cue, + executions, and workers routers. messaging routers stay.""" + with _patch_settings(DISABLE_CUE_PRIMITIVE=True): + app = _reimport_main() + paths = _route_paths(app) + assert not any(p.startswith("/v1/cues") for p in paths), \ + "cues router must not mount when DISABLE_CUE_PRIMITIVE=True" + assert not any(p.startswith("/v1/executions") for p in paths), \ + "executions router must not mount when DISABLE_CUE_PRIMITIVE=True" + assert not any(p.startswith("/v1/workers") for p in paths), \ + "workers router must not mount when DISABLE_CUE_PRIMITIVE=True" + # Messaging primitive remains live. + assert any(p.startswith("/v1/agents") for p in paths) + assert any(p.startswith("/v1/messages") for p in paths) + + +# ─── DISABLE_DEVICE_CODE: signup flow stripped ──────────────────── + + +class TestDisableDeviceCode: + def test_default_off_device_code_routes_present(self): + with _patch_settings(DISABLE_DEVICE_CODE=False): + app = _reimport_main() + paths = _route_paths(app) + assert any("device-code" in p for p in paths), \ + "default deployment must expose device-code signup" + + def test_flag_on_device_code_stripped(self): + with _patch_settings(DISABLE_DEVICE_CODE=True): + app = _reimport_main() + paths = _route_paths(app) + assert not any("device-code" in p for p in paths), \ + "device-code router must not mount when DISABLE_DEVICE_CODE=True" + # Messaging stays live. + assert any(p.startswith("/v1/agents") for p in paths) + + +# ─── Both flags simultaneously (Dock's expected combo) ──────────── + + +class TestDockMessagingOnlyMode: + """Dock's expected production config: cue + device-code stripped, + messaging primitive only. Pin the combination.""" + + def test_messaging_only_combination(self): + with _patch_settings( + DISABLE_CUE_PRIMITIVE=True, + DISABLE_DEVICE_CODE=True, + ): + app = _reimport_main() + paths = _route_paths(app) + + # What Dock keeps: + assert any(p.startswith("/v1/agents") for p in paths) + assert any(p.startswith("/v1/messages") for p in paths) + assert any(p == "/health" or p.endswith("/health") for p in paths) + + # What Dock drops: + for forbidden in ("/v1/cues", "/v1/executions", "/v1/workers"): + assert not any(p.startswith(forbidden) for p in paths), f"{forbidden} leaked" + assert not any("device-code" in p for p in paths) + + +# ─── DISABLE_QUOTA_ENFORCEMENT: service-layer guards ───────────── + + +@pytest_asyncio.fixture +async def quota_user(db_session): + """A user already at their active_cue_limit. Default cue create + path should 403; flag-on path should succeed.""" + raw_key = generate_api_key() + user = User( + email=f"quota-{uuid.uuid4().hex[:8]}@test.com", + api_key_hash=hash_api_key(raw_key), + api_key_prefix=get_api_key_prefix(raw_key), + webhook_secret=generate_webhook_secret(), + slug=f"quota-{uuid.uuid4().hex[:8]}", + active_cue_limit=2, # tight cap so we hit it with 2 cues + monthly_message_limit=2, + ) + db_session.add(user) + await db_session.flush() + + # Two existing active cues to put the user AT the cap. + for i in range(2): + cue = Cue( + id=f"cue_quota{i:08d}xxx", + user_id=user.id, + name=f"existing-cue-{i}", + schedule_type="recurring", + schedule_cron="0 0 * * *", + schedule_timezone="UTC", + callback_transport="webhook", + callback_url="https://example.com/wh", + callback_method="POST", + payload={}, + status="active", + next_run=datetime.now(timezone.utc) + timedelta(hours=1), + ) + db_session.add(cue) + await db_session.commit() + return user + + +class TestDisableQuotaEnforcement: + """Pin that the flag actually short-circuits both cue and message + quota checks. The default-off path is implicitly covered by + the existing test_messages.py + test_cues.py suites that already + assert quota errors at cap — those would break if the default + flipped, which is its own regression test.""" + + @pytest.mark.asyncio + async def test_flag_off_cue_create_at_cap_is_blocked( + self, db_session, quota_user + ): + """Default behavior: hitting the cue cap returns + cue_limit_exceeded. This is the regression-pin for the + default-on path.""" + from app.schemas.cue import CallbackCreate, CueCreate, ScheduleCreate + from app.services.cue_service import create_cue + + with _patch_settings(DISABLE_QUOTA_ENFORCEMENT=False): + data = CueCreate( + name="should-fail-at-cap", + schedule=ScheduleCreate(type="recurring", cron="0 0 * * *", timezone="UTC"), + transport="webhook", + callback=CallbackCreate(url="https://example.com/x", method="POST"), + ) + # Build a minimal AuthenticatedUser shape from the User row. + from app.auth import AuthenticatedUser + auth_user = AuthenticatedUser( + id=str(quota_user.id), + email=quota_user.email, + plan=quota_user.plan, + active_cue_limit=quota_user.active_cue_limit, + monthly_execution_limit=quota_user.monthly_execution_limit, + rate_limit_per_minute=quota_user.rate_limit_per_minute, + ) + result = await create_cue(db_session, auth_user, data) + assert "error" in result + assert result["error"]["code"] == "cue_limit_exceeded" + + @pytest.mark.asyncio + async def test_flag_on_cue_create_at_cap_succeeds( + self, db_session, quota_user + ): + """With DISABLE_QUOTA_ENFORCEMENT=True, the same call goes + through despite the user being at their cap.""" + from app.schemas.cue import CallbackCreate, CueCreate, ScheduleCreate + from app.services.cue_service import create_cue + + with _patch_settings(DISABLE_QUOTA_ENFORCEMENT=True): + data = CueCreate( + name="should-pass-when-quotas-disabled", + schedule=ScheduleCreate(type="recurring", cron="0 0 * * *", timezone="UTC"), + transport="webhook", + callback=CallbackCreate(url="https://example.com/x", method="POST"), + ) + from app.auth import AuthenticatedUser + auth_user = AuthenticatedUser( + id=str(quota_user.id), + email=quota_user.email, + plan=quota_user.plan, + active_cue_limit=quota_user.active_cue_limit, + monthly_execution_limit=quota_user.monthly_execution_limit, + rate_limit_per_minute=quota_user.rate_limit_per_minute, + ) + result = await create_cue(db_session, auth_user, data) + assert "error" not in result, f"flag should bypass cap, got: {result}" + # Response shape from create_cue is {"cue": CueResponse(...), ...}; + # id lives on the nested CueResponse, not at the top level. + cue_resp = result.get("cue") + assert cue_resp is not None, f"expected 'cue' in response, got keys: {list(result.keys())}" + assert cue_resp.id.startswith("cue_")