From be8e3f90ad819adabc1f07787ac2c6016f69197b Mon Sep 17 00:00:00 2001 From: Ilia Borovitinov Date: Tue, 2 Jun 2026 16:24:50 +0300 Subject: [PATCH] feat: add agents server permissions --- .changeset/agents-server-permissions.md | 11 + packages/agents-runtime/src/agents-client.ts | 7 +- packages/agents-runtime/src/create-handler.ts | 3 + packages/agents-runtime/src/index.ts | 1 + packages/agents-runtime/src/process-wake.ts | 39 +- .../src/runtime-server-client.ts | 18 +- packages/agents-runtime/src/types.ts | 8 + .../test/create-handler.test.ts | 14 + .../test/electric-agents-client.test.ts | 4 +- ...time-server-client-update-metadata.test.ts | 20 + packages/agents-server/docker-compose.dev.yml | 1 + .../drizzle/0011_entity_permissions.sql | 100 +++ .../agents-server/drizzle/meta/_journal.json | 7 + packages/agents-server/src/db/schema.ts | 198 ++++++ .../src/electric-agents-types.ts | 73 +++ .../src/entity-bridge-manager.ts | 63 +- packages/agents-server/src/entity-manager.ts | 50 +- .../agents-server/src/entity-projector.ts | 93 ++- packages/agents-server/src/entity-registry.ts | 595 +++++++++++++++++- packages/agents-server/src/index.ts | 11 + packages/agents-server/src/permissions.ts | 239 +++++++ packages/agents-server/src/routing/context.ts | 2 + .../src/routing/durable-streams-router.ts | 129 +++- .../src/routing/electric-proxy-router.ts | 4 + .../src/routing/entities-router.ts | 291 ++++++++- .../src/routing/entity-types-router.ts | 259 +++++++- packages/agents-server/src/routing/hooks.ts | 1 + .../src/routing/observations-router.ts | 3 +- packages/agents-server/src/runtime.ts | 31 + packages/agents-server/src/server.ts | 5 + .../agents-server/src/utils/server-utils.ts | 154 ++++- .../test/electric-agents-routes.test.ts | 6 + .../test/entity-bridge-manager.test.ts | 86 ++- .../test/entity-projector.test.ts | 7 +- .../test/permissions-routes.test.ts | 413 ++++++++++++ .../agents-server/test/permissions.test.ts | 160 +++++ .../agents-server/test/server-utils.test.ts | 52 +- packages/agents-server/test/test-backend.ts | 11 +- packages/agents/src/agents/horton.ts | 7 + packages/agents/src/agents/worker.ts | 7 + .../builtin-pull-wake-registration.test.ts | 28 + 41 files changed, 3117 insertions(+), 94 deletions(-) create mode 100644 .changeset/agents-server-permissions.md create mode 100644 packages/agents-server/drizzle/0011_entity_permissions.sql create mode 100644 packages/agents-server/src/permissions.ts create mode 100644 packages/agents-server/test/permissions-routes.test.ts create mode 100644 packages/agents-server/test/permissions.test.ts diff --git a/.changeset/agents-server-permissions.md b/.changeset/agents-server-permissions.md new file mode 100644 index 0000000000..bd126e7e98 --- /dev/null +++ b/.changeset/agents-server-permissions.md @@ -0,0 +1,11 @@ +--- +"@electric-ax/agents-server": patch +"@electric-ax/agents-runtime": patch +"@electric-ax/agents": patch +--- + +Add owner-default agents-server permissions with type-level spawn grants, entity grants, effective permission materialization, principal-scoped entity observation streams, shared-state access links, runtime registration permission grants, and default user spawn grants for built-in Horton and Worker types. + +Existing entity observation bridges are rebuilt after upgrade because pre-permission bridge rows do not include principal attribution. + +Entity `manage` grants participate in read visibility, entity-type `manage` grants participate in spawn visibility, and broad parented spawn-time grants require `manage` on the parent. diff --git a/packages/agents-runtime/src/agents-client.ts b/packages/agents-runtime/src/agents-client.ts index 8513cbe55e..e0d6e90b70 100644 --- a/packages/agents-runtime/src/agents-client.ts +++ b/packages/agents-runtime/src/agents-client.ts @@ -63,9 +63,14 @@ export function createAgentsClient(config: AgentsClientConfig): AgentsClient { } if (source.sourceType === `entities`) { - await serverClient.ensureEntitiesMembershipStream( + const ensured = await serverClient.ensureEntitiesMembershipStream( (source as EntitiesObservationSource).tags ) + source = { + ...source, + sourceRef: ensured.sourceRef, + streamUrl: ensured.streamUrl, + } } if (!source.streamUrl || !source.schema) { diff --git a/packages/agents-runtime/src/create-handler.ts b/packages/agents-runtime/src/create-handler.ts index 30373ba6f8..aebf3e5c4f 100644 --- a/packages/agents-runtime/src/create-handler.ts +++ b/packages/agents-runtime/src/create-handler.ts @@ -510,6 +510,9 @@ export function createRuntimeRouter( ? mapSchemas(definition.stateSchemas) : {}), }, + ...(definition.permissionGrants && { + permission_grants: definition.permissionGrants, + }), } const defaultDispatchPolicy = defaultDispatchPolicyForType?.(name) diff --git a/packages/agents-runtime/src/index.ts b/packages/agents-runtime/src/index.ts index 84e57a3fe9..c593d6ac84 100644 --- a/packages/agents-runtime/src/index.ts +++ b/packages/agents-runtime/src/index.ts @@ -26,6 +26,7 @@ export type { AgentConfig, AgentModel, EntityDefinition, + EntityTypePermissionGrantDefinition, EntityActionsFactory, EntityActionMap, EntityArgs, diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index c8b92d3629..ad531be0e3 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -1396,7 +1396,7 @@ export async function processWake( ): Promise => { const ssStreamPath = serverClient.getSharedStateStreamPath(ssId) if (mode === `create`) { - await serverClient.ensureSharedStateStream(ssId) + await serverClient.ensureSharedStateStream(ssId, entityUrl) } const ssStreamUrl = appendPathToUrl(baseUrl, ssStreamPath) const ssCollections: Record = {} @@ -1607,6 +1607,7 @@ export async function processWake( source: ObservationSource, wake?: Wake ): Promise => { + let observedSource = source // Self-observation if ( source.sourceType === `entity` && @@ -1647,24 +1648,44 @@ export async function processWake( } if (source.sourceType === `entities`) { - await serverClient.ensureEntitiesMembershipStream( + const ensured = await serverClient.ensureEntitiesMembershipStream( (source as EntitiesObservationSource).tags ) + const originalEntry = source.toManifestEntry() as Record< + string, + unknown + > + observedSource = { + ...source, + sourceRef: ensured.sourceRef, + streamUrl: ensured.streamUrl, + toManifestEntry() { + return { + ...originalEntry, + key: `source:entities:${ensured.sourceRef}`, + sourceRef: ensured.sourceRef, + config: { + ...((originalEntry.config as Record) ?? {}), + streamUrl: ensured.streamUrl, + }, + } as unknown as ReturnType + }, + } } if (effectiveWake) { - const observeHandle = await setupCtx.observe(source, { + const observeHandle = await setupCtx.observe(observedSource, { wake: effectiveWake, }) const sourceUrl = sourceWakeConfig?.sourceUrl ?? - (source.sourceType === `entity` - ? (source as EntityObservationSource).entityUrl - : source.streamUrl) + (observedSource.sourceType === `entity` + ? (observedSource as EntityObservationSource).entityUrl + : observedSource.streamUrl) if (!sourceUrl) { throw new Error( - `[agent-runtime] Cannot register wake for source '${source.sourceType}:${source.sourceRef}' without a source URL` + `[agent-runtime] Cannot register wake for source '${observedSource.sourceType}:${observedSource.sourceRef}' without a source URL` ) } @@ -1695,7 +1716,7 @@ export async function processWake( ? wake.includeResponse : undefined : sourceWakeConfig?.includeResponse, - manifestKey: source.toManifestEntry().key, + manifestKey: observedSource.toManifestEntry().key, }) if (source.sourceType === `db`) { @@ -1706,7 +1727,7 @@ export async function processWake( return observeHandle } - const observeHandle = await setupCtx.observe(source) + const observeHandle = await setupCtx.observe(observedSource) if (source.sourceType === `db`) { scheduleSharedStateWiring() await waitForSharedStateWiring() diff --git a/packages/agents-runtime/src/runtime-server-client.ts b/packages/agents-runtime/src/runtime-server-client.ts index b4ac4cdcc7..3142059414 100644 --- a/packages/agents-runtime/src/runtime-server-client.ts +++ b/packages/agents-runtime/src/runtime-server-client.ts @@ -122,7 +122,10 @@ export interface RuntimeServerClient { }) => Promise spawnEntity: (options: SpawnEntityOptions) => Promise getEntity: (entityUrl: string) => Promise - ensureSharedStateStream: (sharedStateId: string) => Promise + ensureSharedStateStream: ( + sharedStateId: string, + ownerEntityUrl?: string + ) => Promise signalEntity: (options: SignalEntityOptions) => Promise<{ txid: number }> ensureStream: (streamPath: string, contentType?: string) => Promise deleteEntity: (entityUrl: string) => Promise @@ -447,19 +450,24 @@ export function createRuntimeServerClient( } const ensureSharedStateStream = async ( - sharedStateId: string + sharedStateId: string, + ownerEntityUrl?: string ): Promise => { const streamPath = getSharedStateStreamPath(sharedStateId) - return await ensureStream(streamPath, `application/json`) + return await ensureStream(streamPath, `application/json`, ownerEntityUrl) } const ensureStream = async ( streamPath: string, - contentType = `application/json` + contentType = `application/json`, + ownerEntityUrl?: string ): Promise => { const response = await request(streamPath, { method: `PUT`, - headers: { 'content-type': contentType }, + headers: { + 'content-type': contentType, + ...(ownerEntityUrl ? { 'electric-owner-entity': ownerEntityUrl } : {}), + }, }) if (!response.ok && response.status !== 409) { diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index bcd58d3d97..f0d0c90dc0 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -413,6 +413,13 @@ export interface EntityCreated { parent_url?: string } +export type EntityTypePermissionGrantDefinition = { + subject_kind: `principal` | `principal_kind` + subject_value: string + permission: `spawn` | `manage` + expires_at?: string +} + export interface PendingSend { targetUrl: string payload: unknown @@ -1047,6 +1054,7 @@ export interface EntityDefinition< creationSchema?: TCreationSchema inboxSchemas?: Record stateSchemas?: Record + permissionGrants?: ReadonlyArray handler: ( ctx: HandlerContext< diff --git a/packages/agents-runtime/test/create-handler.test.ts b/packages/agents-runtime/test/create-handler.test.ts index 76e52ed83c..ae6dcea3e4 100644 --- a/packages/agents-runtime/test/create-handler.test.ts +++ b/packages/agents-runtime/test/create-handler.test.ts @@ -659,6 +659,13 @@ describe(`createRuntimeHandler`, () => { defineEntity(`schema-agent`, { description: `Schema agent`, stateSchemas: { custom: makeStandardSchema({ type: `object` }) }, + permissionGrants: [ + { + subject_kind: `principal_kind`, + subject_value: `user`, + permission: `spawn`, + }, + ], handler: async () => {}, }) @@ -698,6 +705,13 @@ describe(`createRuntimeHandler`, () => { }, ], }, + permission_grants: [ + { + subject_kind: `principal_kind`, + subject_value: `user`, + permission: `spawn`, + }, + ], state_schemas: expect.objectContaining({ custom: { type: `object` }, run: expect.any(Object), diff --git a/packages/agents-runtime/test/electric-agents-client.test.ts b/packages/agents-runtime/test/electric-agents-client.test.ts index a3e6440251..8ff9506725 100644 --- a/packages/agents-runtime/test/electric-agents-client.test.ts +++ b/packages/agents-runtime/test/electric-agents-client.test.ts @@ -93,7 +93,7 @@ describe(`createAgentsClient`, () => { }) expect(mockState.createStreamDB).toHaveBeenCalledWith({ streamOptions: { - url: `http://electric-agents.test${source.streamUrl}`, + url: `http://electric-agents.test/_entities/source-1`, contentType: `application/json`, }, state: source.schema, @@ -117,7 +117,7 @@ describe(`createAgentsClient`, () => { expect(mockState.createStreamDB).toHaveBeenCalledWith({ streamOptions: { - url: `http://electric-agents.test/t/tenant-a/v1${source.streamUrl}`, + url: `http://electric-agents.test/t/tenant-a/v1/_entities/source-1`, contentType: `application/json`, }, state: source.schema, diff --git a/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts b/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts index 3277c01c9a..3bb93b08d2 100644 --- a/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts +++ b/packages/agents-runtime/test/runtime-server-client-update-metadata.test.ts @@ -44,6 +44,26 @@ describe(`runtime-server-client.setTag`, () => { ) }) + it(`ensureSharedStateStream sends the owner entity header`, async () => { + const calls: Array<{ url: string; init?: RequestInit }> = [] + const fakeFetch = vi.fn(async (url: string, init?: RequestInit) => { + calls.push({ url, init }) + return new Response(null, { status: 201 }) + }) as unknown as typeof fetch + const client = createRuntimeServerClient({ + baseUrl: `http://test.example`, + fetch: fakeFetch, + }) + + await expect( + client.ensureSharedStateStream(`board-1`, `/task/owner`) + ).resolves.toBe(`/_electric/shared-state/board-1`) + + const headers = new Headers(calls[0]!.init?.headers) + expect(headers.get(`content-type`)).toBe(`application/json`) + expect(headers.get(`electric-owner-entity`)).toBe(`/task/owner`) + }) + it(`sends POST with bearer token and tag body`, async () => { const calls: Array<{ url: string; init?: RequestInit }> = [] const fakeFetch = vi.fn(async (url: string, init?: RequestInit) => { diff --git a/packages/agents-server/docker-compose.dev.yml b/packages/agents-server/docker-compose.dev.yml index 7de9a84c07..e69adb59c0 100644 --- a/packages/agents-server/docker-compose.dev.yml +++ b/packages/agents-server/docker-compose.dev.yml @@ -30,6 +30,7 @@ services: environment: DATABASE_URL: postgresql://electric_agents:electric_agents@postgres:5432/electric_agents ELECTRIC_INSECURE: 'true' + ELECTRIC_FEATURE_FLAGS: allow_subqueries depends_on: postgres: condition: service_healthy diff --git a/packages/agents-server/drizzle/0011_entity_permissions.sql b/packages/agents-server/drizzle/0011_entity_permissions.sql new file mode 100644 index 0000000000..168d02e561 --- /dev/null +++ b/packages/agents-server/drizzle/0011_entity_permissions.sql @@ -0,0 +1,100 @@ +CREATE TABLE "entity_type_permission_grants" ( + "id" bigserial PRIMARY KEY NOT NULL, + "tenant_id" text DEFAULT 'default' NOT NULL, + "entity_type" text NOT NULL, + "permission" text NOT NULL, + "subject_kind" text NOT NULL, + "subject_value" text NOT NULL, + "created_by" text, + "expires_at" timestamp with time zone, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL, + CONSTRAINT "chk_type_permission_grants_permission" CHECK ("entity_type_permission_grants"."permission" IN ('spawn', 'manage')), + CONSTRAINT "chk_type_permission_grants_subject_kind" CHECK ("entity_type_permission_grants"."subject_kind" IN ('principal', 'principal_kind')) +); +--> statement-breakpoint +CREATE TABLE "entity_lineage" ( + "tenant_id" text DEFAULT 'default' NOT NULL, + "ancestor_url" text NOT NULL, + "descendant_url" text NOT NULL, + "depth" integer NOT NULL, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + CONSTRAINT "entity_lineage_pkey" PRIMARY KEY ("tenant_id", "ancestor_url", "descendant_url"), + CONSTRAINT "chk_entity_lineage_depth" CHECK ("entity_lineage"."depth" >= 0) +); +--> statement-breakpoint +CREATE TABLE "entity_permission_grants" ( + "id" bigserial PRIMARY KEY NOT NULL, + "tenant_id" text DEFAULT 'default' NOT NULL, + "entity_url" text NOT NULL, + "permission" text NOT NULL, + "subject_kind" text NOT NULL, + "subject_value" text NOT NULL, + "propagation" text DEFAULT 'self' NOT NULL, + "copy_to_children" boolean DEFAULT false NOT NULL, + "created_by" text, + "expires_at" timestamp with time zone, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL, + CONSTRAINT "chk_entity_permission_grants_permission" CHECK ("entity_permission_grants"."permission" IN ('read', 'write', 'delete', 'signal', 'fork', 'schedule', 'spawn', 'manage')), + CONSTRAINT "chk_entity_permission_grants_subject_kind" CHECK ("entity_permission_grants"."subject_kind" IN ('principal', 'principal_kind')), + CONSTRAINT "chk_entity_permission_grants_propagation" CHECK ("entity_permission_grants"."propagation" IN ('self', 'descendants')) +); +--> statement-breakpoint +CREATE TABLE "entity_effective_permissions" ( + "id" bigserial PRIMARY KEY NOT NULL, + "tenant_id" text DEFAULT 'default' NOT NULL, + "entity_url" text NOT NULL, + "source_entity_url" text NOT NULL, + "source_grant_id" bigint NOT NULL, + "permission" text NOT NULL, + "subject_kind" text NOT NULL, + "subject_value" text NOT NULL, + "expires_at" timestamp with time zone, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + CONSTRAINT "uq_entity_effective_permission" UNIQUE ("tenant_id", "entity_url", "source_grant_id"), + CONSTRAINT "chk_entity_effective_permissions_permission" CHECK ("entity_effective_permissions"."permission" IN ('read', 'write', 'delete', 'signal', 'fork', 'schedule', 'spawn', 'manage')), + CONSTRAINT "chk_entity_effective_permissions_subject_kind" CHECK ("entity_effective_permissions"."subject_kind" IN ('principal', 'principal_kind')) +); +--> statement-breakpoint +CREATE TABLE "shared_state_links" ( + "tenant_id" text DEFAULT 'default' NOT NULL, + "shared_state_id" text NOT NULL, + "owner_entity_url" text NOT NULL, + "manifest_key" text NOT NULL, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL, + CONSTRAINT "shared_state_links_pkey" PRIMARY KEY ("tenant_id", "owner_entity_url", "manifest_key") +); +--> statement-breakpoint +CREATE INDEX "idx_type_permission_grants_lookup" ON "entity_type_permission_grants" USING btree ("tenant_id", "entity_type", "permission", "subject_kind", "subject_value"); +--> statement-breakpoint +CREATE INDEX "idx_type_permission_grants_expiry" ON "entity_type_permission_grants" USING btree ("tenant_id", "expires_at"); +--> statement-breakpoint +CREATE INDEX "idx_entity_lineage_descendant" ON "entity_lineage" USING btree ("tenant_id", "descendant_url"); +--> statement-breakpoint +CREATE INDEX "idx_entity_permission_grants_entity" ON "entity_permission_grants" USING btree ("tenant_id", "entity_url"); +--> statement-breakpoint +CREATE INDEX "idx_entity_permission_grants_subject" ON "entity_permission_grants" USING btree ("tenant_id", "permission", "subject_kind", "subject_value"); +--> statement-breakpoint +CREATE INDEX "idx_entity_permission_grants_expiry" ON "entity_permission_grants" USING btree ("tenant_id", "expires_at"); +--> statement-breakpoint +CREATE INDEX "idx_entity_effective_permissions_lookup" ON "entity_effective_permissions" USING btree ("tenant_id", "permission", "subject_kind", "subject_value", "entity_url"); +--> statement-breakpoint +CREATE INDEX "idx_entity_effective_permissions_entity" ON "entity_effective_permissions" USING btree ("tenant_id", "entity_url"); +--> statement-breakpoint +CREATE INDEX "idx_entity_effective_permissions_expiry" ON "entity_effective_permissions" USING btree ("tenant_id", "expires_at"); +--> statement-breakpoint +CREATE INDEX "idx_shared_state_links_shared_state" ON "shared_state_links" USING btree ("tenant_id", "shared_state_id"); +--> statement-breakpoint +CREATE INDEX "idx_shared_state_links_owner" ON "shared_state_links" USING btree ("tenant_id", "owner_entity_url"); +--> statement-breakpoint +-- Pre-permission entity bridge rows do not carry principal attribution. Drop them +-- so observation bridges are rebuilt with principal_url/principal_kind scoping. +DELETE FROM "entity_bridges"; +--> statement-breakpoint +ALTER TABLE "entity_bridges" ADD COLUMN "principal_url" text; +--> statement-breakpoint +ALTER TABLE "entity_bridges" ADD COLUMN "principal_kind" text; +--> statement-breakpoint +CREATE INDEX "idx_entity_bridges_principal" ON "entity_bridges" USING btree ("tenant_id", "principal_kind", "principal_url"); diff --git a/packages/agents-server/drizzle/meta/_journal.json b/packages/agents-server/drizzle/meta/_journal.json index a05826c622..6fb0fae48a 100644 --- a/packages/agents-server/drizzle/meta/_journal.json +++ b/packages/agents-server/drizzle/meta/_journal.json @@ -78,6 +78,13 @@ "when": 1779062400000, "tag": "0010_sandbox_profiles", "breakpoints": true + }, + { + "idx": 11, + "version": "7", + "when": 1779050000000, + "tag": "0011_entity_permissions", + "breakpoints": true } ] } diff --git a/packages/agents-server/src/db/schema.ts b/packages/agents-server/src/db/schema.ts index 8f1b45fcf4..db68ec66fb 100644 --- a/packages/agents-server/src/db/schema.ts +++ b/packages/agents-server/src/db/schema.ts @@ -72,6 +72,197 @@ export const entities = pgTable( ] ) +export const entityTypePermissionGrants = pgTable( + `entity_type_permission_grants`, + { + id: bigserial(`id`, { mode: `number` }).primaryKey(), + tenantId: text(`tenant_id`).notNull().default(`default`), + entityType: text(`entity_type`).notNull(), + permission: text(`permission`).notNull(), + subjectKind: text(`subject_kind`).notNull(), + subjectValue: text(`subject_value`).notNull(), + createdBy: text(`created_by`), + expiresAt: timestamp(`expires_at`, { withTimezone: true }), + createdAt: timestamp(`created_at`, { withTimezone: true }) + .notNull() + .defaultNow(), + updatedAt: timestamp(`updated_at`, { withTimezone: true }) + .notNull() + .defaultNow(), + }, + (table) => [ + index(`idx_type_permission_grants_lookup`).on( + table.tenantId, + table.entityType, + table.permission, + table.subjectKind, + table.subjectValue + ), + index(`idx_type_permission_grants_expiry`).on( + table.tenantId, + table.expiresAt + ), + check( + `chk_type_permission_grants_permission`, + sql`${table.permission} IN ('spawn', 'manage')` + ), + check( + `chk_type_permission_grants_subject_kind`, + sql`${table.subjectKind} IN ('principal', 'principal_kind')` + ), + ] +) + +export const entityLineage = pgTable( + `entity_lineage`, + { + tenantId: text(`tenant_id`).notNull().default(`default`), + ancestorUrl: text(`ancestor_url`).notNull(), + descendantUrl: text(`descendant_url`).notNull(), + depth: integer(`depth`).notNull(), + createdAt: timestamp(`created_at`, { withTimezone: true }) + .notNull() + .defaultNow(), + }, + (table) => [ + primaryKey({ + columns: [table.tenantId, table.ancestorUrl, table.descendantUrl], + }), + index(`idx_entity_lineage_descendant`).on( + table.tenantId, + table.descendantUrl + ), + check(`chk_entity_lineage_depth`, sql`${table.depth} >= 0`), + ] +) + +export const entityPermissionGrants = pgTable( + `entity_permission_grants`, + { + id: bigserial(`id`, { mode: `number` }).primaryKey(), + tenantId: text(`tenant_id`).notNull().default(`default`), + entityUrl: text(`entity_url`).notNull(), + permission: text(`permission`).notNull(), + subjectKind: text(`subject_kind`).notNull(), + subjectValue: text(`subject_value`).notNull(), + propagation: text(`propagation`).notNull().default(`self`), + copyToChildren: boolean(`copy_to_children`).notNull().default(false), + createdBy: text(`created_by`), + expiresAt: timestamp(`expires_at`, { withTimezone: true }), + createdAt: timestamp(`created_at`, { withTimezone: true }) + .notNull() + .defaultNow(), + updatedAt: timestamp(`updated_at`, { withTimezone: true }) + .notNull() + .defaultNow(), + }, + (table) => [ + index(`idx_entity_permission_grants_entity`).on( + table.tenantId, + table.entityUrl + ), + index(`idx_entity_permission_grants_subject`).on( + table.tenantId, + table.permission, + table.subjectKind, + table.subjectValue + ), + index(`idx_entity_permission_grants_expiry`).on( + table.tenantId, + table.expiresAt + ), + check( + `chk_entity_permission_grants_permission`, + sql`${table.permission} IN ('read', 'write', 'delete', 'signal', 'fork', 'schedule', 'spawn', 'manage')` + ), + check( + `chk_entity_permission_grants_subject_kind`, + sql`${table.subjectKind} IN ('principal', 'principal_kind')` + ), + check( + `chk_entity_permission_grants_propagation`, + sql`${table.propagation} IN ('self', 'descendants')` + ), + ] +) + +export const entityEffectivePermissions = pgTable( + `entity_effective_permissions`, + { + id: bigserial(`id`, { mode: `number` }).primaryKey(), + tenantId: text(`tenant_id`).notNull().default(`default`), + entityUrl: text(`entity_url`).notNull(), + sourceEntityUrl: text(`source_entity_url`).notNull(), + sourceGrantId: bigint(`source_grant_id`, { mode: `number` }).notNull(), + permission: text(`permission`).notNull(), + subjectKind: text(`subject_kind`).notNull(), + subjectValue: text(`subject_value`).notNull(), + expiresAt: timestamp(`expires_at`, { withTimezone: true }), + createdAt: timestamp(`created_at`, { withTimezone: true }) + .notNull() + .defaultNow(), + }, + (table) => [ + unique(`uq_entity_effective_permission`).on( + table.tenantId, + table.entityUrl, + table.sourceGrantId + ), + index(`idx_entity_effective_permissions_lookup`).on( + table.tenantId, + table.permission, + table.subjectKind, + table.subjectValue, + table.entityUrl + ), + index(`idx_entity_effective_permissions_entity`).on( + table.tenantId, + table.entityUrl + ), + index(`idx_entity_effective_permissions_expiry`).on( + table.tenantId, + table.expiresAt + ), + check( + `chk_entity_effective_permissions_permission`, + sql`${table.permission} IN ('read', 'write', 'delete', 'signal', 'fork', 'schedule', 'spawn', 'manage')` + ), + check( + `chk_entity_effective_permissions_subject_kind`, + sql`${table.subjectKind} IN ('principal', 'principal_kind')` + ), + ] +) + +export const sharedStateLinks = pgTable( + `shared_state_links`, + { + tenantId: text(`tenant_id`).notNull().default(`default`), + sharedStateId: text(`shared_state_id`).notNull(), + ownerEntityUrl: text(`owner_entity_url`).notNull(), + manifestKey: text(`manifest_key`).notNull(), + createdAt: timestamp(`created_at`, { withTimezone: true }) + .notNull() + .defaultNow(), + updatedAt: timestamp(`updated_at`, { withTimezone: true }) + .notNull() + .defaultNow(), + }, + (table) => [ + primaryKey({ + columns: [table.tenantId, table.ownerEntityUrl, table.manifestKey], + }), + index(`idx_shared_state_links_shared_state`).on( + table.tenantId, + table.sharedStateId + ), + index(`idx_shared_state_links_owner`).on( + table.tenantId, + table.ownerEntityUrl + ), + ] +) + export const users = pgTable( `users`, { @@ -436,6 +627,8 @@ export const entityBridges = pgTable( sourceRef: text(`source_ref`).notNull(), tags: jsonb(`tags`).notNull(), streamUrl: text(`stream_url`).notNull(), + principalUrl: text(`principal_url`), + principalKind: text(`principal_kind`), shapeHandle: text(`shape_handle`), shapeOffset: text(`shape_offset`), lastObserverActivityAt: timestamp(`last_observer_activity_at`, { @@ -453,6 +646,11 @@ export const entityBridges = pgTable( (table) => [ primaryKey({ columns: [table.tenantId, table.sourceRef] }), unique(`uq_entity_bridges_stream_url`).on(table.tenantId, table.streamUrl), + index(`idx_entity_bridges_principal`).on( + table.tenantId, + table.principalKind, + table.principalUrl + ), ] ) diff --git a/packages/agents-server/src/electric-agents-types.ts b/packages/agents-server/src/electric-agents-types.ts index 6c6a980a52..b062962a9b 100644 --- a/packages/agents-server/src/electric-agents-types.ts +++ b/packages/agents-server/src/electric-agents-types.ts @@ -67,6 +67,78 @@ export type RunnerKind = `local` | `cloud-worker` | `sandbox` | `ci` | `server` export type RunnerAdminStatus = `enabled` | `disabled` export type RunnerLiveness = `online` | `offline` +export type PermissionSubjectKind = `principal` | `principal_kind` +export type PermissionSubject = { + subject_kind: PermissionSubjectKind + subject_value: string +} +export type EntityPermission = + | `read` + | `write` + | `delete` + | `signal` + | `fork` + | `schedule` + | `spawn` + | `manage` +export type EntityTypePermission = `spawn` | `manage` +export type EntityPermissionPropagation = `self` | `descendants` + +export interface EntityPermissionGrant extends PermissionSubject { + id: number + entity_url: string + permission: EntityPermission + propagation: EntityPermissionPropagation + copy_to_children: boolean + created_by?: string + expires_at?: string + created_at: string + updated_at: string +} + +export interface EntityTypePermissionGrant extends PermissionSubject { + id: number + entity_type: string + permission: EntityTypePermission + created_by?: string + expires_at?: string + created_at: string + updated_at: string +} + +export interface EntityTypePermissionGrantInput extends PermissionSubject { + permission: EntityTypePermission + expires_at?: string +} + +export type AuthorizationResource = + | { kind: `entity`; entity: ElectricAgentsEntity } + | { kind: `entity_type`; entityType: ElectricAgentsEntityType } + | { kind: `entity_type_registration`; entityTypeName: string } + | { + kind: `shared_state` + sharedStateId: string + linkedEntityUrls: Array + } + +export type AuthorizationDecision = { + decision: `allow` | `deny` + expires_at?: string +} + +export type AuthorizeRequest = (input: { + tenant: string + principal: Principal + verb: EntityPermission | EntityTypePermission + resource: AuthorizationResource + request?: { + method: string + url: string + headers: Record + } + builtInAllowed: boolean +}) => Promise | AuthorizationDecision + const VALID_RUNNER_KINDS = new Set([ `local`, `cloud-worker`, @@ -443,6 +515,7 @@ export interface RegisterEntityTypeRequest { state_schemas?: Record> serve_endpoint?: string default_dispatch_policy?: DispatchPolicy + permission_grants?: Array } export interface TypedSpawnRequest { diff --git a/packages/agents-server/src/entity-bridge-manager.ts b/packages/agents-server/src/entity-bridge-manager.ts index 889aee50a4..977662507a 100644 --- a/packages/agents-server/src/entity-bridge-manager.ts +++ b/packages/agents-server/src/entity-bridge-manager.ts @@ -3,6 +3,7 @@ import { assertTags, buildTagsIndex, getEntitiesStreamPath, + hashString, normalizeTags, sourceRefForTags, } from '@electric-ax/agents-runtime' @@ -13,7 +14,9 @@ import { } from '@electric-sql/client' import { serverLog } from './utils/log.js' import { electricUrlWithPath } from './utils/electric-url.js' +import { buildReadableEntitiesWhere } from './utils/server-utils.js' import { DEFAULT_TENANT_ID } from './tenant.js' +import { isBuiltInSystemPrincipalUrl } from './principal.js' import type { EntityBridgeRow, PostgresRegistry } from './entity-registry.js' import type { StreamClient } from './stream-client.js' import type { @@ -30,7 +33,11 @@ import type { export interface EntityBridgeCoordinator { start(): Promise stop(): Promise - register(tagsInput: unknown): Promise<{ + register( + tagsInput: unknown, + principalUrl: string, + principalKind: string + ): Promise<{ sourceRef: string streamUrl: string }> @@ -115,19 +122,44 @@ function sqlStringLiteral(value: string): string { function buildTenantTagsWhereClause( tenantId: string, - tags: EntityTags + tags: EntityTags, + principalUrl?: string, + principalKind?: string, + permissionBypass?: boolean ): string { - return `tenant_id = ${sqlStringLiteral(tenantId)} AND (${buildTagsWhereClause(tags)})` + const readableWhere = + principalUrl && principalKind + ? buildReadableEntitiesWhere({ + tenantId, + principalUrl, + principalKind, + permissionBypass, + }) + : `tenant_id = ${sqlStringLiteral(tenantId)} AND FALSE` + return `${readableWhere} AND (${buildTagsWhereClause(tags)})` } function shapeEntityKey(message: ChangeMessage): string { return message.value.url } +function principalScopedSourceRef( + tagSourceRef: string, + principalUrl: string, + principalKind: string +): string { + return `${tagSourceRef}-${hashString( + JSON.stringify({ principalKind, principalUrl }) + )}` +} + class EntityBridge { readonly sourceRef: string readonly tags: EntityTags readonly streamUrl: string + private readonly principalUrl?: string + private readonly principalKind?: string + private readonly permissionBypass: boolean private currentMembers = new Map() private producer: IdempotentProducer | null = null @@ -152,6 +184,9 @@ class EntityBridge { this.sourceRef = row.sourceRef this.tags = normalizeTags(row.tags) this.streamUrl = row.streamUrl + this.principalUrl = row.principalUrl + this.principalKind = row.principalKind + this.permissionBypass = isBuiltInSystemPrincipalUrl(row.principalUrl) this.initialShapeHandle = row.shapeHandle this.initialShapeOffset = row.shapeOffset } @@ -316,7 +351,13 @@ class EntityBridge { url: electricUrlWithPath(this.electricUrl, `/v1/shape`).toString(), params: { table: `entities`, - where: buildTenantTagsWhereClause(this.tenantId, this.tags), + where: buildTenantTagsWhereClause( + this.tenantId, + this.tags, + this.principalUrl, + this.principalKind, + this.permissionBypass + ), ...(this.electricSecret ? { secret: this.electricSecret } : {}), columns: [...ENTITY_SHAPE_COLUMNS], replica: `full`, @@ -564,7 +605,11 @@ export class EntityBridgeManager implements EntityBridgeCoordinator { ) } - async register(tagsInput: unknown): Promise<{ + async register( + tagsInput: unknown, + principalUrl: string, + principalKind: string + ): Promise<{ sourceRef: string streamUrl: string }> { @@ -573,13 +618,19 @@ export class EntityBridgeManager implements EntityBridgeCoordinator { } const tags = normalizeTags(assertTags(tagsInput)) - const sourceRef = sourceRefForTags(tags) + const sourceRef = principalScopedSourceRef( + sourceRefForTags(tags), + principalUrl, + principalKind + ) const streamUrl = getEntitiesStreamPath(sourceRef) const row = await this.registry.upsertEntityBridge({ sourceRef, tags, streamUrl, + principalUrl, + principalKind, }) await this.registry.touchEntityBridge(sourceRef) await this.ensureBridge(row) diff --git a/packages/agents-server/src/entity-manager.ts b/packages/agents-server/src/entity-manager.ts index d6a6bbd4f5..37bf91877a 100644 --- a/packages/agents-server/src/entity-manager.ts +++ b/packages/agents-server/src/entity-manager.ts @@ -2047,12 +2047,7 @@ export class EntityManager { manifests: Map> ): Promise { for (const [manifestKey, manifest] of manifests) { - await this.syncEntitiesManifestSource( - entityUrl, - manifestKey, - `upsert`, - manifest - ) + await this.syncManifestLinks(entityUrl, manifestKey, `upsert`, manifest) const wake = buildManifestWakeRegistration( entityUrl, @@ -2610,14 +2605,21 @@ export class EntityManager { return updated } - async ensureEntitiesMembershipStream(tags: Record): Promise<{ + async ensureEntitiesMembershipStream( + tags: Record, + principal: { url: string; kind: string } + ): Promise<{ sourceRef: string streamUrl: string }> { if (!this.entityBridgeManager) { throw new Error(`Entity bridge manager not configured`) } - return this.entityBridgeManager.register(this.validateTags(tags)) + return this.entityBridgeManager.register( + this.validateTags(tags), + principal.url, + principal.kind + ) } async writeManifestEntry( @@ -2650,12 +2652,12 @@ export class EntityManager { await this.streamClient.appendIdempotent(entity.streams.main, encoded, { producerId: opts.producerId, }) - await this.syncEntitiesManifestSource(entityUrl, key, operation, value) + await this.syncManifestLinks(entityUrl, key, operation, value) return } await this.streamClient.append(entity.streams.main, encoded) - await this.syncEntitiesManifestSource(entityUrl, key, operation, value) + await this.syncManifestLinks(entityUrl, key, operation, value) } async upsertCronSchedule( @@ -3031,7 +3033,7 @@ export class EntityManager { }) } - private async syncEntitiesManifestSource( + private async syncManifestLinks( entityUrl: string, manifestKey: string, operation: `insert` | `update` | `upsert` | `delete`, @@ -3044,6 +3046,14 @@ export class EntityManager { manifestKey, sourceRef ) + + const sharedStateId = + operation === `delete` ? undefined : this.extractSharedStateId(value) + await this.registry.replaceSharedStateLink( + entityUrl, + manifestKey, + sharedStateId + ) } private extractEntitiesSourceRef( @@ -3059,6 +3069,24 @@ export class EntityManager { return undefined } + private extractSharedStateId( + manifest?: Record + ): string | undefined { + if (manifest?.kind === `shared-state` && typeof manifest.id === `string`) { + return manifest.id + } + + if (manifest?.kind !== `source` || manifest.sourceType !== `db`) { + return undefined + } + + if (typeof manifest.sourceRef === `string`) { + return manifest.sourceRef + } + const config = isRecord(manifest.config) ? manifest.config : undefined + return typeof config?.id === `string` ? config.id : undefined + } + /** * Read a child entity's stream and extract concatenated text deltas * for a specific run, plus any error messages for that run. diff --git a/packages/agents-server/src/entity-projector.ts b/packages/agents-server/src/entity-projector.ts index cac148d706..f8c9d89295 100644 --- a/packages/agents-server/src/entity-projector.ts +++ b/packages/agents-server/src/entity-projector.ts @@ -3,6 +3,7 @@ import { assertTags, buildTagsIndex, getEntitiesStreamPath, + hashString, normalizeTags, sourceRefForTags, } from '@electric-ax/agents-runtime' @@ -15,6 +16,7 @@ import { PostgresRegistry } from './entity-registry.js' import { electricUrlWithPath } from './utils/electric-url.js' import { serverLog } from './utils/log.js' import { isUnregisteredTenantError } from './tenant.js' +import { isBuiltInSystemPrincipalUrl } from './principal.js' import type { DrizzleDB } from './db/index.js' import type { EntityBridgeCoordinator } from './entity-bridge-manager.js' import type { EntityBridgeRow } from './entity-registry.js' @@ -37,6 +39,7 @@ interface EntityShapeRow extends Row { type: string status: `spawning` | `running` | `idle` | `stopped` tags: EntityTags + created_by?: string | null spawn_args?: Record | null sandbox?: { profile: string } | null parent?: string | null @@ -53,6 +56,7 @@ const ENTITY_SHAPE_COLUMNS = [ `type`, `status`, `tags`, + `created_by`, `spawn_args`, `sandbox`, `parent`, @@ -89,6 +93,16 @@ function sourceRefFromStreamPath(streamPath: string): string | null { return match?.[1] ?? null } +function principalScopedSourceRef( + tagSourceRef: string, + principalUrl: string, + principalKind: string +): string { + return `${tagSourceRef}-${hashString( + JSON.stringify({ principalKind, principalUrl }) + )}` +} + function sameMember( left: EntityMembershipRow | undefined, right: EntityMembershipRow @@ -125,6 +139,9 @@ class ProjectedEntityBridge { readonly sourceRef: string readonly tags: EntityTags readonly streamUrl: string + private readonly principalUrl?: string + private readonly principalKind?: string + private readonly permissionBypass: boolean private currentMembers = new Map() private producer: IdempotentProducer | null = null @@ -132,12 +149,16 @@ class ProjectedEntityBridge { constructor( row: EntityBridgeRow, + private registry: PostgresRegistry, private streamClient: StreamClient ) { this.tenantId = row.tenantId this.sourceRef = row.sourceRef this.tags = normalizeTags(row.tags) this.streamUrl = row.streamUrl + this.principalUrl = row.principalUrl + this.principalKind = row.principalKind + this.permissionBypass = isBuiltInSystemPrincipalUrl(row.principalUrl) } async start(initialEntities: Iterable): Promise { @@ -159,7 +180,7 @@ class ProjectedEntityBridge { } ) await this.loadCurrentMembers() - this.reconcile(initialEntities) + await this.reconcile(initialEntities) } async stop(): Promise { @@ -175,13 +196,14 @@ class ProjectedEntityBridge { } } - reconcile(entities: Iterable): void { + async reconcile(entities: Iterable): Promise { if (this.stopped) return const staleMembers = new Map(this.currentMembers) for (const entity of entities) { if (entity.tenant_id !== this.tenantId) continue if (!entityMatchesTags(entity, this.tags)) continue + if (!(await this.canReadEntity(entity))) continue staleMembers.delete(entity.url) this.upsertEntity(entity) } @@ -192,11 +214,14 @@ class ProjectedEntityBridge { } } - applyEntity(entity: EntityShapeRow): void { + async applyEntity(entity: EntityShapeRow): Promise { if (this.stopped) return if (entity.tenant_id !== this.tenantId) return - if (!entityMatchesTags(entity, this.tags)) { + if ( + !entityMatchesTags(entity, this.tags) || + !(await this.canReadEntity(entity)) + ) { const existing = this.currentMembers.get(entity.url) if (!existing) return this.append(`delete`, existing) @@ -231,6 +256,16 @@ class ProjectedEntityBridge { } } + private async canReadEntity(entity: EntityShapeRow): Promise { + if (this.permissionBypass) return true + if (!this.principalUrl || !this.principalKind) return false + if (entity.created_by === this.principalUrl) return true + return await this.registry.hasEntityPermission(entity.url, `read`, { + principalUrl: this.principalUrl, + principalKind: this.principalKind, + }) + } + private async ensureStream(): Promise { if (!(await this.streamClient.exists(this.streamUrl))) { await this.streamClient.create(this.streamUrl, { @@ -377,7 +412,9 @@ export class EntityProjector { async register( tenantId: string, registry: PostgresRegistry, - tagsInput: unknown + tagsInput: unknown, + principalUrl: string, + principalKind: string ): Promise<{ sourceRef: string; streamUrl: string }> { if (!this.electricUrl) { throw new Error( @@ -388,12 +425,18 @@ export class EntityProjector { await this.start() this.registries.set(tenantId, registry) const tags = normalizeTags(assertTags(tagsInput)) - const sourceRef = sourceRefForTags(tags) + const sourceRef = principalScopedSourceRef( + sourceRefForTags(tags), + principalUrl, + principalKind + ) const streamUrl = getEntitiesStreamPath(sourceRef) const row = await registry.upsertEntityBridge({ sourceRef, tags, streamUrl, + principalUrl, + principalKind, }) await registry.touchEntityBridge(sourceRef) await this.ensureProjection(row) @@ -436,8 +479,12 @@ export class EntityProjector { } } - async onEntityChanged(_tenantId: string, _entityUrl: string): Promise { - // Membership updates come from the shared Electric entities shape. + async onEntityChanged(tenantId: string, entityUrl: string): Promise { + const entity = this.entities.get(entityKey(tenantId, entityUrl)) + if (!entity) return + for (const projection of this.projectionsForTenant(tenantId)) { + await projection.applyEntity(entity) + } } async loadTenantBridges( @@ -523,18 +570,20 @@ export class EntityProjector { } if (message.headers.control === `up-to-date`) { this.upToDate = true - this.reconcileAll() + await this.reconcileAll() this.readyResolve?.() } continue } if (!isChangeMessage(message)) continue - this.applyChangeMessage(message) + await this.applyChangeMessage(message) } } - private applyChangeMessage(message: ChangeMessage): void { + private async applyChangeMessage( + message: ChangeMessage + ): Promise { const entity = message.value const key = entityKey(entity.tenant_id, entity.url) if (message.headers.operation === `delete`) { @@ -550,7 +599,7 @@ export class EntityProjector { this.entities.set(key, entity) if (this.upToDate) { for (const projection of this.projectionsForTenant(entity.tenant_id)) { - projection.applyEntity(entity) + await projection.applyEntity(entity) } } } @@ -642,7 +691,11 @@ export class EntityProjector { } throw error } - const projection = new ProjectedEntityBridge(row, streamClient) + const projection = new ProjectedEntityBridge( + row, + this.registryForTenant(row.tenantId), + streamClient + ) await projection.start(this.entitiesForTenant(row.tenantId)) this.projections.set(key, projection) })().finally(() => { @@ -665,9 +718,9 @@ export class EntityProjector { ) } - private reconcileAll(): void { + private async reconcileAll(): Promise { for (const projection of this.projections.values()) { - projection.reconcile(this.entitiesForTenant(projection.tenantId)) + await projection.reconcile(this.entitiesForTenant(projection.tenantId)) } } @@ -733,14 +786,20 @@ export class EntityProjectorTenantFacade implements EntityBridgeCoordinator { async stop(): Promise {} - async register(tagsInput: unknown): Promise<{ + async register( + tagsInput: unknown, + principalUrl: string, + principalKind: string + ): Promise<{ sourceRef: string streamUrl: string }> { return await this.projector.register( this.tenantId, this.registry, - tagsInput + tagsInput, + principalUrl, + principalKind ) } diff --git a/packages/agents-server/src/entity-registry.ts b/packages/agents-server/src/entity-registry.ts index a118ff30d8..aac94d971b 100644 --- a/packages/agents-server/src/entity-registry.ts +++ b/packages/agents-server/src/entity-registry.ts @@ -1,14 +1,19 @@ -import { and, desc, eq, lt, ne, sql } from 'drizzle-orm' +import { and, desc, eq, inArray, lt, ne, sql } from 'drizzle-orm' import { buildTagsIndex, normalizeTags } from '@electric-ax/agents-runtime' import { consumerClaims, entities, + entityEffectivePermissions, entityBridges, entityDispatchState, entityManifestSources, + entityLineage, + entityPermissionGrants, entityTypes, + entityTypePermissionGrants, runnerRuntimeDiagnostics, runners, + sharedStateLinks, tagStreamOutbox, } from './db/schema.js' import { @@ -30,6 +35,12 @@ import type { SourceStreamOffset, ConsumerClaim, DispatchPolicy, + EntityPermission, + EntityPermissionGrant, + EntityPermissionPropagation, + EntityTypePermission, + EntityTypePermissionGrant, + PermissionSubjectKind, } from './electric-agents-types.js' import type { EntityTags } from '@electric-ax/agents-runtime' @@ -51,6 +62,8 @@ export interface EntityBridgeRow { sourceRef: string tags: EntityTags streamUrl: string + principalUrl?: string + principalKind?: string shapeHandle?: string shapeOffset?: string lastObserverActivityAt: Date @@ -135,13 +148,43 @@ export interface MaterializeReleasedClaimInput { releasedAt?: Date } +export interface CreateEntityTypePermissionGrantInput { + entityType: string + permission: EntityTypePermission + subjectKind: PermissionSubjectKind + subjectValue: string + createdBy?: string + expiresAt?: Date +} + +export interface CreateEntityPermissionGrantInput { + entityUrl: string + permission: EntityPermission + subjectKind: PermissionSubjectKind + subjectValue: string + propagation?: EntityPermissionPropagation + copyToChildren?: boolean + createdBy?: string + expiresAt?: Date +} + const DEFAULT_RUNNER_LEASE_MS = 30_000 +const PERMISSION_PRUNE_INTERVAL_MS = 30_000 + +type RegistryTransaction = Parameters< + Parameters[0] +>[0] export function runnerWakeStream(runnerId: string): string { return `/runners/${runnerId}/wake` } export class PostgresRegistry { + // Electric predicates cannot depend on now(), so expired effective rows need a + // server-side sweep. Debounce it to keep read/auth paths from writing on every request. + private lastPermissionPruneStartedAt = 0 + private permissionPrunePromise: Promise | null = null + constructor( private db: DrizzleDB, readonly tenantId: string = DEFAULT_TENANT_ID @@ -699,6 +742,67 @@ export class PostgresRegistry { }) .onConflictDoNothing() + await tx + .insert(entityLineage) + .values({ + tenantId: this.tenantId, + ancestorUrl: entity.url, + descendantUrl: entity.url, + depth: 0, + }) + .onConflictDoNothing() + + if (entity.parent) { + await tx.execute(sql` + INSERT INTO ${entityLineage} ( + tenant_id, + ancestor_url, + descendant_url, + depth + ) + SELECT + ${this.tenantId}, + ancestor_url, + ${entity.url}, + depth + 1 + FROM ${entityLineage} + WHERE tenant_id = ${this.tenantId} + AND descendant_url = ${entity.parent} + ON CONFLICT DO NOTHING + `) + } + + await tx.execute(sql` + INSERT INTO ${entityEffectivePermissions} ( + tenant_id, + entity_url, + source_entity_url, + source_grant_id, + permission, + subject_kind, + subject_value, + expires_at + ) + SELECT + ${this.tenantId}, + ${entity.url}, + grants.entity_url, + grants.id, + grants.permission, + grants.subject_kind, + grants.subject_value, + grants.expires_at + FROM ${entityPermissionGrants} grants + JOIN ${entityLineage} lineage + ON lineage.tenant_id = grants.tenant_id + AND lineage.ancestor_url = grants.entity_url + AND lineage.descendant_url = ${entity.url} + WHERE grants.tenant_id = ${this.tenantId} + AND grants.propagation = 'descendants' + AND (grants.expires_at IS NULL OR grants.expires_at > now()) + ON CONFLICT DO NOTHING + `) + return parseInt(result[0]!.txid) }) } catch (err) { @@ -753,6 +857,11 @@ export class PostgresRegistry { limit?: number offset?: number created_by?: string + readableBy?: { + principalUrl: string + principalKind: string + bypass?: boolean + } }): Promise<{ entities: Array; total: number }> { const conditions = [eq(entities.tenantId, this.tenantId)] if (filter?.type) conditions.push(eq(entities.type, filter.type)) @@ -760,6 +869,25 @@ export class PostgresRegistry { if (filter?.parent) conditions.push(eq(entities.parent, filter.parent)) if (filter?.created_by) conditions.push(eq(entities.createdBy, filter.created_by)) + if (filter?.readableBy && !filter.readableBy.bypass) { + conditions.push(sql`( + ${entities.createdBy} = ${filter.readableBy.principalUrl} + OR ${entities.url} IN ( + SELECT ${entityEffectivePermissions.entityUrl} + FROM ${entityEffectivePermissions} + WHERE ${entityEffectivePermissions.tenantId} = ${this.tenantId} + AND ${entityEffectivePermissions.permission} IN ('read', 'manage') + AND (${entityEffectivePermissions.expiresAt} IS NULL OR ${entityEffectivePermissions.expiresAt} > now()) + AND ( + (${entityEffectivePermissions.subjectKind} = 'principal' + AND ${entityEffectivePermissions.subjectValue} = ${filter.readableBy.principalUrl}) + OR + (${entityEffectivePermissions.subjectKind} = 'principal_kind' + AND ${entityEffectivePermissions.subjectValue} = ${filter.readableBy.principalKind}) + ) + ) + )`) + } const whereClause = and(...conditions) @@ -790,6 +918,431 @@ export class PostgresRegistry { } } + async createEntityTypePermissionGrant( + input: CreateEntityTypePermissionGrantInput + ): Promise { + const [row] = await this.db + .insert(entityTypePermissionGrants) + .values({ + tenantId: this.tenantId, + entityType: input.entityType, + permission: input.permission, + subjectKind: input.subjectKind, + subjectValue: input.subjectValue, + createdBy: input.createdBy ?? null, + expiresAt: input.expiresAt ?? null, + }) + .returning() + return this.rowToEntityTypePermissionGrant(row!) + } + + async ensureEntityTypePermissionGrant( + input: CreateEntityTypePermissionGrantInput + ): Promise { + const [existing] = await this.db + .select() + .from(entityTypePermissionGrants) + .where( + and( + eq(entityTypePermissionGrants.tenantId, this.tenantId), + eq(entityTypePermissionGrants.entityType, input.entityType), + eq(entityTypePermissionGrants.permission, input.permission), + eq(entityTypePermissionGrants.subjectKind, input.subjectKind), + eq(entityTypePermissionGrants.subjectValue, input.subjectValue), + input.expiresAt + ? eq(entityTypePermissionGrants.expiresAt, input.expiresAt) + : sql`${entityTypePermissionGrants.expiresAt} IS NULL` + ) + ) + .limit(1) + if (existing) return this.rowToEntityTypePermissionGrant(existing) + + return await this.createEntityTypePermissionGrant(input) + } + + async listEntityTypePermissionGrants( + entityType: string + ): Promise> { + const rows = await this.db + .select() + .from(entityTypePermissionGrants) + .where( + and( + eq(entityTypePermissionGrants.tenantId, this.tenantId), + eq(entityTypePermissionGrants.entityType, entityType) + ) + ) + .orderBy(entityTypePermissionGrants.id) + return rows.map((row) => this.rowToEntityTypePermissionGrant(row)) + } + + async deleteEntityTypePermissionGrant( + entityType: string, + grantId: number + ): Promise { + const rows = await this.db + .delete(entityTypePermissionGrants) + .where( + and( + eq(entityTypePermissionGrants.tenantId, this.tenantId), + eq(entityTypePermissionGrants.entityType, entityType), + eq(entityTypePermissionGrants.id, grantId) + ) + ) + .returning({ id: entityTypePermissionGrants.id }) + return rows.length > 0 + } + + async hasEntityTypePermission( + entityType: string, + permission: EntityTypePermission, + subject: { principalUrl: string; principalKind: string } + ): Promise { + const permissions = [permission, `manage`] as const + const rows = await this.db + .select({ id: entityTypePermissionGrants.id }) + .from(entityTypePermissionGrants) + .where( + and( + eq(entityTypePermissionGrants.tenantId, this.tenantId), + eq(entityTypePermissionGrants.entityType, entityType), + inArray(entityTypePermissionGrants.permission, [...permissions]), + sql`(${entityTypePermissionGrants.expiresAt} IS NULL OR ${entityTypePermissionGrants.expiresAt} > now())`, + sql`( + (${entityTypePermissionGrants.subjectKind} = 'principal' + AND ${entityTypePermissionGrants.subjectValue} = ${subject.principalUrl}) + OR + (${entityTypePermissionGrants.subjectKind} = 'principal_kind' + AND ${entityTypePermissionGrants.subjectValue} = ${subject.principalKind}) + )` + ) + ) + .limit(1) + return rows.length > 0 + } + + async createEntityPermissionGrant( + input: CreateEntityPermissionGrantInput + ): Promise { + return await this.db.transaction(async (tx) => { + const [row] = await tx + .insert(entityPermissionGrants) + .values({ + tenantId: this.tenantId, + entityUrl: input.entityUrl, + permission: input.permission, + subjectKind: input.subjectKind, + subjectValue: input.subjectValue, + propagation: input.propagation ?? `self`, + copyToChildren: input.copyToChildren ?? false, + createdBy: input.createdBy ?? null, + expiresAt: input.expiresAt ?? null, + }) + .returning() + await this.materializeEntityPermissionGrant(tx, row!) + return this.rowToEntityPermissionGrant(row!) + }) + } + + async listEntityPermissionGrants( + entityUrl: string + ): Promise> { + const rows = await this.db + .select() + .from(entityPermissionGrants) + .where( + and( + eq(entityPermissionGrants.tenantId, this.tenantId), + eq(entityPermissionGrants.entityUrl, entityUrl) + ) + ) + .orderBy(entityPermissionGrants.id) + return rows.map((row) => this.rowToEntityPermissionGrant(row)) + } + + async deleteEntityPermissionGrant( + entityUrl: string, + grantId: number + ): Promise { + return await this.db.transaction(async (tx) => { + await tx + .delete(entityEffectivePermissions) + .where( + and( + eq(entityEffectivePermissions.tenantId, this.tenantId), + eq(entityEffectivePermissions.sourceGrantId, grantId) + ) + ) + const rows = await tx + .delete(entityPermissionGrants) + .where( + and( + eq(entityPermissionGrants.tenantId, this.tenantId), + eq(entityPermissionGrants.entityUrl, entityUrl), + eq(entityPermissionGrants.id, grantId) + ) + ) + .returning({ id: entityPermissionGrants.id }) + return rows.length > 0 + }) + } + + async copyEntityPermissionGrantsForSpawn( + parentEntityUrl: string, + childEntityUrl: string, + createdBy?: string + ): Promise> { + const parentGrants = await this.db + .select() + .from(entityPermissionGrants) + .where( + and( + eq(entityPermissionGrants.tenantId, this.tenantId), + eq(entityPermissionGrants.entityUrl, parentEntityUrl), + eq(entityPermissionGrants.copyToChildren, true), + sql`(${entityPermissionGrants.expiresAt} IS NULL OR ${entityPermissionGrants.expiresAt} > now())` + ) + ) + + const copied: Array = [] + for (const grant of parentGrants) { + copied.push( + await this.createEntityPermissionGrant({ + entityUrl: childEntityUrl, + permission: grant.permission as EntityPermission, + subjectKind: grant.subjectKind as PermissionSubjectKind, + subjectValue: grant.subjectValue, + propagation: `self`, + copyToChildren: grant.copyToChildren, + createdBy, + expiresAt: grant.expiresAt ?? undefined, + }) + ) + } + return copied + } + + async hasEntityPermission( + entityUrl: string, + permission: EntityPermission, + subject: { principalUrl: string; principalKind: string } + ): Promise { + const permissions = [permission, `manage`] as const + const rows = await this.db + .select({ id: entityEffectivePermissions.id }) + .from(entityEffectivePermissions) + .where( + and( + eq(entityEffectivePermissions.tenantId, this.tenantId), + eq(entityEffectivePermissions.entityUrl, entityUrl), + inArray(entityEffectivePermissions.permission, [...permissions]), + sql`(${entityEffectivePermissions.expiresAt} IS NULL OR ${entityEffectivePermissions.expiresAt} > now())`, + sql`( + (${entityEffectivePermissions.subjectKind} = 'principal' + AND ${entityEffectivePermissions.subjectValue} = ${subject.principalUrl}) + OR + (${entityEffectivePermissions.subjectKind} = 'principal_kind' + AND ${entityEffectivePermissions.subjectValue} = ${subject.principalKind}) + )` + ) + ) + .limit(1) + return rows.length > 0 + } + + async replaceSharedStateLink( + ownerEntityUrl: string, + manifestKey: string, + sharedStateId?: string + ): Promise { + await this.db + .delete(sharedStateLinks) + .where( + and( + eq(sharedStateLinks.tenantId, this.tenantId), + eq(sharedStateLinks.ownerEntityUrl, ownerEntityUrl), + eq(sharedStateLinks.manifestKey, manifestKey) + ) + ) + + if (!sharedStateId) return + + await this.db + .insert(sharedStateLinks) + .values({ + tenantId: this.tenantId, + ownerEntityUrl, + manifestKey, + sharedStateId, + }) + .onConflictDoUpdate({ + target: [ + sharedStateLinks.tenantId, + sharedStateLinks.ownerEntityUrl, + sharedStateLinks.manifestKey, + ], + set: { + sharedStateId, + updatedAt: new Date(), + }, + }) + } + + async listSharedStateLinkedEntityUrls( + sharedStateId: string + ): Promise> { + const rows = await this.db + .selectDistinct({ ownerEntityUrl: sharedStateLinks.ownerEntityUrl }) + .from(sharedStateLinks) + .where( + and( + eq(sharedStateLinks.tenantId, this.tenantId), + eq(sharedStateLinks.sharedStateId, sharedStateId) + ) + ) + return rows.map((row) => row.ownerEntityUrl) + } + + async pruneExpiredPermissionGrants( + now: Date = new Date(), + options: { force?: boolean } = {} + ): Promise { + if (this.permissionPrunePromise) return await this.permissionPrunePromise + + const startedAt = Date.now() + if ( + !options.force && + startedAt - this.lastPermissionPruneStartedAt < + PERMISSION_PRUNE_INTERVAL_MS + ) { + return + } + + this.lastPermissionPruneStartedAt = startedAt + const promise = this.pruneExpiredPermissionGrantsNow(now) + this.permissionPrunePromise = promise + try { + await promise + } catch (error) { + this.lastPermissionPruneStartedAt = 0 + throw error + } finally { + if (this.permissionPrunePromise === promise) { + this.permissionPrunePromise = null + } + } + } + + private async pruneExpiredPermissionGrantsNow(now: Date): Promise { + await this.db.transaction(async (tx) => { + const expiredEntityGrantIds = await tx + .select({ id: entityPermissionGrants.id }) + .from(entityPermissionGrants) + .where( + and( + eq(entityPermissionGrants.tenantId, this.tenantId), + sql`${entityPermissionGrants.expiresAt} IS NOT NULL`, + lt(entityPermissionGrants.expiresAt, now) + ) + ) + const ids = expiredEntityGrantIds.map((row) => row.id) + if (ids.length > 0) { + await tx + .delete(entityEffectivePermissions) + .where( + and( + eq(entityEffectivePermissions.tenantId, this.tenantId), + inArray(entityEffectivePermissions.sourceGrantId, ids) + ) + ) + await tx + .delete(entityPermissionGrants) + .where( + and( + eq(entityPermissionGrants.tenantId, this.tenantId), + inArray(entityPermissionGrants.id, ids) + ) + ) + } + + await tx + .delete(entityEffectivePermissions) + .where( + and( + eq(entityEffectivePermissions.tenantId, this.tenantId), + sql`${entityEffectivePermissions.expiresAt} IS NOT NULL`, + lt(entityEffectivePermissions.expiresAt, now) + ) + ) + await tx + .delete(entityTypePermissionGrants) + .where( + and( + eq(entityTypePermissionGrants.tenantId, this.tenantId), + sql`${entityTypePermissionGrants.expiresAt} IS NOT NULL`, + lt(entityTypePermissionGrants.expiresAt, now) + ) + ) + }) + } + + private async materializeEntityPermissionGrant( + tx: RegistryTransaction, + grant: typeof entityPermissionGrants.$inferSelect + ): Promise { + await tx + .delete(entityEffectivePermissions) + .where( + and( + eq(entityEffectivePermissions.tenantId, this.tenantId), + eq(entityEffectivePermissions.sourceGrantId, grant.id) + ) + ) + + if (grant.propagation === `descendants`) { + await tx.execute(sql` + INSERT INTO ${entityEffectivePermissions} ( + tenant_id, + entity_url, + source_entity_url, + source_grant_id, + permission, + subject_kind, + subject_value, + expires_at + ) + SELECT + ${this.tenantId}, + descendant_url, + ${grant.entityUrl}, + ${grant.id}, + ${grant.permission}, + ${grant.subjectKind}, + ${grant.subjectValue}, + ${grant.expiresAt} + FROM ${entityLineage} + WHERE tenant_id = ${this.tenantId} + AND ancestor_url = ${grant.entityUrl} + ON CONFLICT DO NOTHING + `) + return + } + + await tx + .insert(entityEffectivePermissions) + .values({ + tenantId: this.tenantId, + entityUrl: grant.entityUrl, + sourceEntityUrl: grant.entityUrl, + sourceGrantId: grant.id, + permission: grant.permission, + subjectKind: grant.subjectKind, + subjectValue: grant.subjectValue, + expiresAt: grant.expiresAt, + }) + .onConflictDoNothing() + } + async updateStatus(entityUrl: string, status: EntityStatus): Promise { const whereClause = isTerminalEntityStatus(status) ? this.entityWhere(entityUrl) @@ -953,6 +1506,8 @@ export class PostgresRegistry { sourceRef: string tags: EntityTags streamUrl: string + principalUrl: string + principalKind: string }): Promise { await this.db .insert(entityBridges) @@ -961,6 +1516,8 @@ export class PostgresRegistry { sourceRef: row.sourceRef, tags: normalizeTags(row.tags), streamUrl: row.streamUrl, + principalUrl: row.principalUrl, + principalKind: row.principalKind, }) .onConflictDoNothing() @@ -1271,6 +1828,40 @@ export class PostgresRegistry { } } + private rowToEntityTypePermissionGrant( + row: typeof entityTypePermissionGrants.$inferSelect + ): EntityTypePermissionGrant { + return { + id: row.id, + entity_type: row.entityType, + permission: row.permission as EntityTypePermission, + subject_kind: row.subjectKind as PermissionSubjectKind, + subject_value: row.subjectValue, + created_by: row.createdBy ?? undefined, + expires_at: row.expiresAt?.toISOString(), + created_at: row.createdAt.toISOString(), + updated_at: row.updatedAt.toISOString(), + } + } + + private rowToEntityPermissionGrant( + row: typeof entityPermissionGrants.$inferSelect + ): EntityPermissionGrant { + return { + id: row.id, + entity_url: row.entityUrl, + permission: row.permission as EntityPermission, + subject_kind: row.subjectKind as PermissionSubjectKind, + subject_value: row.subjectValue, + propagation: row.propagation as EntityPermissionPropagation, + copy_to_children: row.copyToChildren, + created_by: row.createdBy ?? undefined, + expires_at: row.expiresAt?.toISOString(), + created_at: row.createdAt.toISOString(), + updated_at: row.updatedAt.toISOString(), + } + } + private rowToEntity(row: typeof entities.$inferSelect): ElectricAgentsEntity { return { url: row.url, @@ -1311,6 +1902,8 @@ export class PostgresRegistry { sourceRef: row.sourceRef, tags: (row.tags as EntityTags | null | undefined) ?? {}, streamUrl: row.streamUrl, + principalUrl: row.principalUrl ?? undefined, + principalKind: row.principalKind ?? undefined, shapeHandle: row.shapeHandle ?? undefined, shapeOffset: row.shapeOffset ?? undefined, lastObserverActivityAt: row.lastObserverActivityAt, diff --git a/packages/agents-server/src/index.ts b/packages/agents-server/src/index.ts index 7d72b7233b..e411f78771 100644 --- a/packages/agents-server/src/index.ts +++ b/packages/agents-server/src/index.ts @@ -43,6 +43,12 @@ export type { ElectricAgentsEntity, ElectricAgentsEntityRow, ElectricAgentsEntityType, + EntityPermission, + EntityPermissionGrant, + EntityPermissionPropagation, + EntityTypePermission, + EntityTypePermissionGrant, + EntityTypePermissionGrantInput, EntityStatus, EntitySignal, PublicElectricAgentsEntity, @@ -52,6 +58,11 @@ export type { SignalRequest, SignalResponse, TypedSpawnRequest, + PermissionSubject, + PermissionSubjectKind, + AuthorizationDecision, + AuthorizationResource, + AuthorizeRequest, } from './electric-agents-types.js' export type { EventSourceBucket, diff --git a/packages/agents-server/src/permissions.ts b/packages/agents-server/src/permissions.ts new file mode 100644 index 0000000000..eb1ae65b6e --- /dev/null +++ b/packages/agents-server/src/permissions.ts @@ -0,0 +1,239 @@ +import { isBuiltInSystemPrincipalUrl } from './principal.js' +import type { + AuthorizeRequest, + ElectricAgentsEntity, + ElectricAgentsEntityType, + EntityPermission, + RegisterEntityTypeRequest, + EntityTypePermission, +} from './electric-agents-types.js' +import type { TenantContext } from './routing/context.js' +import { serverLog } from './utils/log.js' + +const authzDecisionCache = new WeakMap< + AuthorizeRequest, + Map +>() + +export function principalSubject(principal: { url: string; kind: string }): { + principalUrl: string + principalKind: string +} { + return { principalUrl: principal.url, principalKind: principal.kind } +} + +export function isPermissionBypassPrincipal(ctx: TenantContext): boolean { + return isBuiltInSystemPrincipalUrl(ctx.principal.url) +} + +export async function canAccessEntity( + ctx: TenantContext, + entity: ElectricAgentsEntity, + permission: EntityPermission, + request?: Request +): Promise { + if (isPermissionBypassPrincipal(ctx)) return true + await ctx.entityManager.registry.pruneExpiredPermissionGrants?.() + + const builtInAllowed = + entity.created_by === ctx.principal.url || + (await ctx.entityManager.registry.hasEntityPermission( + entity.url, + permission, + principalSubject(ctx.principal) + )) + + return await applyAuthorizationHook(ctx, { + verb: permission, + resourceKey: `entity:${entity.url}`, + resource: { kind: `entity`, entity }, + builtInAllowed, + request, + }) +} + +export async function canAccessEntityType( + ctx: TenantContext, + entityType: ElectricAgentsEntityType, + permission: EntityTypePermission, + request?: Request +): Promise { + if (isPermissionBypassPrincipal(ctx)) return true + await ctx.entityManager.registry.pruneExpiredPermissionGrants?.() + + const builtInAllowed = + await ctx.entityManager.registry.hasEntityTypePermission( + entityType.name, + permission, + principalSubject(ctx.principal) + ) + + return await applyAuthorizationHook(ctx, { + verb: permission, + resourceKey: `entity_type:${entityType.name}`, + resource: { kind: `entity_type`, entityType }, + builtInAllowed, + request, + }) +} + +export async function canRegisterEntityType( + ctx: TenantContext, + input: Pick, + request?: Request +): Promise { + if (isPermissionBypassPrincipal(ctx)) return true + + return await applyAuthorizationHook(ctx, { + verb: `manage`, + resourceKey: `entity_type_registration:${input.name}`, + resource: { + kind: `entity_type_registration`, + entityTypeName: input.name, + }, + builtInAllowed: true, + request, + }) +} + +export async function canAccessSharedState( + ctx: TenantContext, + sharedStateId: string, + permission: `read` | `write`, + request?: Request, + ownerEntityUrl?: string +): Promise { + if (isPermissionBypassPrincipal(ctx)) return true + await ctx.entityManager.registry.pruneExpiredPermissionGrants?.() + + const storedLinkedEntityUrls = + await ctx.entityManager.registry.listSharedStateLinkedEntityUrls( + sharedStateId + ) + const bootstrapEntityUrls = + storedLinkedEntityUrls.length === 0 && ownerEntityUrl + ? [ownerEntityUrl] + : [] + const linkedEntityUrls = [ + ...new Set([...storedLinkedEntityUrls, ...bootstrapEntityUrls]), + ] + for (const entityUrl of linkedEntityUrls) { + const entity = await ctx.entityManager.registry.getEntity(entityUrl) + if (!entity) continue + if ( + entity.created_by === ctx.principal.url || + (await ctx.entityManager.registry.hasEntityPermission( + entity.url, + permission, + principalSubject(ctx.principal) + )) + ) { + return await applyAuthorizationHook(ctx, { + verb: permission, + resourceKey: `shared_state:${sharedStateId}`, + resource: { + kind: `shared_state`, + sharedStateId, + linkedEntityUrls, + }, + builtInAllowed: true, + request, + }) + } + } + + return await applyAuthorizationHook(ctx, { + verb: permission, + resourceKey: `shared_state:${sharedStateId}`, + resource: { + kind: `shared_state`, + sharedStateId, + linkedEntityUrls, + }, + builtInAllowed: false, + request, + }) +} + +async function applyAuthorizationHook( + ctx: TenantContext, + input: { + verb: EntityPermission | EntityTypePermission + resourceKey: string + resource: Parameters[0][`resource`] + builtInAllowed: boolean + request?: Request + } +): Promise { + const hook = ctx.authorizeRequest + if (!hook) return input.builtInAllowed + + const cacheKey = [ + ctx.service, + ctx.principal.url, + input.verb, + input.resourceKey, + ].join(`|`) + const cached = getCachedDecision(hook, cacheKey) + if (cached) return cached.decision === `allow` + + let decision: Awaited> + try { + decision = await hook({ + tenant: ctx.service, + principal: ctx.principal, + verb: input.verb, + resource: input.resource, + request: input.request ? requestMetadata(input.request) : undefined, + builtInAllowed: input.builtInAllowed, + }) + } catch (error) { + serverLog.warn(`[agent-server] authorization hook failed:`, error) + return false + } + + cacheDecision(hook, cacheKey, decision) + return decision.decision === `allow` +} + +function getCachedDecision( + hook: AuthorizeRequest, + cacheKey: string +): { decision: `allow` | `deny` } | null { + const cache = authzDecisionCache.get(hook) + const entry = cache?.get(cacheKey) + if (!entry) return null + if (entry.expiresAt <= Date.now()) { + cache?.delete(cacheKey) + return null + } + return { decision: entry.decision } +} + +function cacheDecision( + hook: AuthorizeRequest, + cacheKey: string, + decision: Awaited> +): void { + if (!decision.expires_at) return + const expiresAt = Date.parse(decision.expires_at) + if (!Number.isFinite(expiresAt) || expiresAt <= Date.now()) return + let cache = authzDecisionCache.get(hook) + if (!cache) { + cache = new Map() + authzDecisionCache.set(hook, cache) + } + cache.set(cacheKey, { decision: decision.decision, expiresAt }) +} + +function requestMetadata(request: Request): { + method: string + url: string + headers: Record +} { + const headers: Record = {} + request.headers.forEach((value, key) => { + headers[key] = value + }) + return { method: request.method, url: request.url, headers } +} diff --git a/packages/agents-server/src/routing/context.ts b/packages/agents-server/src/routing/context.ts index 3ccae920bb..c81a048b5a 100644 --- a/packages/agents-server/src/routing/context.ts +++ b/packages/agents-server/src/routing/context.ts @@ -12,6 +12,7 @@ import type { DurableStreamsRoutingAdapter } from './durable-streams-routing-ada import type { Principal } from '../principal.js' import type { DurableStreamsBearerProvider } from '../stream-client.js' import type { WebhookSigner } from '../webhook-signing.js' +import type { AuthorizeRequest } from '../electric-agents-types.js' export interface EventSourceCatalog { listEventSources: () => @@ -55,5 +56,6 @@ export interface TenantContext { entityBridgeManager: EntityBridgeCoordinator eventSources?: EventSourceCatalog ensureEventSourceWakeSource?: (sourceUrl: string) => Promise | void + authorizeRequest?: AuthorizeRequest isShuttingDown: () => boolean } diff --git a/packages/agents-server/src/routing/durable-streams-router.ts b/packages/agents-server/src/routing/durable-streams-router.ts index ecd1b89f1c..781ddfca21 100644 --- a/packages/agents-server/src/routing/durable-streams-router.ts +++ b/packages/agents-server/src/routing/durable-streams-router.ts @@ -6,8 +6,16 @@ import { appendPathToUrl } from '@electric-ax/agents-runtime' import { Type, type Static } from '@sinclair/typebox' import { and, eq } from 'drizzle-orm' import { Router } from 'itty-router' -import { readRequestBody, responseHeaders } from '../electric-agents-http.js' +import { + apiError, + readRequestBody, + responseHeaders, +} from '../electric-agents-http.js' import { subscriptionWebhooks } from '../db/schema.js' +import { + ErrCodeNotFound, + ErrCodeUnauthorized, +} from '../electric-agents-types.js' import { createStreamAppendRouteRequest, electricAgentsStreamAppendRouter, @@ -15,6 +23,7 @@ import { import { validateBody } from './schema.js' import { rewriteLoopbackWebhookUrl } from '../utils/webhook-url.js' import { forwardFetchRequest } from '../utils/server-utils.js' +import { canAccessEntity, canAccessSharedState } from '../permissions.js' import { getDefaultWebhookSigner, webhookSigningMetadata, @@ -48,6 +57,8 @@ const subscriptionControlActions = [ `release`, ] as const +const SHARED_STATE_OWNER_ENTITY_HEADER = `electric-owner-entity` + export type DurableStreamsRoutes = RouterType< IRequest, [TenantContext], @@ -583,6 +594,8 @@ async function streamAppend( request: IRequest, ctx: TenantContext ): Promise { + const auth = await authorizeDurableStreamAccess(request, ctx) + if (auth) return auth return await electricAgentsStreamAppendRouter.fetch( createStreamAppendRouteRequest(request as Request), ctx.runtime, @@ -608,10 +621,9 @@ async function proxyPassThrough( request: IRequest, ctx: TenantContext ): Promise { + const auth = await authorizeDurableStreamAccess(request, ctx) + if (auth) return auth const streamPath = new URL(request.url).pathname - if (ctx.entityManager?.isAttachmentStreamPath(streamPath)) { - return new Response(null, { status: 404 }) - } const upstream = await forwardToDurableStreams(ctx, request) const method = request.method.toUpperCase() const endTrackedRead = @@ -627,3 +639,112 @@ async function proxyPassThrough( await endTrackedRead?.() } } + +async function authorizeDurableStreamAccess( + request: IRequest, + ctx: TenantContext +): Promise { + const method = request.method.toUpperCase() + const streamPath = new URL(request.url).pathname + + if (method === `GET` || method === `HEAD`) { + const registry = ctx.entityManager?.registry + const entity = registry?.getEntityByStream + ? await registry.getEntityByStream(streamPath) + : null + if (entity) { + if (await canAccessEntity(ctx, entity, `read`, request as Request)) { + return undefined + } + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to read ${entity.url}` + ) + } + + const attachmentEntityUrl = entityUrlFromAttachmentStreamPath(streamPath) + if (attachmentEntityUrl) { + const attachmentEntity = registry?.getEntity + ? await registry.getEntity(attachmentEntityUrl) + : null + if (!attachmentEntity) { + return apiError(404, ErrCodeNotFound, `Entity not found`) + } + if ( + await canAccessEntity(ctx, attachmentEntity, `read`, request as Request) + ) { + return undefined + } + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to read ${attachmentEntity.url}` + ) + } + } + + const sharedStateId = sharedStateIdFromPath(streamPath) + if (!sharedStateId) { + // Durable Streams also hosts non-Agents utility streams. Entity streams, + // attachment streams, and shared-state streams are guarded above; paths that + // do not match those resource classes are intentionally passed through. + return undefined + } + + if (method === `GET` || method === `HEAD`) { + if ( + await canAccessSharedState(ctx, sharedStateId, `read`, request as Request) + ) { + return undefined + } + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to read shared state` + ) + } + + if (method === `PUT` || method === `POST`) { + const ownerEntityUrl = + request.headers.get(SHARED_STATE_OWNER_ENTITY_HEADER)?.trim() || undefined + if ( + await canAccessSharedState( + ctx, + sharedStateId, + `write`, + request as Request, + ownerEntityUrl + ) + ) { + return undefined + } + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to write shared state` + ) + } + + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to access shared state` + ) +} + +function entityUrlFromAttachmentStreamPath(path: string): string | null { + const match = path.match(/^\/([^/]+)\/([^/]+)\/attachments\/[^/]+$/) + if (!match) return null + return `/${match[1]}/${match[2]}` +} + +function sharedStateIdFromPath(path: string): string | null { + const match = path.match(/^\/_electric\/shared-state\/([^/]+)$/) + if (!match) return null + try { + return decodeURIComponent(match[1]!) + } catch { + return match[1]! + } +} diff --git a/packages/agents-server/src/routing/electric-proxy-router.ts b/packages/agents-server/src/routing/electric-proxy-router.ts index 5142d1fc39..2db10913f7 100644 --- a/packages/agents-server/src/routing/electric-proxy-router.ts +++ b/packages/agents-server/src/routing/electric-proxy-router.ts @@ -5,6 +5,7 @@ import { Router } from 'itty-router' import { apiError, responseHeaders } from '../electric-agents-http.js' +import { isPermissionBypassPrincipal } from '../permissions.js' import { buildElectricProxyTarget } from '../utils/server-utils.js' import type { IRequest, RouterType } from 'itty-router' import type { TenantContext } from './context.js' @@ -33,12 +34,15 @@ async function proxyElectric( return apiError(500, `ELECTRIC_PROXY_FAILED`, `Electric URL not configured`) } + await ctx.entityManager.registry.pruneExpiredPermissionGrants?.() const target = buildElectricProxyTarget({ incomingUrl: new URL(request.url), electricUrl: ctx.electricUrl, electricSecret: ctx.electricSecret, tenantId: ctx.service, principalUrl: ctx.principal.url, + principalKind: ctx.principal.kind, + permissionBypass: isPermissionBypassPrincipal(ctx), }) const headers = new Headers(request.headers) headers.delete(`host`) diff --git a/packages/agents-server/src/routing/entities-router.ts b/packages/agents-server/src/routing/entities-router.ts index b879b97c5c..71a70ae297 100644 --- a/packages/agents-server/src/routing/entities-router.ts +++ b/packages/agents-server/src/routing/entities-router.ts @@ -16,6 +16,7 @@ import { ErrCodeNotFound, ErrCodeUnknownEntityType, ErrCodeInvalidRequest, + ErrCodeUnauthorized, toPublicEntity, } from '../electric-agents-types.js' import { @@ -27,8 +28,18 @@ import { unlinkEntityDispatchSubscription, } from './dispatch-policy.js' import { ElectricAgentsError } from '../entity-manager.js' +import { + canAccessEntity, + canAccessEntityType, + isPermissionBypassPrincipal, + principalSubject, +} from '../permissions.js' import { routeBody, withSchema } from './schema.js' -import type { ElectricAgentsEntity } from '../electric-agents-types.js' +import type { + ElectricAgentsEntity, + ElectricAgentsEntityType, + EntityPermission, +} from '../electric-agents-types.js' import type { JsonRouteRequest } from './schema.js' import type { RouterType } from 'itty-router' import type { TenantContext } from './context.js' @@ -36,9 +47,11 @@ import type { EventSourceSubscriptionInput } from '@electric-ax/agents-runtime' interface AgentsRouteRequest extends JsonRouteRequest { entityRoute?: ExistingEntityRoute + spawnRoute?: SpawnableEntityRoute } type ExistingEntityRoute = { entityUrl: string; entity: ElectricAgentsEntity } +type SpawnableEntityRoute = { entityType: ElectricAgentsEntityType } type AgentsRouteArgs = [TenantContext] type AgentsRouteResult = Response | undefined @@ -78,6 +91,41 @@ const wakeConditionSchema = Type.Union([ }), ]) +const permissionSubjectSchema = Type.Object( + { + subject_kind: Type.Union([ + Type.Literal(`principal`), + Type.Literal(`principal_kind`), + ]), + subject_value: Type.String(), + }, + { additionalProperties: false } +) + +const entityPermissionSchema = Type.Union([ + Type.Literal(`read`), + Type.Literal(`write`), + Type.Literal(`delete`), + Type.Literal(`signal`), + Type.Literal(`fork`), + Type.Literal(`schedule`), + Type.Literal(`spawn`), + Type.Literal(`manage`), +]) + +const entityPermissionGrantInputSchema = Type.Object( + { + ...permissionSubjectSchema.properties, + permission: entityPermissionSchema, + propagation: Type.Optional( + Type.Union([Type.Literal(`self`), Type.Literal(`descendants`)]) + ), + copy_to_children: Type.Optional(Type.Boolean()), + expires_at: Type.Optional(Type.String()), + }, + { additionalProperties: false } +) + const spawnBodySchema = Type.Object({ args: Type.Optional(Type.Record(Type.String(), Type.Unknown())), tags: Type.Optional(stringRecordSchema), @@ -85,6 +133,7 @@ const spawnBodySchema = Type.Object({ dispatch_policy: Type.Optional(dispatchPolicySchema), sandbox: Type.Optional(sandboxChoiceSchema), initialMessage: Type.Optional(Type.Unknown()), + grants: Type.Optional(Type.Array(entityPermissionGrantInputSchema)), wake: Type.Optional( Type.Object({ subscriberUrl: Type.String(), @@ -215,6 +264,9 @@ type ScheduleBody = Static type EventSourceSubscriptionBody = Static< typeof eventSourceSubscriptionBodySchema > +type EntityPermissionGrantInput = Static< + typeof entityPermissionGrantInputSchema +> type AttachmentSubjectType = `inbox` | `run` | `text` | `tool_call` | `context` type AttachmentRole = `input` | `output` type ParsedAttachmentForm = { @@ -248,88 +300,137 @@ entitiesRouter.put( `/:type/:instanceId`, withSpawnableEntityType, withSchema(spawnBodySchema), + withSpawnPermission, spawnEntity ) -entitiesRouter.get(`/:type/:instanceId`, withExistingEntity, getEntity) -entitiesRouter.head(`/:type/:instanceId`, withExistingEntity, headEntity) -entitiesRouter.delete(`/:type/:instanceId`, withExistingEntity, killEntity) +entitiesRouter.get( + `/:type/:instanceId`, + withExistingEntity, + withEntityPermission(`read`), + getEntity +) +entitiesRouter.head( + `/:type/:instanceId`, + withExistingEntity, + withEntityPermission(`read`), + headEntity +) +entitiesRouter.delete( + `/:type/:instanceId`, + withExistingEntity, + withEntityPermission(`delete`), + killEntity +) entitiesRouter.post( `/:type/:instanceId/signal`, withExistingEntity, withSchema(signalBodySchema), + withEntityPermission(`signal`), signalEntity ) entitiesRouter.post( `/:type/:instanceId/send`, withExistingEntity, withSchema(sendBodySchema), + withEntityPermission(`write`), sendEntity ) entitiesRouter.post( `/:type/:instanceId/attachments`, withExistingEntity, + withEntityPermission(`write`), createAttachment ) entitiesRouter.get( `/:type/:instanceId/attachments/:attachmentId`, withExistingEntity, + withEntityPermission(`read`), readAttachment ) entitiesRouter.delete( `/:type/:instanceId/attachments/:attachmentId`, withExistingEntity, + withEntityPermission(`write`), deleteAttachment ) entitiesRouter.patch( `/:type/:instanceId/inbox/:messageKey`, withExistingEntity, withSchema(inboxMessageBodySchema), + withEntityPermission(`write`), updateInboxMessage ) entitiesRouter.delete( `/:type/:instanceId/inbox/:messageKey`, withExistingEntity, + withEntityPermission(`write`), deleteInboxMessage ) entitiesRouter.post( `/:type/:instanceId/fork`, withExistingEntity, withSchema(forkBodySchema), + withEntityPermission(`fork`), forkEntity ) entitiesRouter.post( `/:type/:instanceId/tags/:tagKey`, withExistingEntity, withSchema(setTagBodySchema), + withEntityPermission(`write`), setTag ) entitiesRouter.delete( `/:type/:instanceId/tags/:tagKey`, withExistingEntity, + withEntityPermission(`write`), deleteTag ) entitiesRouter.put( `/:type/:instanceId/schedules/:scheduleId`, withExistingEntity, withSchema(scheduleBodySchema), + withEntityPermission(`schedule`), upsertSchedule ) entitiesRouter.delete( `/:type/:instanceId/schedules/:scheduleId`, withExistingEntity, + withEntityPermission(`schedule`), deleteSchedule ) entitiesRouter.put( `/:type/:instanceId/event-source-subscriptions/:subscriptionId`, withExistingEntity, withSchema(eventSourceSubscriptionBodySchema), + withEntityPermission(`write`), upsertEventSourceSubscription ) entitiesRouter.delete( `/:type/:instanceId/event-source-subscriptions/:subscriptionId`, withExistingEntity, + withEntityPermission(`write`), deleteEventSourceSubscription ) +entitiesRouter.get( + `/:type/:instanceId/grants`, + withExistingEntity, + withEntityPermission(`manage`), + listEntityPermissionGrants +) +entitiesRouter.post( + `/:type/:instanceId/grants`, + withExistingEntity, + withSchema(entityPermissionGrantInputSchema), + withEntityPermission(`manage`), + createEntityPermissionGrant +) +entitiesRouter.delete( + `/:type/:instanceId/grants/:grantId`, + withExistingEntity, + withEntityPermission(`manage`), + deleteEntityPermissionGrant +) function entityUrlFromSegments( type: string, @@ -503,6 +604,31 @@ function rejectPrincipalEntityMutation( ) } +function parseExpiresAt(value: string | undefined): Date | undefined { + if (value === undefined) return undefined + const expiresAt = new Date(value) + if (Number.isNaN(expiresAt.getTime())) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `Invalid expires_at timestamp`, + 400 + ) + } + return expiresAt +} + +function parseGrantId(request: AgentsRouteRequest): number { + const grantId = Number.parseInt(String(request.params.grantId), 10) + if (!Number.isSafeInteger(grantId) || grantId <= 0) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `Invalid grant id`, + 400 + ) + } + return grantId +} + async function withExistingEntity( request: AgentsRouteRequest, ctx: TenantContext @@ -574,9 +700,96 @@ async function withSpawnableEntityType( ) } + request.spawnRoute = { entityType } return undefined } +function withEntityPermission(permission: EntityPermission) { + return async ( + request: AgentsRouteRequest, + ctx: TenantContext + ): Promise => { + const { entity } = requireExistingEntityRoute(request) + if (await canAccessEntity(ctx, entity, permission, request as Request)) { + return undefined + } + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to ${permission} ${entity.url}` + ) + } +} + +async function withSpawnPermission( + request: AgentsRouteRequest, + ctx: TenantContext +): Promise { + const parsed = routeBody(request) + const entityType = request.spawnRoute?.entityType + if (!entityType) { + throw new Error(`spawnable entity type middleware did not run`) + } + + if ( + !(await canAccessEntityType(ctx, entityType, `spawn`, request as Request)) + ) { + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to spawn ${entityType.name}` + ) + } + + if (!parsed.parent) return undefined + + const parent = await ctx.entityManager.registry.getEntity(parsed.parent) + if (!parent) { + return apiError(404, ErrCodeNotFound, `Parent entity not found`) + } + if (await canAccessEntity(ctx, parent, `spawn`, request as Request)) { + return await validateParentedSpawnGrants(request, ctx, parent, parsed) + } + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to spawn children from ${parent.url}` + ) +} + +async function validateParentedSpawnGrants( + request: AgentsRouteRequest, + ctx: TenantContext, + parent: ElectricAgentsEntity, + parsed: SpawnBody +): Promise { + const needsParentManage = (parsed.grants ?? []).some( + requiresParentManageForInitialGrant + ) + if (!needsParentManage) return undefined + + if (await canAccessEntity(ctx, parent, `manage`, request as Request)) { + return undefined + } + + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to delegate broad grants from ${parent.url}` + ) +} + +function requiresParentManageForInitialGrant( + grant: EntityPermissionGrantInput +): boolean { + return ( + grant.permission === `manage` || + grant.subject_kind === `principal_kind` || + grant.propagation === `descendants` || + grant.copy_to_children === true + ) +} + async function listEntities( { query }: AgentsRouteRequest, ctx: TenantContext @@ -586,10 +799,61 @@ async function listEntities( status: firstQueryValue(query.status), parent: firstQueryValue(query.parent), created_by: firstQueryValue(query.created_by), + readableBy: { + ...principalSubject(ctx.principal), + bypass: isPermissionBypassPrincipal(ctx), + }, }) return json(entities.map((entity) => toPublicEntity(entity))) } +async function listEntityPermissionGrants( + request: AgentsRouteRequest, + ctx: TenantContext +): Promise { + const { entityUrl } = requireExistingEntityRoute(request) + const grants = + await ctx.entityManager.registry.listEntityPermissionGrants(entityUrl) + return json({ grants }) +} + +async function createEntityPermissionGrant( + request: AgentsRouteRequest, + ctx: TenantContext +): Promise { + const { entityUrl } = requireExistingEntityRoute(request) + const parsed = routeBody(request) + const grant = await ctx.entityManager.registry.createEntityPermissionGrant({ + entityUrl, + permission: parsed.permission, + subjectKind: parsed.subject_kind, + subjectValue: parsed.subject_value, + propagation: parsed.propagation, + copyToChildren: parsed.copy_to_children, + expiresAt: parseExpiresAt(parsed.expires_at), + createdBy: ctx.principal.url, + }) + await ctx.entityBridgeManager.onEntityChanged(entityUrl) + return json(grant, { status: 201 }) +} + +async function deleteEntityPermissionGrant( + request: AgentsRouteRequest, + ctx: TenantContext +): Promise { + const { entityUrl } = requireExistingEntityRoute(request) + const deleted = await ctx.entityManager.registry.deleteEntityPermissionGrant( + entityUrl, + parseGrantId(request) + ) + if (deleted) { + await ctx.entityBridgeManager.onEntityChanged(entityUrl) + } + return deleted + ? status(204) + : apiError(404, ErrCodeNotFound, `Grant not found`) +} + async function upsertSchedule( request: AgentsRouteRequest, ctx: TenantContext @@ -991,6 +1255,25 @@ async function spawnEntity( wake: parsed.wake, created_by: principal.url, }) + if (parsed.parent) { + await ctx.entityManager.registry.copyEntityPermissionGrantsForSpawn( + parsed.parent, + entity.url, + principal.url + ) + } + for (const grant of parsed.grants ?? []) { + await ctx.entityManager.registry.createEntityPermissionGrant({ + entityUrl: entity.url, + permission: grant.permission, + subjectKind: grant.subject_kind, + subjectValue: grant.subject_value, + propagation: grant.propagation, + copyToChildren: grant.copy_to_children, + expiresAt: parseExpiresAt(grant.expires_at), + createdBy: principal.url, + }) + } const linkBeforeInitialMessage = parsed.initialMessage !== undefined && shouldLinkDispatchBeforeInitialMessage(dispatchPolicy) diff --git a/packages/agents-server/src/routing/entity-types-router.ts b/packages/agents-server/src/routing/entity-types-router.ts index 2d7521ede0..5a5c574177 100644 --- a/packages/agents-server/src/routing/entity-types-router.ts +++ b/packages/agents-server/src/routing/entity-types-router.ts @@ -8,22 +8,27 @@ import { dispatchPolicySchema } from '../dispatch-policy-schema.js' import { ElectricAgentsError } from '../entity-manager.js' import { ErrCodeNotFound, + ErrCodeInvalidRequest, + ErrCodeUnauthorized, ErrCodeServeEndpointNameMismatch, ErrCodeServeEndpointUnreachable, } from '../electric-agents-types.js' import { apiError } from '../electric-agents-http.js' import { routeBody, withSchema } from './schema.js' import { rewriteLoopbackWebhookUrl } from '../utils/webhook-url.js' +import { canAccessEntityType, canRegisterEntityType } from '../permissions.js' import type { ElectricAgentsEntityType, RegisterEntityTypeRequest, + EntityTypePermissionGrantInput, } from '../electric-agents-types.js' import type { JsonRouteRequest } from './schema.js' import type { RouterType } from 'itty-router' import type { TenantContext } from './context.js' -export interface ElectricAgentsEntityTypeRouteRequest - extends JsonRouteRequest {} +export interface ElectricAgentsEntityTypeRouteRequest extends JsonRouteRequest { + entityTypeRoute?: { entityType: ElectricAgentsEntityType } +} type EntityTypeRouteArgs = [TenantContext] type EntityTypeRouteResult = Response | undefined @@ -41,6 +46,19 @@ type PublicEntityTypeResponse = ElectricAgentsEntityType & { const jsonObjectSchema = Type.Record(Type.String(), Type.Unknown()) const schemaMapSchema = Type.Record(Type.String(), jsonObjectSchema) +const typePermissionGrantInputSchema = Type.Object( + { + subject_kind: Type.Union([ + Type.Literal(`principal`), + Type.Literal(`principal_kind`), + ]), + subject_value: Type.String(), + permission: Type.Union([Type.Literal(`spawn`), Type.Literal(`manage`)]), + expires_at: Type.Optional(Type.String()), + }, + { additionalProperties: false } +) + const registerEntityTypeBodySchema = Type.Object( { name: Type.Optional(Type.String()), @@ -50,6 +68,9 @@ const registerEntityTypeBodySchema = Type.Object( state_schemas: Type.Optional(schemaMapSchema), serve_endpoint: Type.Optional(Type.String()), default_dispatch_policy: Type.Optional(dispatchPolicySchema), + permission_grants: Type.Optional( + Type.Array(typePermissionGrantInputSchema) + ), }, { additionalProperties: false } ) @@ -66,6 +87,7 @@ type RegisterEntityTypeBody = Static type AmendEntityTypeSchemasBody = Static< typeof amendEntityTypeSchemasBodySchema > +type TypePermissionGrantInput = EntityTypePermissionGrantInput export const entityTypesRouter: ElectricAgentsEntityTypeRoutes = Router< ElectricAgentsEntityTypeRouteRequest, @@ -79,15 +101,47 @@ entityTypesRouter.get(`/`, listEntityTypes) entityTypesRouter.post( `/`, withSchema(registerEntityTypeBodySchema), + withEntityTypeRegistrationPermission, registerEntityType ) entityTypesRouter.patch( `/:name/schemas`, + withExistingEntityType, + withEntityTypeManagePermission, withSchema(amendEntityTypeSchemasBodySchema), amendSchemas ) -entityTypesRouter.get(`/:name`, getEntityType) -entityTypesRouter.delete(`/:name`, deleteEntityType) +entityTypesRouter.get( + `/:name`, + withExistingEntityType, + withEntityTypeSpawnPermission, + getEntityType +) +entityTypesRouter.delete( + `/:name`, + withExistingEntityType, + withEntityTypeManagePermission, + deleteEntityType +) +entityTypesRouter.get( + `/:name/grants`, + withExistingEntityType, + withEntityTypeManagePermission, + listTypePermissionGrants +) +entityTypesRouter.post( + `/:name/grants`, + withExistingEntityType, + withSchema(typePermissionGrantInputSchema), + withEntityTypeManagePermission, + createTypePermissionGrant +) +entityTypesRouter.delete( + `/:name/grants/:grantId`, + withExistingEntityType, + withEntityTypeManagePermission, + deleteTypePermissionGrant +) async function registerEntityType( request: ElectricAgentsEntityTypeRouteRequest, @@ -105,6 +159,7 @@ async function registerEntityType( } const entityType = await ctx.entityManager.registerEntityType(normalized) + await applyRegistrationPermissionGrants(ctx, entityType.name, normalized) return json(toPublicEntityType(entityType), { status: 201 }) } @@ -113,7 +168,102 @@ async function listEntityTypes( ctx: TenantContext ): Promise { const entityTypes = await ctx.entityManager.registry.listEntityTypes() - return json(entityTypes.map((entityType) => toPublicEntityType(entityType))) + const visible: Array = [] + for (const entityType of entityTypes) { + if (await canAccessEntityType(ctx, entityType, `spawn`)) { + visible.push(entityType) + } + } + return json(visible.map((entityType) => toPublicEntityType(entityType))) +} + +async function withExistingEntityType( + request: ElectricAgentsEntityTypeRouteRequest, + ctx: TenantContext +): Promise { + const entityType = await ctx.entityManager.registry.getEntityType( + request.params.name + ) + if (!entityType) { + return apiError(404, ErrCodeNotFound, `Entity type not found`) + } + request.entityTypeRoute = { entityType } + return undefined +} + +async function withEntityTypeManagePermission( + request: ElectricAgentsEntityTypeRouteRequest, + ctx: TenantContext +): Promise { + const entityType = request.entityTypeRoute?.entityType + if (!entityType) { + throw new Error(`entity type middleware did not run`) + } + if ( + await canAccessEntityType(ctx, entityType, `manage`, request as Request) + ) { + return undefined + } + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to manage ${entityType.name}` + ) +} + +async function withEntityTypeSpawnPermission( + request: ElectricAgentsEntityTypeRouteRequest, + ctx: TenantContext +): Promise { + const entityType = request.entityTypeRoute?.entityType + if (!entityType) { + throw new Error(`entity type middleware did not run`) + } + if (await canAccessEntityType(ctx, entityType, `spawn`, request as Request)) { + return undefined + } + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to spawn ${entityType.name}` + ) +} + +async function withEntityTypeRegistrationPermission( + request: ElectricAgentsEntityTypeRouteRequest, + ctx: TenantContext +): Promise { + const parsed = normalizeEntityTypeRequest( + routeBody(request) + ) + if (!parsed.name) { + return undefined + } + + const existing = await ctx.entityManager.registry.getEntityType(parsed.name) + if (existing) { + request.entityTypeRoute = { entityType: existing } + if ( + await canAccessEntityType(ctx, existing, `manage`, request as Request) + ) { + return undefined + } + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to manage ${existing.name}` + ) + } + + if (await canRegisterEntityType(ctx, parsed, request as Request)) { + return undefined + } + + return apiError( + 401, + ErrCodeUnauthorized, + `Principal is not allowed to register entity types` + ) } async function discoverServeEndpoint( @@ -141,10 +291,12 @@ async function discoverServeEndpoint( } manifest.serve_endpoint = parsed.serve_endpoint + manifest.permission_grants = parsed.permission_grants const entityType = await ctx.entityManager.registerEntityType( normalizeEntityTypeRequest(manifest) ) + await applyRegistrationPermissionGrants(ctx, entityType.name, manifest) return json(toPublicEntityType(entityType), { status: 201 }) } catch (err) { if (err instanceof ElectricAgentsError) { @@ -161,17 +313,9 @@ async function discoverServeEndpoint( } async function getEntityType( - request: ElectricAgentsEntityTypeRouteRequest, - ctx: TenantContext + request: ElectricAgentsEntityTypeRouteRequest ): Promise { - const entityType = await ctx.entityManager.registry.getEntityType( - request.params.name - ) - if (!entityType) { - return apiError(404, ErrCodeNotFound, `Entity type not found`) - } - - return json(toPublicEntityType(entityType)) + return json(toPublicEntityType(request.entityTypeRoute!.entityType)) } async function amendSchemas( @@ -195,6 +339,90 @@ async function deleteEntityType( return status(204) } +async function listTypePermissionGrants( + request: ElectricAgentsEntityTypeRouteRequest, + ctx: TenantContext +): Promise { + const grants = + await ctx.entityManager.registry.listEntityTypePermissionGrants( + request.entityTypeRoute!.entityType.name + ) + return json({ grants }) +} + +async function createTypePermissionGrant( + request: ElectricAgentsEntityTypeRouteRequest, + ctx: TenantContext +): Promise { + const parsed = routeBody(request) + const grant = + await ctx.entityManager.registry.createEntityTypePermissionGrant({ + entityType: request.entityTypeRoute!.entityType.name, + permission: parsed.permission, + subjectKind: parsed.subject_kind, + subjectValue: parsed.subject_value, + expiresAt: parseExpiresAt(parsed.expires_at), + createdBy: ctx.principal.url, + }) + return json(grant, { status: 201 }) +} + +async function deleteTypePermissionGrant( + request: ElectricAgentsEntityTypeRouteRequest, + ctx: TenantContext +): Promise { + const deleted = + await ctx.entityManager.registry.deleteEntityTypePermissionGrant( + request.entityTypeRoute!.entityType.name, + parseGrantId(request) + ) + return deleted + ? status(204) + : apiError(404, ErrCodeNotFound, `Grant not found`) +} + +async function applyRegistrationPermissionGrants( + ctx: TenantContext, + entityType: string, + request: Pick +): Promise { + for (const grant of request.permission_grants ?? []) { + await ctx.entityManager.registry.ensureEntityTypePermissionGrant({ + entityType, + permission: grant.permission, + subjectKind: grant.subject_kind, + subjectValue: grant.subject_value, + expiresAt: parseExpiresAt(grant.expires_at), + createdBy: ctx.principal.url, + }) + } +} + +function parseGrantId(request: ElectricAgentsEntityTypeRouteRequest): number { + const grantId = Number.parseInt(String(request.params.grantId), 10) + if (!Number.isSafeInteger(grantId) || grantId <= 0) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `Invalid grant id`, + 400 + ) + } + return grantId +} + +function parseExpiresAt(value: string | undefined): Date | undefined { + if (value === undefined) return undefined + const expiresAt = new Date(value) + if (Number.isNaN(expiresAt.getTime())) { + throw new ElectricAgentsError( + ErrCodeInvalidRequest, + `Invalid expires_at timestamp`, + 400 + ) + } + return expiresAt +} + function normalizeEntityTypeRequest( parsed: RegisterEntityTypeBody | RegisterEntityTypeRequest ): RegisterEntityTypeRequest { @@ -213,6 +441,7 @@ function normalizeEntityTypeRequest( targets: [{ type: `webhook`, url: serveEndpoint }], } as RegisterEntityTypeRequest[`default_dispatch_policy`]) : undefined), + permission_grants: parsed.permission_grants, } } diff --git a/packages/agents-server/src/routing/hooks.ts b/packages/agents-server/src/routing/hooks.ts index 7a8e444865..0aea49744d 100644 --- a/packages/agents-server/src/routing/hooks.ts +++ b/packages/agents-server/src/routing/hooks.ts @@ -85,6 +85,7 @@ export function applyCors( `content-type`, `authorization`, `electric-claim-token`, + `electric-owner-entity`, ELECTRIC_PRINCIPAL_HEADER, `ngrok-skip-browser-warning`, ].join(`, `) diff --git a/packages/agents-server/src/routing/observations-router.ts b/packages/agents-server/src/routing/observations-router.ts index ef9b0d9834..a91d4e16bb 100644 --- a/packages/agents-server/src/routing/observations-router.ts +++ b/packages/agents-server/src/routing/observations-router.ts @@ -56,7 +56,8 @@ async function ensureEntitiesMembershipStream( ): Promise { const parsed = routeBody(request) const result = await ctx.entityManager.ensureEntitiesMembershipStream( - parsed.tags ?? {} + parsed.tags ?? {}, + ctx.principal ) return json(result) } diff --git a/packages/agents-server/src/runtime.ts b/packages/agents-server/src/runtime.ts index b5d8bbef78..13a36a7818 100644 --- a/packages/agents-server/src/runtime.ts +++ b/packages/agents-server/src/runtime.ts @@ -499,6 +499,14 @@ export class ElectricAgentsTenantRuntime { manifestKey, sourceRef ) + + const sharedStateId = + operation === `delete` ? undefined : this.extractSharedStateId(value) + await this.manager.registry.replaceSharedStateLink( + ownerEntityUrl, + manifestKey, + sharedStateId + ) } private extractEntitiesSourceRef( @@ -514,6 +522,29 @@ export class ElectricAgentsTenantRuntime { return undefined } + private extractSharedStateId( + manifest: Record | undefined + ): string | undefined { + if (manifest?.kind === `shared-state` && typeof manifest.id === `string`) { + return manifest.id + } + + if (manifest?.kind !== `source` || manifest.sourceType !== `db`) { + return undefined + } + + if (typeof manifest.sourceRef === `string`) { + return manifest.sourceRef + } + const config = + typeof manifest.config === `object` && + manifest.config !== null && + !Array.isArray(manifest.config) + ? (manifest.config as Record) + : undefined + return typeof config?.id === `string` ? config.id : undefined + } + private async maybeMarkEntityIdleAfterRunFinished( entityUrl: string ): Promise { diff --git a/packages/agents-server/src/server.ts b/packages/agents-server/src/server.ts index 5c9e010b53..bb68961c3d 100644 --- a/packages/agents-server/src/server.ts +++ b/packages/agents-server/src/server.ts @@ -16,6 +16,7 @@ import { apiError } from './electric-agents-http.js' import { ErrCodeInvalidRequest, ErrCodeUnauthorized, + type AuthorizeRequest, } from './electric-agents-types.js' import { ElectricAgentsError } from './entity-manager.js' import { serverLog } from './utils/log.js' @@ -67,6 +68,7 @@ export interface ElectricAgentsServerOptions { authenticateRequest?: ( request: Request ) => Promise | Principal | null + authorizeRequest?: AuthorizeRequest allowDevPrincipalFallback?: boolean eventSources?: EventSourceCatalog ensureEventSourceWakeSource?: (sourceUrl: string) => Promise | void @@ -453,6 +455,9 @@ export class ElectricAgentsServer { this.options.ensureEventSourceWakeSource, } : {}), + ...(this.options.authorizeRequest + ? { authorizeRequest: this.options.authorizeRequest } + : {}), isShuttingDown: () => this.shuttingDown, mockAgent: this.mockAgentBootstrap ? { runtime: this.mockAgentBootstrap.runtime } diff --git a/packages/agents-server/src/utils/server-utils.ts b/packages/agents-server/src/utils/server-utils.ts index 7964315862..543ed04c56 100644 --- a/packages/agents-server/src/utils/server-utils.ts +++ b/packages/agents-server/src/utils/server-utils.ts @@ -96,6 +96,8 @@ export function buildElectricProxyTarget(options: { electricSecret?: string tenantId: string principalUrl?: string + principalKind?: string + permissionBypass?: boolean }): URL { const targetPath = options.incomingUrl.pathname.replace( `/_electric/electric`, @@ -121,13 +123,29 @@ export function buildElectricProxyTarget(options: { `columns`, `"tenant_id","url","type","status","dispatch_policy","tags","spawn_args","sandbox","parent","type_revision","inbox_schemas","state_schemas","created_at","updated_at"` ) - applyTenantShapeWhere(target, options.tenantId) + applyShapeWhere( + target, + buildReadableEntitiesWhere({ + tenantId: options.tenantId, + principalUrl: options.principalUrl ?? ``, + principalKind: options.principalKind ?? ``, + permissionBypass: options.permissionBypass, + }) + ) } else if (table === `entity_types`) { target.searchParams.set( `columns`, `"tenant_id","name","description","creation_schema","inbox_schemas","state_schemas","serve_endpoint","default_dispatch_policy","revision","created_at","updated_at"` ) - applyTenantShapeWhere(target, options.tenantId) + applyShapeWhere( + target, + buildSpawnableEntityTypesWhere({ + tenantId: options.tenantId, + principalUrl: options.principalUrl ?? ``, + principalKind: options.principalKind ?? ``, + permissionBypass: options.permissionBypass, + }) + ) } else if (table === `runners`) { target.searchParams.set( `columns`, @@ -149,24 +167,128 @@ export function buildElectricProxyTarget(options: { `columns`, `"tenant_id","entity_url","pending_source_streams","pending_reason","pending_since","outstanding_wake_id","outstanding_wake_target","outstanding_wake_created_at","active_consumer_id","active_runner_id","active_epoch","active_claimed_at","active_lease_expires_at","last_wake_id","last_claimed_at","last_released_at","last_completed_at","last_error","updated_at"` ) - applyTenantShapeWhere(target, options.tenantId) + applyShapeWhere( + target, + buildReadableEntityUrlWhere({ + tenantId: options.tenantId, + principalUrl: options.principalUrl ?? ``, + principalKind: options.principalKind ?? ``, + permissionBypass: options.permissionBypass, + }) + ) } else if (table === `wake_notifications`) { target.searchParams.set( `columns`, `"tenant_id","wake_id","entity_url","target_type","target_runner_id","target_webhook_url","target_worker_pool_id","runner_wake_stream","runner_wake_stream_offset","notification_public","delivery_status","claim_status","created_at","delivered_at","claimed_at","resolved_at"` ) - applyTenantShapeWhere(target, options.tenantId) + applyShapeWhere( + target, + buildReadableEntityUrlWhere({ + tenantId: options.tenantId, + principalUrl: options.principalUrl ?? ``, + principalKind: options.principalKind ?? ``, + permissionBypass: options.permissionBypass, + }) + ) } else if (table === `consumer_claims`) { target.searchParams.set( `columns`, `"tenant_id","consumer_id","epoch","wake_id","entity_url","stream_path","runner_id","status","claimed_at","last_heartbeat_at","lease_expires_at","released_at","acked_streams","updated_at"` ) - applyTenantShapeWhere(target, options.tenantId) + applyShapeWhere( + target, + buildReadableEntityUrlWhere({ + tenantId: options.tenantId, + principalUrl: options.principalUrl ?? ``, + principalKind: options.principalKind ?? ``, + permissionBypass: options.permissionBypass, + }) + ) } return target } +export function buildReadableEntitiesWhere(options: { + tenantId: string + principalUrl: string + principalKind: string + permissionBypass?: boolean +}): string { + const tenant = sqlStringLiteral(options.tenantId) + if (options.permissionBypass) { + return `tenant_id = ${tenant}` + } + const principalUrl = sqlStringLiteral(options.principalUrl) + const principalKind = sqlStringLiteral(options.principalKind) + return [ + `tenant_id = ${tenant}`, + `AND (`, + ` created_by = ${principalUrl}`, + ` OR url IN (`, + ` SELECT entity_url`, + ` FROM entity_effective_permissions`, + ` WHERE tenant_id = ${tenant}`, + ` AND permission IN ('read', 'manage')`, + ` AND (`, + ` (subject_kind = 'principal' AND subject_value = ${principalUrl})`, + ` OR (subject_kind = 'principal_kind' AND subject_value = ${principalKind})`, + ` )`, + ` )`, + `)`, + ].join(`\n`) +} + +export function buildReadableEntityUrlWhere(options: { + tenantId: string + principalUrl: string + principalKind: string + permissionBypass?: boolean +}): string { + const tenant = sqlStringLiteral(options.tenantId) + if (options.permissionBypass) { + return `tenant_id = ${tenant}` + } + return [ + `tenant_id = ${tenant}`, + `AND entity_url IN (`, + ` SELECT url`, + ` FROM entities`, + ` WHERE ${indentWhere( + buildReadableEntitiesWhere(options), + ` ` + ).trimStart()}`, + `)`, + ].join(`\n`) +} + +export function buildSpawnableEntityTypesWhere(options: { + tenantId: string + principalUrl: string + principalKind: string + permissionBypass?: boolean +}): string { + const tenant = sqlStringLiteral(options.tenantId) + if (options.permissionBypass) { + return `tenant_id = ${tenant}` + } + const principalUrl = sqlStringLiteral(options.principalUrl) + const principalKind = sqlStringLiteral(options.principalKind) + return [ + `tenant_id = ${tenant}`, + `AND name IN (`, + ` SELECT entity_type`, + ` FROM entity_type_permission_grants`, + ` WHERE tenant_id = ${tenant}`, + ` AND permission IN ('spawn', 'manage')`, + ` AND (`, + ` (subject_kind = 'principal' AND subject_value = ${principalUrl})`, + ` OR (subject_kind = 'principal_kind' AND subject_value = ${principalKind})`, + ` )`, + `)`, + ].join(`\n`) +} + export async function forwardFetchRequest(options: { request: { method: string @@ -248,17 +370,29 @@ function applyTenantShapeWhere( tenantId: string, extraConditions: Array = [] ): void { - const tenantWhere = [ - `tenant_id = ${sqlStringLiteral(tenantId)}`, - ...extraConditions, - ].join(` AND `) + applyShapeWhere( + target, + [`tenant_id = ${sqlStringLiteral(tenantId)}`, ...extraConditions].join( + ` AND ` + ) + ) +} + +function applyShapeWhere(target: URL, enforcedWhere: string): void { const existingWhere = target.searchParams.get(`where`) target.searchParams.set( `where`, - existingWhere ? `${tenantWhere} AND (${existingWhere})` : tenantWhere + existingWhere ? `${enforcedWhere} AND (${existingWhere})` : enforcedWhere ) } function sqlStringLiteral(value: string): string { return `'${value.replace(/'/g, `''`)}'` } + +function indentWhere(where: string, prefix: string): string { + return where + .split(`\n`) + .map((line) => `${prefix}${line}`) + .join(`\n`) +} diff --git a/packages/agents-server/test/electric-agents-routes.test.ts b/packages/agents-server/test/electric-agents-routes.test.ts index 8a413d40b3..608ab648a3 100644 --- a/packages/agents-server/test/electric-agents-routes.test.ts +++ b/packages/agents-server/test/electric-agents-routes.test.ts @@ -286,6 +286,12 @@ describe(`ElectricAgentsRoutes shared-state streams`, () => { createRequest(`PUT`, `/_electric/shared-state/board-1`), { service: `test`, + principal: { + kind: `system`, + id: `dev-local`, + key: `system:dev-local`, + url: `/principal/system:dev-local`, + }, durableStreamsUrl: `http://durable.local/custom/ds-prefix`, isShuttingDown: () => false, } as unknown as TenantContext diff --git a/packages/agents-server/test/entity-bridge-manager.test.ts b/packages/agents-server/test/entity-bridge-manager.test.ts index 7006b94a23..a0a50ac180 100644 --- a/packages/agents-server/test/entity-bridge-manager.test.ts +++ b/packages/agents-server/test/entity-bridge-manager.test.ts @@ -76,6 +76,9 @@ vi.mock(`@durable-streams/client`, () => ({ }, })) +const PRINCIPAL_URL = `/principal/user%3Aowner` +const PRINCIPAL_KIND = `user` + describe(`EntityBridgeManager`, () => { beforeEach(() => { mockState.liveCallback = null @@ -149,7 +152,11 @@ describe(`EntityBridgeManager`, () => { `http://electric.test` ) - const registerPromise = manager.register({ demo: `x` }) + const registerPromise = manager.register( + { demo: `x` }, + PRINCIPAL_URL, + PRINCIPAL_KIND + ) await vi.waitFor(() => { expect(mockState.liveCallback).not.toBeNull() }) @@ -323,6 +330,63 @@ describe(`EntityBridgeManager`, () => { ) }) + it(`scopes entity membership streams by principal`, async () => { + const registry = { + upsertEntityBridge: vi + .fn() + .mockImplementation(async (row: Record) => row), + touchEntityBridge: vi.fn().mockResolvedValue(undefined), + updateEntityBridgeCursor: vi.fn().mockResolvedValue(undefined), + clearEntityBridgeCursor: vi.fn().mockResolvedValue(undefined), + } + const streamClient = { + baseUrl: `http://streams.test`, + exists: vi.fn().mockResolvedValue(true), + create: vi.fn().mockResolvedValue(undefined), + readJson: vi.fn().mockResolvedValue([]), + } + const manager = new EntityBridgeManager( + registry as never, + streamClient as never, + `http://electric.test` + ) + + const firstPromise = manager.register( + { demo: `x` }, + `/principal/user%3Aone`, + `user` + ) + await vi.waitFor(() => expect(mockState.liveCallback).not.toBeNull()) + await mockState.liveCallback?.([{ headers: { control: `up-to-date` } }]) + const first = await firstPromise + + const secondPromise = manager.register( + { demo: `x` }, + `/principal/user%3Atwo`, + `user` + ) + await vi.waitFor(() => expect(mockState.liveCallbacks).toHaveLength(2)) + await mockState.liveCallbacks[1]?.([{ headers: { control: `up-to-date` } }]) + const second = await secondPromise + + expect(first.sourceRef).not.toBe(second.sourceRef) + expect(first.streamUrl).not.toBe(second.streamUrl) + expect(registry.upsertEntityBridge).toHaveBeenCalledWith( + expect.objectContaining({ + principalUrl: `/principal/user%3Aone`, + principalKind: `user`, + }) + ) + expect(registry.upsertEntityBridge).toHaveBeenCalledWith( + expect.objectContaining({ + principalUrl: `/principal/user%3Atwo`, + principalKind: `user`, + }) + ) + + await manager.stop() + }) + it(`reuses a persisted shape cursor when one is available`, async () => { const registry = { upsertEntityBridge: vi.fn().mockImplementation(async () => ({ @@ -350,7 +414,7 @@ describe(`EntityBridgeManager`, () => { `http://electric.test` ) - await manager.register({ demo: `x` }) + await manager.register({ demo: `x` }, PRINCIPAL_URL, PRINCIPAL_KIND) expect(mockState.lastConstructedOptions).toMatchObject({ offset: `5_0`, @@ -382,7 +446,11 @@ describe(`EntityBridgeManager`, () => { `http://electric.test` ) - const registerPromise = manager.register({ demo: `x` }) + const registerPromise = manager.register( + { demo: `x` }, + PRINCIPAL_URL, + PRINCIPAL_KIND + ) await vi.waitFor(() => { expect(mockState.liveCallback).not.toBeNull() }) @@ -495,8 +563,16 @@ describe(`EntityBridgeManager`, () => { `http://electric.test` ) - const firstPromise = manager.register({ demo: `x` }) - const secondPromise = manager.register({ demo: `x` }) + const firstPromise = manager.register( + { demo: `x` }, + PRINCIPAL_URL, + PRINCIPAL_KIND + ) + const secondPromise = manager.register( + { demo: `x` }, + PRINCIPAL_URL, + PRINCIPAL_KIND + ) await vi.waitFor(() => { expect(mockState.liveCallback).not.toBeNull() }) diff --git a/packages/agents-server/test/entity-projector.test.ts b/packages/agents-server/test/entity-projector.test.ts index fbb6d21c1c..47306e7810 100644 --- a/packages/agents-server/test/entity-projector.test.ts +++ b/packages/agents-server/test/entity-projector.test.ts @@ -78,6 +78,7 @@ function entityRow( url, type: `task`, status, + created_by: `/principal/user%3Aowner`, tags, spawn_args: {}, parent: null, @@ -144,7 +145,11 @@ describe(`EntityProjector`, () => { await startPromise const facade = projector.forTenant(`svc-a`, registry as never) - const result = await facade.register({ demo: `x` }) + const result = await facade.register( + { demo: `x` }, + `/principal/user%3Aowner`, + `user` + ) expect(mockState.constructedOptions).toHaveLength(1) expect(mockState.constructedOptions[0]).toMatchObject({ diff --git a/packages/agents-server/test/permissions-routes.test.ts b/packages/agents-server/test/permissions-routes.test.ts new file mode 100644 index 0000000000..e56f4953e6 --- /dev/null +++ b/packages/agents-server/test/permissions-routes.test.ts @@ -0,0 +1,413 @@ +import { afterEach, describe, expect, it, vi } from 'vitest' +import { globalRouter } from '../src/routing/global-router' +import type { EntityPermission } from '../src/electric-agents-types' +import type { Principal } from '../src/principal' +import type { TenantContext } from '../src/routing/context' + +const owner = { + kind: `user` as const, + id: `owner`, + key: `user:owner`, + url: `/principal/user%3Aowner`, +} + +const reader = { + kind: `user` as const, + id: `reader`, + key: `user:reader`, + url: `/principal/user%3Areader`, +} + +function request(method: string, path: string, body?: unknown): Request { + return new Request(`http://agents.test${path}`, { + method, + headers: + body === undefined ? undefined : { 'content-type': `application/json` }, + body: body === undefined ? undefined : JSON.stringify(body), + }) +} + +function entity(url: string, createdBy = owner.url) { + const type = url.split(`/`)[1] ?? `task` + return { + url, + type, + status: `idle`, + streams: { main: `${url}/main`, error: `${url}/error` }, + subscription_id: `${type}-sub`, + write_token: `${url}-token`, + tags: {}, + created_by: createdBy, + created_at: 1, + updated_at: 1, + } +} + +function ctx( + overrides: { + principal?: Principal + hasEntityPermission?: boolean | ((permission: EntityPermission) => boolean) + hasEntityTypePermission?: boolean + entityCreatedBy?: string + parentCreatedBy?: string + linkedSharedStateEntityUrls?: Array + } = {} +): TenantContext { + const taskType = { + name: `task`, + description: `Task`, + revision: 1, + created_at: new Date(0).toISOString(), + updated_at: new Date(0).toISOString(), + } + const currentEntity = entity(`/task/one`, overrides.entityCreatedBy) + const parentEntity = entity( + `/task/parent`, + overrides.parentCreatedBy ?? owner.url + ) + const linkedEntity = entity(`/task/linked`, owner.url) + const registry = { + pruneExpiredPermissionGrants: vi.fn(async () => undefined), + getEntityType: vi.fn(async (name: string) => + name === taskType.name ? taskType : null + ), + getEntity: vi.fn(async (url: string) => { + if (url === currentEntity.url) return currentEntity + if (url === parentEntity.url) return parentEntity + if (url === linkedEntity.url) return linkedEntity + return null + }), + getEntityByStream: vi.fn(async (path: string) => + path === currentEntity.streams.main ? currentEntity : null + ), + listSharedStateLinkedEntityUrls: vi.fn( + async () => overrides.linkedSharedStateEntityUrls ?? [currentEntity.url] + ), + hasEntityPermission: vi.fn(async (_url, permission) => + typeof overrides.hasEntityPermission === `function` + ? overrides.hasEntityPermission(permission) + : (overrides.hasEntityPermission ?? false) + ), + hasEntityTypePermission: vi.fn( + async () => overrides.hasEntityTypePermission ?? false + ), + listEntities: vi.fn(async () => ({ entities: [], total: 0 })), + createEntityPermissionGrant: vi.fn(async (input) => ({ + id: 1, + entity_url: input.entityUrl, + subject_kind: input.subjectKind, + subject_value: input.subjectValue, + permission: input.permission, + propagation: input.propagation ?? `self`, + copy_to_children: input.copyToChildren ?? false, + created_by: input.createdBy, + created_at: new Date(0).toISOString(), + updated_at: new Date(0).toISOString(), + })), + copyEntityPermissionGrantsForSpawn: vi.fn(async () => []), + ensureEntityTypePermissionGrant: vi.fn(async (input) => ({ + id: 1, + entity_type: input.entityType, + subject_kind: input.subjectKind, + subject_value: input.subjectValue, + permission: input.permission, + created_by: input.createdBy, + created_at: new Date(0).toISOString(), + updated_at: new Date(0).toISOString(), + })), + } + + return { + service: `tenant-test`, + principal: overrides.principal ?? reader, + publicUrl: `http://agents.test`, + durableStreamsUrl: `http://streams.test`, + durableStreamsDispatcher: {} as never, + pgDb: {} as never, + entityManager: { + registry, + ensurePrincipal: vi.fn(async () => undefined), + registerEntityType: vi.fn(async (input) => ({ + name: input.name, + description: input.description, + revision: 1, + created_at: new Date(0).toISOString(), + updated_at: new Date(0).toISOString(), + })), + spawn: vi.fn(async (_type, input) => + entity( + `/task/${input.instance_id}`, + (overrides.principal ?? reader).url + ) + ), + } as never, + streamClient: {} as never, + runtime: { claimWriteTokens: { clearStream: vi.fn() } } as never, + entityBridgeManager: { + beginClientRead: vi.fn(async () => vi.fn(async () => undefined)), + touchByStreamPath: vi.fn(async () => undefined), + } as never, + isShuttingDown: () => false, + } +} + +describe(`permission route middleware`, () => { + afterEach(() => { + vi.unstubAllGlobals() + }) + + it(`denies entity reads without ownership or grants`, async () => { + const response = await globalRouter.fetch( + request(`GET`, `/_electric/entities/task/one`), + ctx() + ) + expect(response.status).toBe(401) + }) + + it(`allows entity reads through effective grants`, async () => { + const response = await globalRouter.fetch( + request(`GET`, `/_electric/entities/task/one`), + ctx({ hasEntityPermission: true }) + ) + expect(response.status).toBe(200) + await expect(response.json()).resolves.toMatchObject({ url: `/task/one` }) + }) + + it(`denies spawn without a type spawn grant`, async () => { + const response = await globalRouter.fetch( + request(`PUT`, `/_electric/entities/task/new`, {}), + ctx() + ) + expect(response.status).toBe(401) + }) + + it(`allows users to register new entity types`, async () => { + const response = await globalRouter.fetch( + request(`POST`, `/_electric/entity-types`, { + name: `custom`, + description: `Custom`, + }), + ctx() + ) + expect(response.status).toBe(201) + }) + + it(`requires manage permission to update existing entity types through registration`, async () => { + const response = await globalRouter.fetch( + request(`POST`, `/_electric/entity-types`, { + name: `task`, + description: `Task update`, + }), + ctx() + ) + expect(response.status).toBe(401) + }) + + it(`materializes registration spawn grants for new entity types`, async () => { + const context = ctx() + const response = await globalRouter.fetch( + request(`POST`, `/_electric/entity-types`, { + name: `custom`, + description: `Custom`, + permission_grants: [ + { + subject_kind: `principal_kind`, + subject_value: `user`, + permission: `spawn`, + }, + ], + }), + context + ) + + expect(response.status).toBe(201) + expect( + context.entityManager.registry.ensureEntityTypePermissionGrant + ).toHaveBeenCalledWith({ + entityType: `custom`, + subjectKind: `principal_kind`, + subjectValue: `user`, + permission: `spawn`, + expiresAt: undefined, + createdBy: reader.url, + }) + }) + + it(`requires parent entity spawn permission when spawning a child`, async () => { + const response = await globalRouter.fetch( + request(`PUT`, `/_electric/entities/task/new`, { + parent: `/task/parent`, + }), + ctx({ + hasEntityTypePermission: true, + parentCreatedBy: owner.url, + }) + ) + expect(response.status).toBe(401) + }) + + it(`materializes direct spawn-time grants on the new entity`, async () => { + const context = ctx({ hasEntityTypePermission: true }) + const response = await globalRouter.fetch( + request(`PUT`, `/_electric/entities/task/new`, { + grants: [ + { + subject_kind: `principal`, + subject_value: owner.url, + permission: `read`, + }, + ], + }), + context + ) + + expect(response.status).toBe(201) + expect( + context.entityManager.registry.createEntityPermissionGrant + ).toHaveBeenCalledWith({ + entityUrl: `/task/new`, + subjectKind: `principal`, + subjectValue: owner.url, + permission: `read`, + propagation: undefined, + copyToChildren: undefined, + expiresAt: undefined, + createdBy: reader.url, + }) + }) + + it(`requires parent manage permission for broad parented spawn-time grants`, async () => { + const context = ctx({ + hasEntityTypePermission: true, + hasEntityPermission: (permission) => permission === `spawn`, + }) + const response = await globalRouter.fetch( + request(`PUT`, `/_electric/entities/task/new`, { + parent: `/task/parent`, + grants: [ + { + subject_kind: `principal_kind`, + subject_value: `user`, + permission: `read`, + }, + ], + }), + context + ) + + expect(response.status).toBe(401) + expect(context.entityManager.spawn).not.toHaveBeenCalled() + expect( + context.entityManager.registry.createEntityPermissionGrant + ).not.toHaveBeenCalled() + }) + + it(`allows broad parented spawn-time grants with parent manage permission`, async () => { + const context = ctx({ + hasEntityTypePermission: true, + hasEntityPermission: (permission) => + permission === `spawn` || permission === `manage`, + }) + const response = await globalRouter.fetch( + request(`PUT`, `/_electric/entities/task/new`, { + parent: `/task/parent`, + grants: [ + { + subject_kind: `principal_kind`, + subject_value: `user`, + permission: `read`, + }, + ], + }), + context + ) + + expect(response.status).toBe(201) + expect( + context.entityManager.registry.createEntityPermissionGrant + ).toHaveBeenCalledWith( + expect.objectContaining({ + entityUrl: `/task/new`, + subjectKind: `principal_kind`, + subjectValue: `user`, + permission: `read`, + }) + ) + }) + + it(`denies durable entity stream reads without read permission`, async () => { + const upstreamFetch = vi.fn(async () => new Response(`ok`)) + vi.stubGlobal(`fetch`, upstreamFetch) + + const response = await globalRouter.fetch( + request(`GET`, `/task/one/main`), + ctx() + ) + + expect(response.status).toBe(401) + expect(upstreamFetch).not.toHaveBeenCalled() + }) + + it(`passes through non-Agents durable stream paths`, async () => { + const upstreamFetch = vi.fn(async () => new Response(`ok`)) + vi.stubGlobal(`fetch`, upstreamFetch) + + const response = await globalRouter.fetch( + request(`GET`, `/utility/main`), + ctx() + ) + + expect(response.status).toBe(200) + expect(upstreamFetch).toHaveBeenCalled() + }) + + it(`authorizes initial shared-state creation through the owner entity`, async () => { + const upstreamFetch = vi.fn(async () => new Response(null, { status: 201 })) + vi.stubGlobal(`fetch`, upstreamFetch) + + const response = await globalRouter.fetch( + new Request(`http://agents.test/_electric/shared-state/board-1`, { + method: `PUT`, + headers: { 'electric-owner-entity': `/task/one` }, + }), + ctx({ entityCreatedBy: reader.url, linkedSharedStateEntityUrls: [] }) + ) + + expect(response.status).toBe(201) + expect(upstreamFetch).toHaveBeenCalled() + }) + + it(`denies unsupported shared-state methods instead of passing through`, async () => { + const upstreamFetch = vi.fn(async () => new Response(null, { status: 204 })) + vi.stubGlobal(`fetch`, upstreamFetch) + + const response = await globalRouter.fetch( + new Request(`http://agents.test/_electric/shared-state/board-1`, { + method: `DELETE`, + }), + ctx({ linkedSharedStateEntityUrls: [`/task/one`] }) + ) + + expect(response.status).toBe(401) + expect(upstreamFetch).not.toHaveBeenCalled() + }) + + it(`does not let the owner-entity header override existing shared-state links`, async () => { + const upstreamFetch = vi.fn(async () => new Response(null, { status: 201 })) + vi.stubGlobal(`fetch`, upstreamFetch) + + const response = await globalRouter.fetch( + new Request(`http://agents.test/_electric/shared-state/board-1`, { + method: `PUT`, + headers: { 'electric-owner-entity': `/task/one` }, + }), + ctx({ + entityCreatedBy: reader.url, + linkedSharedStateEntityUrls: [`/task/linked`], + }) + ) + + expect(response.status).toBe(401) + expect(upstreamFetch).not.toHaveBeenCalled() + }) +}) diff --git a/packages/agents-server/test/permissions.test.ts b/packages/agents-server/test/permissions.test.ts new file mode 100644 index 0000000000..194c0cc7f7 --- /dev/null +++ b/packages/agents-server/test/permissions.test.ts @@ -0,0 +1,160 @@ +import { describe, expect, it, vi } from 'vitest' +import { + canAccessEntity, + canAccessEntityType, + canAccessSharedState, +} from '../src/permissions' +import type { + ElectricAgentsEntity, + ElectricAgentsEntityType, +} from '../src/electric-agents-types' +import type { Principal } from '../src/principal' +import type { TenantContext } from '../src/routing/context' + +const owner: Principal = { + kind: `user`, + id: `owner`, + key: `user:owner`, + url: `/principal/user%3Aowner`, +} + +const reader: Principal = { + kind: `user`, + id: `reader`, + key: `user:reader`, + url: `/principal/user%3Areader`, +} + +const agent: Principal = { + kind: `agent`, + id: `worker`, + key: `agent:worker`, + url: `/principal/agent%3Aworker`, +} + +function entity(url = `/task/one`, createdBy = owner.url) { + return { + url, + type: `task`, + status: `idle`, + streams: { main: `${url}/main`, error: `${url}/error` }, + subscription_id: `task-sub`, + write_token: `token`, + tags: {}, + created_by: createdBy, + created_at: 1, + updated_at: 1, + } satisfies ElectricAgentsEntity +} + +const entityType = { + name: `task`, + description: `Task`, + revision: 1, + created_at: new Date(0).toISOString(), + updated_at: new Date(0).toISOString(), +} satisfies ElectricAgentsEntityType + +function ctx( + principal: Principal, + opts: { + entityGrant?: boolean + typeGrant?: boolean + linkedEntity?: ElectricAgentsEntity | null + authorizeRequest?: TenantContext[`authorizeRequest`] + } = {} +): TenantContext { + const linked = opts.linkedEntity ?? entity(`/task/shared`, reader.url) + return { + service: `tenant-test`, + principal, + publicUrl: `http://agents.test`, + durableStreamsUrl: `http://streams.test`, + durableStreamsDispatcher: {} as never, + pgDb: {} as never, + entityManager: { + registry: { + pruneExpiredPermissionGrants: vi.fn(async () => undefined), + hasEntityPermission: vi.fn(async () => opts.entityGrant ?? false), + hasEntityTypePermission: vi.fn(async () => opts.typeGrant ?? false), + listSharedStateLinkedEntityUrls: vi.fn(async () => + linked ? [linked.url] : [] + ), + getEntity: vi.fn(async (url: string) => + linked && url === linked.url ? linked : null + ), + }, + } as never, + streamClient: {} as never, + runtime: {} as never, + entityBridgeManager: {} as never, + authorizeRequest: opts.authorizeRequest, + isShuttingDown: () => false, + } +} + +describe(`permission service`, () => { + it(`allows the creator by default and denies unrelated principals`, async () => { + const owned = entity() + + expect(await canAccessEntity(ctx(owner), owned, `read`)).toBe(true) + expect(await canAccessEntity(ctx(owner), owned, `delete`)).toBe(true) + expect(await canAccessEntity(ctx(reader), owned, `read`)).toBe(false) + }) + + it(`allows entity and type access through explicit grants`, async () => { + const shared = entity(`/task/shared`, owner.url) + + expect( + await canAccessEntity(ctx(reader, { entityGrant: true }), shared, `read`) + ).toBe(true) + expect( + await canAccessEntityType( + ctx(reader, { typeGrant: true }), + entityType, + `spawn` + ) + ).toBe(true) + expect(await canAccessEntityType(ctx(agent), entityType, `spawn`)).toBe( + false + ) + }) + + it(`authorizes shared state through any readable linked entity`, async () => { + expect(await canAccessSharedState(ctx(reader), `board-1`, `read`)).toBe( + true + ) + expect( + await canAccessSharedState( + ctx(reader, { linkedEntity: entity(`/task/shared`, owner.url) }), + `board-1`, + `read` + ) + ).toBe(false) + }) + + it(`lets the webhook escape hatch deny and cache point decisions`, async () => { + const hook = vi.fn(() => ({ + decision: `deny` as const, + expires_at: new Date(Date.now() + 60_000).toISOString(), + })) + const request = new Request( + `http://agents.test/_electric/entities/task/one` + ) + const owned = entity() + const hookCtx = ctx(owner, { authorizeRequest: hook }) + + expect(await canAccessEntity(hookCtx, owned, `read`, request)).toBe(false) + expect(await canAccessEntity(hookCtx, owned, `read`, request)).toBe(false) + expect(hook).toHaveBeenCalledTimes(1) + expect(hook).toHaveBeenCalledWith( + expect.objectContaining({ + tenant: `tenant-test`, + principal: owner, + verb: `read`, + builtInAllowed: true, + resource: { kind: `entity`, entity: owned }, + }) + ) + }) +}) diff --git a/packages/agents-server/test/server-utils.test.ts b/packages/agents-server/test/server-utils.test.ts index e894a89e00..c7a77d767f 100644 --- a/packages/agents-server/test/server-utils.test.ts +++ b/packages/agents-server/test/server-utils.test.ts @@ -7,6 +7,7 @@ function shapeTarget(query: string): URL { electricUrl: `http://electric.local`, tenantId: `tenant-test`, principalUrl: `/principal/user%3Aowner%40example.com`, + principalKind: `user`, }) } @@ -34,7 +35,13 @@ describe(`server utils`, () => { const columns = target.searchParams.get(`columns`) expect(columns).toContain(`"sandbox"`) expect(columns).toContain(`"dispatch_policy"`) - expect(target.searchParams.get(`where`)).toBe(`tenant_id = 'tenant-test'`) + expect(target.searchParams.get(`where`)).toContain( + `tenant_id = 'tenant-test'` + ) + expect(target.searchParams.get(`where`)).toContain(`created_by =`) + expect(target.searchParams.get(`where`)).toContain( + `FROM entity_effective_permissions` + ) }) it(`combines runner owner scoping with Electric protocol where clauses`, () => { @@ -58,4 +65,47 @@ describe(`server utils`, () => { `tenant_id = 'tenant-test' AND owner_principal = '/principal/user%3Aowner%40example.com' AND (runner_id = 'runner-1')` ) }) + + it(`scopes entity shapes to owner or read/manage effective grants with IN subqueries`, () => { + const target = shapeTarget(`table=entities`) + const where = target.searchParams.get(`where`) ?? `` + + expect(where).toContain(`tenant_id = 'tenant-test'`) + expect(where).toContain( + `created_by = '/principal/user%3Aowner%40example.com'` + ) + expect(where).toContain(`url IN (`) + expect(where).toContain(`FROM entity_effective_permissions`) + expect(where).toContain(`permission IN ('read', 'manage')`) + expect(where).toContain( + `(subject_kind = 'principal_kind' AND subject_value = 'user')` + ) + expect(where).not.toMatch(/\bEXISTS\b/i) + }) + + it(`scopes entity-url tables through readable entity URLs without correlated subqueries`, () => { + const target = shapeTarget(`table=entity_dispatch_state`) + const where = target.searchParams.get(`where`) ?? `` + + expect(where).toContain(`entity_url IN (`) + expect(where).toContain(`SELECT url`) + expect(where).toContain(`FROM entities`) + expect(where).toContain(`url IN (`) + expect(where).toContain(`FROM entity_effective_permissions`) + expect(where).not.toMatch(/\bEXISTS\b/i) + expect(where).not.toContain(`entity_dispatch_state.`) + }) + + it(`scopes entity type shapes to spawn/manage grants`, () => { + const target = shapeTarget(`table=entity_types`) + const where = target.searchParams.get(`where`) ?? `` + + expect(where).toContain(`name IN (`) + expect(where).toContain(`FROM entity_type_permission_grants`) + expect(where).toContain(`permission IN ('spawn', 'manage')`) + expect(where).toContain( + `(subject_kind = 'principal_kind' AND subject_value = 'user')` + ) + expect(where).not.toMatch(/\bEXISTS\b/i) + }) }) diff --git a/packages/agents-server/test/test-backend.ts b/packages/agents-server/test/test-backend.ts index fe091e10e8..382ac22101 100644 --- a/packages/agents-server/test/test-backend.ts +++ b/packages/agents-server/test/test-backend.ts @@ -126,6 +126,9 @@ async function ensureExpectedSchema(postgresUrl: string): Promise { hasOutboxDeadLetteredAt, hasEntityManifestSources, hasEntitiesCreatedBy, + hasEntityEffectivePermissions, + hasSharedStateLinks, + hasEntityBridgePrincipal, hasLegacyEntitiesMetadata, ] = await Promise.all([ hasColumn(postgresUrl, `entities`, `tags`), @@ -133,6 +136,9 @@ async function ensureExpectedSchema(postgresUrl: string): Promise { hasColumn(postgresUrl, `tag_stream_outbox`, `dead_lettered_at`), hasTable(postgresUrl, `entity_manifest_sources`), hasColumn(postgresUrl, `entities`, `created_by`), + hasTable(postgresUrl, `entity_effective_permissions`), + hasTable(postgresUrl, `shared_state_links`), + hasColumn(postgresUrl, `entity_bridges`, `principal_url`), hasColumn(postgresUrl, `entities`, `metadata`), ]) @@ -142,6 +148,9 @@ async function ensureExpectedSchema(postgresUrl: string): Promise { hasOutboxDeadLetteredAt && hasEntityManifestSources && hasEntitiesCreatedBy && + hasEntityEffectivePermissions && + hasSharedStateLinks && + hasEntityBridgePrincipal && !hasLegacyEntitiesMetadata ) } @@ -162,7 +171,7 @@ async function ensureExpectedSchema(postgresUrl: string): Promise { const composeProject = getElectricAgentsComposeProject() throw new Error( - `ElectricAgents test backend schema is stale: expected current tags/manifest/outbox/principals schema and no legacy entities.metadata column. Reset the matching backend with "docker compose -p ${composeProject} -f ${ELECTRIC_AGENTS_COMPOSE_FILE} down -v" and rerun the relevant Vitest project.` + `ElectricAgents test backend schema is stale: expected current tags/manifest/outbox/principals/permissions schema and no legacy entities.metadata column. Reset the matching backend with "docker compose -p ${composeProject} -f ${ELECTRIC_AGENTS_COMPOSE_FILE} down -v" and rerun the relevant Vitest project.` ) } export async function ensureElectricAgentsTestBackend(): Promise { diff --git a/packages/agents/src/agents/horton.ts b/packages/agents/src/agents/horton.ts index bfc6b0ad4a..31a78dd2a3 100644 --- a/packages/agents/src/agents/horton.ts +++ b/packages/agents/src/agents/horton.ts @@ -705,6 +705,13 @@ export function registerHorton( registry.define(`horton`, { description: `Friendly capable assistant — chat, code, research, dispatch`, creationSchema: hortonCreationSchema, + permissionGrants: [ + { + subject_kind: `principal_kind`, + subject_value: `user`, + permission: `spawn`, + }, + ], handler: assistantHandler, }) diff --git a/packages/agents/src/agents/worker.ts b/packages/agents/src/agents/worker.ts index 7c52e9d70f..7d4b1ffcc2 100644 --- a/packages/agents/src/agents/worker.ts +++ b/packages/agents/src/agents/worker.ts @@ -292,6 +292,13 @@ export function registerWorker( const { streamFn, modelCatalog } = options registry.define(`worker`, { description: `Internal — generic worker spawned by other agents. Configure via spawn args (systemPrompt + tools + optional sharedDb).`, + permissionGrants: [ + { + subject_kind: `principal_kind`, + subject_value: `user`, + permission: `spawn`, + }, + ], async handler(ctx) { const args = parseWorkerArgs(ctx.args) const readSet = new Set() diff --git a/packages/agents/test/builtin-pull-wake-registration.test.ts b/packages/agents/test/builtin-pull-wake-registration.test.ts index 8d6c0db366..5bfcf4e829 100644 --- a/packages/agents/test/builtin-pull-wake-registration.test.ts +++ b/packages/agents/test/builtin-pull-wake-registration.test.ts @@ -104,6 +104,34 @@ describe(`BuiltinAgentsServer pull-wake registration`, () => { ).toBe(false) }) + it(`grants all users spawn permission on built-in entity types`, async () => { + agentsServer = await startRecordingAgentsServer() + builtinServer = new BuiltinAgentsServer({ + agentServerUrl: agentsServer.url, + mockStreamFn, + pullWake: { runnerId: `test-runner` }, + }) + + await builtinServer.start() + + const horton = agentsServer.entityTypeBodies.find( + (body) => body.name === `horton` + ) + const worker = agentsServer.entityTypeBodies.find( + (body) => body.name === `worker` + ) + expect(horton?.permission_grants).toContainEqual({ + subject_kind: `principal_kind`, + subject_value: `user`, + permission: `spawn`, + }) + expect(worker?.permission_grants).toContainEqual({ + subject_kind: `principal_kind`, + subject_value: `user`, + permission: `spawn`, + }) + }) + it(`registers through tenant path-prefixed server URLs`, async () => { agentsServer = await startRecordingAgentsServer() builtinServer = new BuiltinAgentsServer({