feat(settings): wire workspace settings to backend (concurrency, notifications, danger zone)#36
Conversation
…e 3) Adds tenant_notification_config table with one row per workspace storing Slack channel/webhook, PagerDuty key + enabled flag, email-digest flag, and per-event-type triggers (run_failed, run_timeout). Persists via new GET/PATCH /api/v1/tenant/notifications (admin-only PATCH) plus a POST /tenant/notifications/test endpoint to verify each channel. Real enforcement: - services/notifier.py — fire-and-forget dispatcher. On run failure, loads tenant config, posts to Slack incoming-webhook (text + channel override) and triggers a PagerDuty Events API v2 incident with severity=error and run/error metadata. - services/executor.py hooks both terminal-failure transition sites (HTTP-mode permanent failure + AI-step retries-exhausted). Wrapped in asyncio.create_task so notification I/O never blocks run completion; notifier itself is exception-safe. Persisted-only for v1: - email_digest_enabled — column lands but daily digest cron lives in services/digest.py (Phase 3.1). Frontend: new components/settings/notifications-tab.tsx wired into settings/page.tsx as a Bell-icon tab. Loads settings on mount, PATCHes on blur (URLs / channel) and on click (Switches), shows a Send test button per channel that surfaces the server's pass/fail message via toast.
…(Phase 2 + 2.5) Persist max_concurrent_runs / per_function_default / default_step_timeout_s / use_event_id_idempotency to Tenant.settings JSONB and expose via new GET/PATCH /api/v1/tenant/concurrency (admin-only PATCH). Real enforcement (Phase 2): - runner._create_run derives Run.idempotency_key from the trigger event id when the workspace flag is on, so redelivered events map to the same logical run. - runner._enqueue_run consults the running-run count and re-queues with a 5s delay when over max_concurrent_runs (soft throttle), and injects per_function_default into the job config when a Function does not declare its own concurrency. Real enforcement (Phase 2.5 — default_step_timeout_s): - runner._enqueue_run additionally injects default_step_timeout_s into the job config so the executor doesn't need to re-fetch tenant rows. - executor._execute_run resolves the per-step timeout chain: function-level `timeout` wins, otherwise tenant default, else None. - executor._invoke_function (worker mode HTTP) now overrides the per-call read timeout to the resolved value via httpx.Timeout, and forwards step_timeout_s in the payload as a hint for SDK self-enforcement. - executor AI-step branch wraps ai_service.complete in asyncio.wait_for with the resolved timeout so a stuck LLM call yields control instead of riding the default 5-minute read timeout. Step-level timeout in step_result["timeout_seconds"] still wins when the SDK passes one. Frontend: - ConcurrencySettings type + getConcurrencySettings/updateConcurrencySettings in lib/api.ts. - New components/settings/concurrency-tab.tsx (shadcn-styled): three cards covering Concurrency / Step timeout / Idempotency, persisting on blur (numbers) and click (Switch). - settings/page.tsx adds a Gauge-icon Concurrency tab between Audit and Notifications; bumped grid cols 8→9 / 10→11. Out of scope: - Inline-mode (serverless) AI step timeout: inline_executor receives fn not job, so the tenant default would need to be threaded through; worker-mode covers the common path today.
…Phase 4)
Backend:
- Migration add_tenant_soft_delete.py adds tenants.deleted_at TIMESTAMPTZ + index.
- Tenant model gains the deleted_at column.
- api/deps.py::_bounce_if_deleted gates every authenticated request: any
attempt to use a deleted tenant returns HTTP 410 Gone (distinct from
401 / 404 so clients can act on it).
- api/routes/tenants.py adds:
* GET /api/v1/tenant — minimal workspace identity (id/name/slug/deleted_at).
Available to any auth'd user; lets the dashboard show the slug for
typed-confirmation flows.
* POST /api/v1/tenant/pause-all — admin-only. Bulk update of
Function.is_active=false for the workspace; idempotent. Returns
paused_count.
* POST /api/v1/tenant/transfer-ownership — admin-only. Promotes the
target user to admin, demotes the calling admin to member; rejects
self-target and cross-tenant targets.
* DELETE /api/v1/tenant — admin-only. Body must echo the workspace
slug to confirm. Stamps deleted_at; recoverable until a future
hard-delete background job retires the row.
Frontend:
- TenantInfo type + getTenantInfo + pauseAllFunctions + transferOwnership
+ deleteWorkspace methods in lib/api.ts.
- New components/settings/danger-zone.tsx: three rounded action rows
inside a destructive-bordered Card; each behind its own Dialog with
a typed/selected confirmation. Pause and transfer use simple confirms;
delete requires retyping the workspace slug exactly. On successful
delete the page does a hard nav to /login so the auth store resets.
- settings/page.tsx replaces the old "Clear All Data" Danger Zone Card
with <DangerZone /> in the General tab.
Out of scope (intentional follow-ups):
- Hard delete background job — soft-delete persists deleted_at; a cron
in services/retention.py will purge after the retention window.
- Email confirmation for transfer-ownership — currently single-click
promote/demote within the dashboard; the more careful flow can layer on.
There was a problem hiding this comment.
Pull request overview
This PR wires previously UI-only workspace Settings (Concurrency, Notifications, Danger Zone) into backend persistence/enforcement, plus adds dashboard tabs/components to manage them.
Changes:
- Add tenant/workspace settings API endpoints for concurrency limits, notifications config, and danger-zone admin actions (pause-all, transfer ownership, soft delete).
- Enforce workspace-level behavior in runner/executor (idempotency key derivation, soft throttling, step timeouts, failure notifications).
- Add DB migrations/models for tenant soft-delete and per-tenant notification configuration, plus dashboard UI + API client support.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| server/src/flowforge_server/services/runner.py | Reads tenant concurrency settings to derive run idempotency key, soft-throttle enqueue, inject defaults into job config |
| server/src/flowforge_server/services/executor.py | Resolves step timeouts for worker invokes, wraps AI calls with wait_for, triggers notifier on terminal failures |
| server/src/flowforge_server/services/notifier.py | New Slack/PagerDuty dispatcher and “send test” helper |
| server/src/flowforge_server/db/models/tenant_notification_config.py | New per-tenant notification config model |
| server/src/flowforge_server/db/models/tenant.py | Adds deleted_at soft-delete marker |
| server/src/flowforge_server/db/models/init.py | Exports TenantNotificationConfig |
| server/src/flowforge_server/api/schemas/tenant.py | Adds concurrency settings schemas/defaults |
| server/src/flowforge_server/api/schemas/notifications_settings.py | Adds notification settings schemas + test request/response |
| server/src/flowforge_server/api/routes/tenants.py | New tenant settings + danger-zone endpoints |
| server/src/flowforge_server/api/routes/tenant_notifications.py | New notification settings + test-send endpoints |
| server/src/flowforge_server/api/routes/init.py | Registers new routers |
| server/src/flowforge_server/api/deps.py | Adds 410 “deleted workspace” bounce in get_current_tenant |
| server/src/flowforge_server/api/app.py | Includes new routers |
| server/migrations/add_tenant_soft_delete.py | Adds tenants.deleted_at + index |
| server/migrations/add_tenant_notifications.py | Adds tenant_notification_config table |
| dashboard/src/lib/api.ts | Adds typed client calls + defaults for tenant/concurrency/notifications/danger-zone |
| dashboard/src/components/settings/concurrency-tab.tsx | New Concurrency settings UI |
| dashboard/src/components/settings/notifications-tab.tsx | New Notifications settings UI |
| dashboard/src/components/settings/danger-zone.tsx | New Danger Zone UI (pause/transfer/delete) |
| dashboard/src/app/(dashboard)/settings/page.tsx | Adds new settings tabs and replaces old danger zone card |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| idempotency_key: str | None = None | ||
| if concurrency_cfg.use_event_id_idempotency and message.event_id: | ||
| idempotency_key = f"event:{message.event_id}:fn:{fn.id}" | ||
|
|
There was a problem hiding this comment.
idempotency_key is set based on the trigger event id, but the runner never checks for an existing Run with the same key (and the DB schema doesn’t enforce uniqueness). As a result, a redelivered event can still create duplicate Run rows. Consider either (a) querying for an existing run by (tenant_id, idempotency_key) and returning it, or (b) adding a UNIQUE index/constraint and handling IntegrityError on insert.
| if idempotency_key is not None: | |
| existing_run_result = await session.execute( | |
| select(Run).where( | |
| Run.tenant_id == fn.tenant_id, | |
| Run.idempotency_key == idempotency_key, | |
| ) | |
| ) | |
| existing_run = existing_run_result.scalar_one_or_none() | |
| if existing_run is not None: | |
| return existing_run |
| def _bounce_if_deleted(tenant: Tenant) -> Tenant: | ||
| """Reject auth attempts targeting a soft-deleted tenant. | ||
|
|
||
| Set when an admin runs the Danger-zone delete flow. We respond with a | ||
| 410 Gone so clients can distinguish a deleted workspace from a missing | ||
| one without leaking presence. | ||
| """ | ||
| if getattr(tenant, "deleted_at", None) is not None: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_410_GONE, | ||
| detail="This workspace has been deleted.", | ||
| ) | ||
| return tenant |
There was a problem hiding this comment.
TenantWithDevFallback and get_optional_tenant still return deleted tenants without calling _bounce_if_deleted, so many endpoints that use those deps will remain accessible after a workspace is soft-deleted. To ensure “all auth attempts return 410”, apply _bounce_if_deleted in every dependency that resolves a tenant from credentials (not just get_current_tenant).
| @router.get("", response_model=NotificationSettings) | ||
| async def get_settings( | ||
| tenant: TenantWithDevFallback, | ||
| session: AsyncSession = Depends(get_session), | ||
| ) -> NotificationSettings: | ||
| """Return notification settings for the workspace (defaults if unset).""" | ||
| result = await session.execute( | ||
| select(TenantNotificationConfig).where( | ||
| TenantNotificationConfig.tenant_id == tenant.id | ||
| ) | ||
| ) | ||
| return _row_to_response(result.scalar_one_or_none()) |
There was a problem hiding this comment.
GET /api/v1/tenant/notifications currently has no admin guard, but it returns secrets (Slack webhook URL, PagerDuty integration key). This allows any authenticated workspace member to read those credentials. Add CurrentUserAdmin to the GET handler (or redact secret fields for non-admins).
| @router.patch("", response_model=NotificationSettings) | ||
| async def update_settings( | ||
| update_data: NotificationSettingsUpdate, | ||
| tenant: TenantWithDevFallback, | ||
| _admin: CurrentUserAdmin, | ||
| session: AsyncSession = Depends(get_session), | ||
| ) -> NotificationSettings: | ||
| """Patch the workspace notification settings. Admin-only. | ||
|
|
||
| Upserts the row — first call for a tenant materializes the config. | ||
| """ | ||
| result = await session.execute( | ||
| select(TenantNotificationConfig).where( | ||
| TenantNotificationConfig.tenant_id == tenant.id | ||
| ) | ||
| ) | ||
| row = result.scalar_one_or_none() | ||
| if row is None: | ||
| row = TenantNotificationConfig(tenant_id=tenant.id) | ||
| session.add(row) | ||
|
|
||
| patch = update_data.model_dump(exclude_unset=True) | ||
| for k, v in patch.items(): | ||
| setattr(row, k, v) | ||
| row.updated_at = datetime.now(UTC) |
There was a problem hiding this comment.
The notification settings PATCH endpoint accepts arbitrary slack_webhook_url values and the notifier will POST to them, which creates an SSRF risk (even if admin-only). There’s an existing validate_webhook_url helper for outbound-webhook safety; validate Slack webhook URLs on write (and ideally also before sending test/dispatch).
| # Persist back to the tenant row. We re-fetch through the request session so | ||
| # the JSONB write participates in the transaction. | ||
| settings = dict(tenant.settings or {}) | ||
| settings[CONCURRENCY_KEY] = merged.model_dump() | ||
|
|
||
| await session.execute( | ||
| update(Tenant).where(Tenant.id == tenant.id).values(settings=settings) | ||
| ) | ||
| # Mirror the new value onto the in-memory tenant the dep handed us. | ||
| tenant.settings = settings | ||
| flag_modified(tenant, "settings") | ||
| await session.commit() |
There was a problem hiding this comment.
This updates Tenant.settings via a SQL update(...) and then mutates the in-memory tenant.settings + flag_modified before committing. Because tenant is attached to the same session, this can result in redundant UPDATEs. Prefer a single approach: mutate tenant.settings + flag_modified and commit, or execute UPDATE and session.refresh(tenant) without also flagging the instance.
| await session.execute( | ||
| update(Tenant).where(Tenant.id == tenant.id).values(deleted_at=now) | ||
| ) |
There was a problem hiding this comment.
Similar to the concurrency PATCH: this uses a SQL update(...) and also sets tenant.deleted_at on the attached instance before committing, which can cause redundant UPDATEs. Prefer mutating the ORM instance (and committing) or doing the SQL UPDATE + refresh, but not both.
| await session.execute( | |
| update(Tenant).where(Tenant.id == tenant.id).values(deleted_at=now) | |
| ) |
| """Create a new run record. | ||
|
|
||
| Honors the workspace `use_event_id_idempotency` setting: when enabled, | ||
| the trigger event id becomes the new run's idempotency key so a | ||
| redelivered event from the broker maps to the same logical run. | ||
| """ |
There was a problem hiding this comment.
The new idempotency behavior (event id → idempotency_key) changes core scheduling semantics but isn’t covered by the existing Runner unit tests. Adding a test that simulates event redelivery and asserts only one logical run is created/enqueued when idempotency is enabled would help prevent regressions.
| # Resolve the step timeout for this invoke cycle: | ||
| # function-level config wins; otherwise fall back to the workspace | ||
| # default (default_step_timeout_s, injected by runner._enqueue_run). | ||
| # Each /invoke call is bounded by one step yield, so this gives a | ||
| # real per-step soft timeout for worker-mode functions. | ||
| fn_cfg = job.data.get("config") or {} | ||
| fn_timeout_str = fn_cfg.get("timeout") if isinstance(fn_cfg, dict) else None | ||
| tenant_default_step_s = ( | ||
| fn_cfg.get("default_step_timeout_s") | ||
| if isinstance(fn_cfg, dict) | ||
| else None | ||
| ) | ||
| step_timeout_s = ( | ||
| _parse_timeout_to_seconds(fn_timeout_str) | ||
| if fn_timeout_str | ||
| else (int(tenant_default_step_s) if tenant_default_step_s else None) | ||
| ) |
There was a problem hiding this comment.
This step-timeout resolution relies on default_step_timeout_s being injected into job.data.config by the runner, but several enqueue paths don’t provide config at all (e.g. executor recovery re-enqueue), which means step_timeout_s becomes None and the new timeout enforcement won’t apply. Consider making the executor fall back to loading the tenant setting when the job config lacks it, or centralizing config injection across all enqueue call sites.
| cfg = await _load_config(session, run.tenant_id) | ||
| if cfg is None or not cfg.notify_on_run_failed: | ||
| return | ||
|
|
There was a problem hiding this comment.
notify_on_run_timeout is persisted/exposed in the API, but the notifier dispatch only checks notify_on_run_failed and there is no timeout-specific path. Either implement timeout classification/dispatch (and a notify_run_timeout entry point), or remove/disable the timeout toggle to avoid a setting that never takes effect.
| @router.get("", response_model=NotificationSettings) | ||
| async def get_settings( | ||
| tenant: TenantWithDevFallback, | ||
| session: AsyncSession = Depends(get_session), | ||
| ) -> NotificationSettings: | ||
| """Return notification settings for the workspace (defaults if unset).""" | ||
| result = await session.execute( |
There was a problem hiding this comment.
These new notification settings routes are auth- and validation-sensitive (admin-only secret reads/writes, outbound test sends) but there are no integration tests covering authorization/validation. Consider adding route tests to ensure non-admin users can’t read secrets and that unsafe webhook URLs are rejected.
Addresses 7 substantive issues from Copilot's automated review of the settings-backend PR. Tests are flagged as out-of-scope follow-up. Idempotency (Copilot #1): - Add migration add_runs_idempotency_unique.py: partial UNIQUE index on runs(tenant_id, idempotency_key) WHERE idempotency_key IS NOT NULL, built CONCURRENTLY using the AUTOCOMMIT pattern. - runner._create_run now does optimistic query-then-insert. Insert is wrapped in `async with session.begin_nested()` (a SAVEPOINT) so an IntegrityError from a concurrent inserter doesn't poison the outer session transaction shared with _resolve_waiting_steps. On IntegrityError, re-query and return the winner row. Soft-delete coverage (Copilot #2): - Apply _bounce_if_deleted to all 4 return sites in get_tenant_with_dev_fallback (was only in get_current_tenant). get_optional_tenant has zero consumers in api/routes/ — left as-is to preserve its "endpoint works without auth" contract. Notification settings auth + SSRF (Copilot #3, #4): - GET /tenant/notifications now requires CurrentUserAdmin. The response contains the Slack webhook URL and PagerDuty integration key; members must not see them. - PATCH /tenant/notifications validates slack_webhook_url via the existing services/network_utils.validate_webhook_url (rejects private IPs, DNS-rebinding) AND pins the host to hooks.slack.com. - notifier._client now uses create_ssrf_safe_client as defense-in-depth against URLs persisted before the route validation was added. Cleanup (Copilot #5/#6): - Drop the redundant SQL update() + ORM mutate pattern in tenants.update_concurrency and tenants.delete_workspace. Single ORM mutation flushes one UPDATE on commit. Step-timeout coverage (Copilot #8): - New services/concurrency.py exports read_tenant_concurrency, shared between runner.py (enqueue-time injection) and executor.py (fallback load when job config lacks default_step_timeout_s — covers _recover_stuck_runs, which only ships trigger_data + endpoint_url). - Both worker-mode HTTP path and AI-step branch in executor now fall back to a tenant DB load when the runner-injected default is absent. Timeout classification (Copilot #9): - executor._invoke_function emits error.type="HTTPTimeout" for httpx.TimeoutException (covers ReadTimeout/Connect/Write/Pool); other RequestError subclasses keep "RequestError". - notifier.notify_run_failed reads error.type and gates dispatch on notify_on_run_timeout for {"TimeoutError","HTTPTimeout"}, otherwise notify_on_run_failed. The toggle does something now.
Summary
Three previously-cosmetic Settings sections now have real business logic
behind them, plus the executor & runner enforcement to back them up.
Concurrency & limits (Phase 2 + 2.5)
GET/PATCH /api/v1/tenant/concurrency— admin-only PATCH; persisted inTenant.settings["concurrency"]JSONB.runner._create_runderivesRun.idempotency_keyfrom the trigger event id when the workspace flag is on.runner._enqueue_runre-queues with a 5s delay when overmax_concurrent_runs(soft throttle), and injectsper_function_defaultinto the job config when a Function does not declare its own.executor._invoke_function(worker mode) overrides the per-callhttpxread timeout to the resolved step timeout; AI-step branch wrapsai_service.completeinasyncio.wait_for(timeout=…). Fallback chain: step → function → workspace default.Concurrencysettings tab.Notifications (Phase 3)
tenant_notification_configtable (migrationadd_tenant_notifications.py, FK + cascade-delete).GET/PATCH /api/v1/tenant/notifications(admin-only PATCH) +POST /api/v1/tenant/notifications/testto verify each channel.services/notifier.py— fire-and-forget Slack incoming-webhook + PagerDuty Events API v2 dispatcher; exception-safe so failures never block run completion.services/executor.pyhooks both terminal-failure transition sites to fire the notifier.Notificationssettings tab with Slack URL + channel, PagerDuty toggle + key, trigger toggles, email-digest toggle, and per-channel "Send test" buttons.Danger zone (Phase 4)
tenants.deleted_atcolumn (migrationadd_tenant_soft_delete.py) + index.api/deps.py::_bounce_if_deletedreturns HTTP 410 on any auth attempt against a deleted workspace.GET /api/v1/tenant— minimal workspace identity (used by the dashboard for typed-confirm dialogs).POST /api/v1/tenant/pause-all— admin-only bulkFunction.is_active=false.POST /api/v1/tenant/transfer-ownership— admin-only; promotes target to admin and demotes self to member.DELETE /api/v1/tenant— admin-only soft delete, requires the caller to echo the workspace slug.<DangerZone />component replaces the old "Clear All Data" card in the General tab; each action behind a confirmation Dialog.Test plan
docker-compose up -dand start the server — both new migrations apply on boot (add_tenant_notifications,add_tenant_soft_delete).Tenant … at capwhenmax_concurrent_runsis hit.use_event_id_idempotencyon; verify only oneRunrow exists.npx tsc --noEmitindashboard/is clean.