Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/desktop-default-pull-wake-principal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-ax/agents-desktop': patch
---

Align the default pull-wake `owner_principal` in agents-desktop with the agents-server dev fallback (`system:dev-local`), so connecting to a local server without auth no longer fails with `owner_principal must match the authenticated principal`.
5 changes: 5 additions & 0 deletions .changeset/release-pull-wake-claim-after-dispatch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-ax/agents-server': patch
---

Fix pull-wake claims leaking in `consumer_claims` after dispatch. The release path in `callback-forward` was gated entirely on the in-memory write-token state, so any condition that lost or evicted the token (server restart, a newer wake on the same stream) would prevent `materializeReleasedClaim` from running and leave the DB row pinned at `status='active'`. The fix decouples the durable-row release (keyed by `consumerId + epoch`) from in-memory token cleanup, and uses `entityCleared || stillOwnsClaim` to gate the entity status transition back to `idle`. Includes regression tests in `test/webhook-forward-routing.test.ts`.
12 changes: 6 additions & 6 deletions packages/agents-desktop/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ This starts both the UI dev server (with HMR) and the Electron main process.

### Environment variables

| Variable | Default | Description |
| -------------------------------------------- | ----------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `ELECTRIC_DESKTOP_PRINCIPAL` | _(none)_ | Sets the `electric-principal` header on all requests to the agents-server. Use `system:dev-local` for local development without auth. |
| `ELECTRIC_DESKTOP_PULL_WAKE_OWNER_PRINCIPAL` | `/principal/system%3Alocal-desktop` | Override the `owner_principal` used when registering the pull-wake runner. When `ELECTRIC_DESKTOP_PRINCIPAL` is set, this is derived from it automatically. |
| `ELECTRIC_DESKTOP_PULL_WAKE_RUNNER_ID` | _(auto-generated)_ | Fixed runner ID for the pull-wake runner. |
| `ELECTRIC_DESKTOP_PULL_WAKE_REGISTER_RUNNER` | `true` | Set to `false` to skip runner registration (runner must already exist on the server). |
| Variable | Default | Description |
| -------------------------------------------- | ------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `ELECTRIC_DESKTOP_PRINCIPAL` | _(none)_ | Sets the `electric-principal` header on all requests to the agents-server. Use `system:dev-local` for local development without auth. |
| `ELECTRIC_DESKTOP_PULL_WAKE_OWNER_PRINCIPAL` | `/principal/system%3Adev-local` | Override the `owner_principal` used when registering the pull-wake runner. When `ELECTRIC_DESKTOP_PRINCIPAL` is set, this is derived from it automatically. |
| `ELECTRIC_DESKTOP_PULL_WAKE_RUNNER_ID` | _(auto-generated)_ | Fixed runner ID for the pull-wake runner. |
| `ELECTRIC_DESKTOP_PULL_WAKE_REGISTER_RUNNER` | `true` | Set to `false` to skip runner registration (runner must already exist on the server). |

### Settings

Expand Down
2 changes: 1 addition & 1 deletion packages/agents-desktop/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ const PULL_WAKE_REGISTER_RUNNER =
)
const PULL_WAKE_OWNER_PRINCIPAL =
process.env.ELECTRIC_DESKTOP_PULL_WAKE_OWNER_PRINCIPAL?.trim() ||
`/principal/system%3Alocal-desktop`
`/principal/system%3Adev-local`
const DEV_PRINCIPAL = ((): string | null => {
const raw = process.env.ELECTRIC_DESKTOP_PRINCIPAL?.trim() || null
if (!raw) return null
Expand Down
13 changes: 10 additions & 3 deletions packages/agents-server/src/entity-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ export class PostgresRegistry {

async materializeReleasedClaim(
input: MaterializeReleasedClaimInput
): Promise<ConsumerClaim | null> {
): Promise<{ claim: ConsumerClaim | null; entityCleared: boolean }> {
const releasedAt = input.releasedAt ?? new Date()
const rows = await this.db
.update(consumerClaims)
Expand All @@ -345,8 +345,13 @@ export class PostgresRegistry {
.returning()

const claim = rows[0] ? this.rowToConsumerClaim(rows[0]) : null
let entityCleared = false
if (claim) {
await this.db
// entityCleared distinguishes "we were the active dispatch and now it's
// empty" from "a newer claim was already active for this entity." The
// WHERE clause matches our (consumerId, epoch) so an evicted-by-newer
// case correctly returns zero rows.
const cleared = await this.db
.update(entityDispatchState)
.set({
activeConsumerId: null,
Expand All @@ -366,8 +371,10 @@ export class PostgresRegistry {
eq(entityDispatchState.activeEpoch, input.epoch)
)
)
.returning({ entityUrl: entityDispatchState.entityUrl })
entityCleared = cleared.length > 0
}
return claim
return { claim, entityCleared }
}

async getActiveClaimsForRunner(
Expand Down
46 changes: 33 additions & 13 deletions packages/agents-server/src/routing/internal-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,15 @@ async function callbackForward(
const entity = await ctx.entityManager.registry.getEntityByStream(
target.primaryStream
)
if (entity && stillOwnsClaim) {
if (epoch !== undefined) {

// Release the consumer_claims row by its DB identity (consumerId,
// epoch). The in-memory write token is a separate concern (write
// authorization during the run); release of the durable row must
// succeed even if the token was lost (server restart) or evicted
// (a later wake re-minted for the same stream).
let entityCleared = false
if (epoch !== undefined) {
const result =
await ctx.entityManager.registry.materializeReleasedClaim?.({
consumerId,
epoch,
Expand All @@ -639,28 +646,41 @@ async function callbackForward(
})
: undefined,
})
}
entityCleared = result?.entityCleared ?? false
}

// Transition entity back to idle when either signal says it's safe:
// - entityCleared: our release just cleared the entity's active
// dispatch state, so no in-flight wake remains.
// - stillOwnsClaim: this consumer is still the in-memory write-token
// owner, so no newer wake has displaced it. Covers two cases:
// (a) retry of a failed done (first attempt cleared the DB state
// but failed to update status), (b) server restart scenarios where
// the token is intact even though entityDispatchState may diverge.
// If both are false, a newer wake owns the entity — leave status as-is.
if (entity && (entityCleared || stillOwnsClaim)) {
await ctx.entityManager.registry.updateStatus(entity.url, `idle`)
ctx.runtime.claimWriteTokens.clearStream(
ctx.service,
target.primaryStream
)
await ctx.entityBridgeManager.onEntityChanged(entity.url)
serverLog.info(
`[callback-forward] status updated to idle for ${entity.url}`
)
} else if (stillOwnsClaim) {
} else if (!entity) {
serverLog.warn(
`[callback-forward] done received but no entity found for stream=${target.primaryStream}`
)
}

// Clear the in-memory write token only if this consumer still owns it.
// If a newer wake has taken over, that newer wake owns the token now
// and we must not clear it out from under it.
if (stillOwnsClaim) {
ctx.runtime.claimWriteTokens.clearStream(
ctx.service,
target.primaryStream
)
} else if (entity) {
serverLog.info(
`[callback-forward] done ignored for stale claim stream=${target.primaryStream} consumer=${consumerId}`
)
} else {
serverLog.warn(
`[callback-forward] done received but no entity found for stream=${target.primaryStream}`
`[callback-forward] done arrived after in-memory token evicted (stream=${target.primaryStream} consumer=${consumerId})`
)
}
} else if (requestBody?.done === true) {
Expand Down
195 changes: 195 additions & 0 deletions packages/agents-server/test/webhook-forward-routing.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ function buildContext(overrides: Partial<TenantContext> = {}): TenantContext {
registry: {
getEntityByStream: vi.fn().mockResolvedValue(entity),
updateStatus: vi.fn().mockResolvedValue(undefined),
materializeReleasedClaim: vi.fn().mockResolvedValue({
claim: null,
entityCleared: true,
}),
materializeHeartbeatClaim: vi.fn().mockResolvedValue(undefined),
},
enrichPayload: vi.fn(async (payload: Record<string, unknown>) => ({
...payload,
Expand Down Expand Up @@ -578,4 +583,194 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => {
fetchSpy.mockRestore()
}
})

describe(`claim release on done callback (regression for #4340)`, () => {
const upstreamOk = (): void => {
vi.spyOn(globalThis, `fetch`).mockResolvedValue(
new Response(JSON.stringify({ ok: true, next_wake: false }), {
headers: { 'content-type': `application/json` },
})
)
}

it(`marks the consumer_claims row released and the entity idle on done`, async () => {
const select = selectDb([
{
callbackUrl: `http://durable.local/v1/stream-meta/subscriptions/horton-handler/callback?service=tenant-a`,
primaryStream: `/horton/demo/main`,
},
])
upstreamOk()
const ctx = buildContext({
pgDb: { select: select.select } as any,
})
// Simulate the runtime's claim phase having minted a token.
ctx.runtime.claimWriteTokens.mint(
`tenant-a`,
`/horton/demo/main`,
`wake-1`
)

try {
const response = await globalRouter.fetch(
new Request(`http://agents.local/_electric/callback-forward/wake-1`, {
method: `POST`,
headers: {
'content-type': `application/json`,
authorization: `Bearer callback-token`,
},
body: JSON.stringify({
epoch: 7,
acks: [{ path: `/horton/demo/main`, offset: `1` }],
done: true,
}),
}),
ctx
)

expect(response.status).toBe(200)
expect(
ctx.entityManager.registry.materializeReleasedClaim
).toHaveBeenCalledWith(
expect.objectContaining({
consumerId: `wake-1`,
epoch: 7,
ackedStreams: [{ path: `/horton/demo/main`, offset: `1` }],
})
)
expect(ctx.entityManager.registry.updateStatus).toHaveBeenCalledWith(
`/horton/demo`,
`idle`
)
} finally {
vi.mocked(globalThis.fetch).mockRestore()
}
})

it(`still releases the consumer_claims row when the in-memory write token is missing (e.g. server restart)`, async () => {
// Reproduces the production failure mode where the in-memory
// ClaimWriteTokenStore is empty (server restarted between dispatch and
// done, or another wake evicted the token) but the consumer_claims row
// is still active in the DB. Today the release path skips
// materializeReleasedClaim under this condition and the row leaks.
const select = selectDb([
{
callbackUrl: `http://durable.local/v1/stream-meta/subscriptions/horton-handler/callback?service=tenant-a`,
primaryStream: `/horton/demo/main`,
},
])
upstreamOk()
const ctx = buildContext({
pgDb: { select: select.select } as any,
})
// Intentionally do NOT mint a write token — simulates the token
// being lost between the runtime's claim phase and its done call.

try {
const response = await globalRouter.fetch(
new Request(`http://agents.local/_electric/callback-forward/wake-1`, {
method: `POST`,
headers: {
'content-type': `application/json`,
authorization: `Bearer callback-token`,
},
body: JSON.stringify({
epoch: 7,
acks: [{ path: `/horton/demo/main`, offset: `1` }],
done: true,
}),
}),
ctx
)

expect(response.status).toBe(200)
// The DB row identity (consumerId, epoch) is sufficient to release
// the claim — release must not depend on in-memory write-token state.
expect(
ctx.entityManager.registry.materializeReleasedClaim
).toHaveBeenCalledWith(
expect.objectContaining({
consumerId: `wake-1`,
epoch: 7,
})
)
// The entity should transition back to idle so the UI no longer
// shows it as "running" after the agent finishes.
expect(ctx.entityManager.registry.updateStatus).toHaveBeenCalledWith(
`/horton/demo`,
`idle`
)
} finally {
vi.mocked(globalThis.fetch).mockRestore()
}
})

it(`releases an earlier wake's claim even after a later wake evicted its in-memory token`, async () => {
// Same defect, different cause: two wakes for the same entity arrive
// close together. The second's mint evicts the first's token. The
// first wake's done call then finds stillOwnsClaim=false and skips
// the release.
const select = selectDb([
{
callbackUrl: `http://durable.local/v1/stream-meta/subscriptions/horton-handler/callback?service=tenant-a`,
primaryStream: `/horton/demo/main`,
},
])
upstreamOk()
const ctx = buildContext({
pgDb: { select: select.select } as any,
})
// Simulate the DB state where consumer-old's row is still active but
// entityDispatchState's active claim is consumer-new (the later wake).
;(
ctx.entityManager.registry.materializeReleasedClaim as any
).mockResolvedValue({ claim: null, entityCleared: false })

ctx.runtime.claimWriteTokens.mint(
`tenant-a`,
`/horton/demo/main`,
`wake-1`
)
// A second wake arrives and re-mints for the same stream, evicting
// wake-1 from the in-memory store.
ctx.runtime.claimWriteTokens.mint(
`tenant-a`,
`/horton/demo/main`,
`wake-2`
)

try {
await globalRouter.fetch(
new Request(`http://agents.local/_electric/callback-forward/wake-1`, {
method: `POST`,
headers: {
'content-type': `application/json`,
authorization: `Bearer callback-token`,
},
body: JSON.stringify({
epoch: 7,
acks: [{ path: `/horton/demo/main`, offset: `1` }],
done: true,
}),
}),
ctx
)

// The DB row for wake-1 must still be released.
expect(
ctx.entityManager.registry.materializeReleasedClaim
).toHaveBeenCalledWith(
expect.objectContaining({
consumerId: `wake-1`,
epoch: 7,
})
)
// But the entity status must NOT be set to idle — wake-2 is still
// in flight and owns the entity's active dispatch.
expect(ctx.entityManager.registry.updateStatus).not.toHaveBeenCalled()
} finally {
vi.mocked(globalThis.fetch).mockRestore()
}
})
})
})
Loading