diff --git a/apps/cloud/src/mcp-flow.test.ts b/apps/cloud/src/mcp-flow.test.ts index a95632108..727ece824 100644 --- a/apps/cloud/src/mcp-flow.test.ts +++ b/apps/cloud/src/mcp-flow.test.ts @@ -11,8 +11,9 @@ // → env.MCP_SESSION.idFromString() → stub.handleRequest() // → the real McpSessionDO stale-session path // -// Only one seam is faked: `McpAuth.verifyBearer`. The real impl calls -// WorkOS's JWKS endpoint, which we can't reach from the test isolate. +// Two auth seams are faked: `McpAuth.verifyBearer` and the live WorkOS +// membership check. The real bearer impl calls WorkOS's JWKS endpoint, +// which we can't reach from the test isolate. // Test bearer format is `test-accept::::` // (see `makeTestBearer` in test-worker.ts). // @@ -215,6 +216,23 @@ describe("/mcp verified token without org", () => { }); }); +describe("/mcp verified token without live org access", () => { + it("returns JSON-RPC -32001 before creating a session", async () => { + const response = await mcpPost({ + bearer: makeTestBearer(nextAccountId(), `revoked_${nextOrgId()}`), + body: INITIALIZE_REQUEST, + }); + expect(response.status).toBe(403); + const body = (await response.json()) as { + jsonrpc: string; + error: { code: number; message: string }; + }; + expect(body.jsonrpc).toBe("2.0"); + expect(body.error.code).toBe(-32001); + expect(body.error.message).toMatch(/No organization/i); + }); +}); + // --------------------------------------------------------------------------- // 5. POST /mcp on an unknown session-id // --------------------------------------------------------------------------- @@ -349,6 +367,46 @@ describe("/mcp session restore", () => { expect(body.error?.code).toBe(-32003); expect(body.error?.message).toMatch(/does not belong/i); }, 15_000); + + it("clears an existing session when live org access is revoked", async () => { + const orgId = `revoked_${nextOrgId()}`; + const accountId = nextAccountId(); + const stub = env.MCP_SESSION.get(env.MCP_SESSION.newUniqueId()); + const sessionId = stub.id.toString(); + + await runInDurableObject(stub, async (_instance, state) => { + await state.storage.put(SESSION_META_KEY, { + organizationId: orgId, + organizationName: "Revoked Org", + userId: accountId, + }); + await state.storage.put(LAST_ACTIVITY_KEY, Date.now()); + await state.storage.setAlarm(Date.now() + HEARTBEAT_MS); + }); + + const revokedResponse = await mcpPost({ + bearer: makeTestBearer(accountId, orgId), + sessionId, + body: TOOLS_LIST_REQUEST, + }); + expect(revokedResponse.status).toBe(403); + const body = (await revokedResponse.json()) as { + readonly jsonrpc: string; + readonly error?: { readonly code: number; readonly message: string }; + }; + expect(body.jsonrpc).toBe("2.0"); + expect(body.error?.code).toBe(-32001); + expect(body.error?.message).toMatch(/No organization/i); + + const stored = await runInDurableObject(stub, async (_instance, state) => ({ + sessionMeta: await state.storage.get(SESSION_META_KEY), + lastActivity: await state.storage.get(LAST_ACTIVITY_KEY), + alarm: await state.storage.getAlarm(), + })); + expect(stored.sessionMeta).toBeUndefined(); + expect(stored.lastActivity).toBeUndefined(); + expect(stored.alarm).toBeNull(); + }, 15_000); }); describe("McpSessionDO alarm lifecycle", () => { diff --git a/apps/cloud/src/mcp-session.ts b/apps/cloud/src/mcp-session.ts index fa26ad870..8eece28fe 100644 --- a/apps/cloud/src/mcp-session.ts +++ b/apps/cloud/src/mcp-session.ts @@ -594,6 +594,16 @@ export class McpSessionDO extends DurableObject { return Effect.runPromise(program); } + async clearSession(incoming?: IncomingTraceHeaders): Promise { + return Effect.runPromise( + Effect.promise(() => this.cleanup()).pipe( + Effect.withSpan("McpSessionDO.clearSession"), + (eff) => withIncomingParent(incoming, eff), + Effect.provide(DoTelemetryLive), + ), + ); + } + private async runAlarm(): Promise { const lastActivityMs = await this.loadLastActivity(); const idleMs = Date.now() - lastActivityMs; diff --git a/apps/cloud/src/mcp.ts b/apps/cloud/src/mcp.ts index 5720a6945..d3e2b8bad 100644 --- a/apps/cloud/src/mcp.ts +++ b/apps/cloud/src/mcp.ts @@ -22,6 +22,10 @@ import { createRemoteJWKSet } from "jose"; import { TelemetryLive } from "./services/telemetry"; import { McpJwtVerificationError, verifyMcpAccessToken, type VerifiedToken } from "./mcp-auth"; +import { authorizeOrganization } from "./auth/authorize-organization"; +import { UserStoreService } from "./auth/context"; +import { CoreSharedServices } from "./api/core-shared-services"; +import { DbService } from "./services/db"; // --------------------------------------------------------------------------- // Constants @@ -93,12 +97,38 @@ export class McpAuth extends Context.Tag("@executor/cloud/McpAuth")< } >() {} +export class McpOrganizationAuth extends Context.Tag("@executor/cloud/McpOrganizationAuth")< + McpOrganizationAuth, + { + readonly authorize: ( + accountId: string, + organizationId: string, + ) => Effect.Effect; + } +>() {} + const verifyJwt = (token: string) => verifyMcpAccessToken(token, jwks, { issuer: AUTHKIT_DOMAIN, audience: RESOURCE_URL, }); +const DbLive = DbService.Live; +const UserStoreLive = UserStoreService.Live.pipe(Layer.provide(DbLive)); +const McpOrganizationAuthServices = Layer.mergeAll( + DbLive, + UserStoreLive, + CoreSharedServices, +); + +export const McpOrganizationAuthLive = Layer.succeed(McpOrganizationAuth, { + authorize: (accountId, organizationId) => + authorizeOrganization(accountId, organizationId).pipe( + Effect.map((org) => org !== null), + Effect.provide(McpOrganizationAuthServices), + ), +}); + export const McpAuthLive = Layer.succeed(McpAuth, { verifyBearer: Effect.fn("mcp.auth.verify_bearer")(function* (request) { const authHeader = request.headers.get("authorization"); @@ -622,14 +652,59 @@ const forwardToExistingSession = ( return HttpServerResponse.raw(withMcpResponseHeaders(annotated)); }); -const dispatchPost = (request: Request, token: VerifiedToken) => +const clearExistingSession = (request: Request, sessionId: string) => + Effect.gen(function* () { + const ns = env.MCP_SESSION; + const stub = ns.get(ns.idFromString(sessionId)); + const propagation = yield* currentPropagationHeaders(request); + yield* Effect.promise(() => stub.clearSession(propagation) as Promise).pipe( + Effect.catchAll(() => Effect.void), + Effect.withSpan("mcp.do.clear_session", { + attributes: { "mcp.request.session_id_present": true }, + }), + ); + }); + +const authorizeMcpOrganization = ( + request: Request, + token: VerifiedToken, + sessionId: string | null, +) => Effect.gen(function* () { const organizationId = token.organizationId; if (!organizationId) { return jsonRpcError(403, -32001, "No organization in session — log in via the web app first"); } + const auth = yield* McpOrganizationAuth; + const allowed = yield* auth.authorize(token.accountId, organizationId).pipe( + Effect.catchAll((error) => + Effect.gen(function* () { + yield* Effect.annotateCurrentSpan({ + "mcp.auth.organization_authorize_error": String(error), + }); + return false; + }), + ), + Effect.withSpan("mcp.auth.authorize_organization", { + attributes: { "mcp.auth.organization_id": organizationId }, + }), + ); + if (allowed) return null; + + if (sessionId) { + yield* clearExistingSession(request, sessionId); + } + return jsonRpcError(403, -32001, "No organization in session — log in via the web app first"); + }); + +const dispatchPost = (request: Request, token: VerifiedToken) => + Effect.gen(function* () { const sessionId = request.headers.get("mcp-session-id"); + const authError = yield* authorizeMcpOrganization(request, token, sessionId); + if (authError) return authError; + const organizationId = token.organizationId!; + if (sessionId) return yield* forwardToExistingSession(request, sessionId, true, token); const ns = env.MCP_SESSION; @@ -664,13 +739,21 @@ const dispatchGet = (request: Request, token: VerifiedToken) => { const sessionId = request.headers.get("mcp-session-id"); if (!sessionId) return Effect.succeed(jsonRpcError(400, -32000, "mcp-session-id header required for SSE")); - return forwardToExistingSession(request, sessionId, false, token); + return Effect.gen(function* () { + const authError = yield* authorizeMcpOrganization(request, token, sessionId); + if (authError) return authError; + return yield* forwardToExistingSession(request, sessionId, false, token); + }); }; const dispatchDelete = (request: Request, token: VerifiedToken) => { const sessionId = request.headers.get("mcp-session-id"); if (!sessionId) return Effect.succeed(HttpServerResponse.empty({ status: 204 })); - return forwardToExistingSession(request, sessionId, true, token); + return Effect.gen(function* () { + const authError = yield* authorizeMcpOrganization(request, token, sessionId); + if (authError) return authError; + return yield* forwardToExistingSession(request, sessionId, true, token); + }); }; // --------------------------------------------------------------------------- @@ -696,14 +779,13 @@ export const classifyMcpPath = (pathname: string): McpRoute => { /** * Raw Effect-native MCP app. Exported so alternate entry points (e.g. the - * vitest-pool-workers test worker) can provide their own `McpAuth` layer — - * the only dependency we deliberately swap in tests because hitting the real - * WorkOS JWKS isn't practical. Every other layer stays real. + * vitest-pool-workers test worker) can provide their own auth layers because + * hitting WorkOS JWKS / membership APIs is not practical in the isolate. */ export const mcpApp: Effect.Effect< HttpServerResponse.HttpServerResponse, never, - HttpServerRequest.HttpServerRequest | McpAuth + HttpServerRequest.HttpServerRequest | McpAuth | McpOrganizationAuth > = Effect.gen(function* () { const httpRequest = yield* HttpServerRequest.HttpServerRequest; const request = httpRequest.source as Request; @@ -744,7 +826,7 @@ export const mcpApp: Effect.Effect< ); const rawMcpFetch = HttpApp.toWebHandler( - mcpApp.pipe(Effect.provide(Layer.mergeAll(McpAuthLive, TelemetryLive))), + mcpApp.pipe(Effect.provide(Layer.mergeAll(McpAuthLive, McpOrganizationAuthLive, TelemetryLive))), ); /** diff --git a/apps/cloud/src/test-worker.ts b/apps/cloud/src/test-worker.ts index f81c0d656..bce0d562e 100644 --- a/apps/cloud/src/test-worker.ts +++ b/apps/cloud/src/test-worker.ts @@ -18,7 +18,14 @@ import { Effect, Layer } from "effect"; import { drizzle } from "drizzle-orm/postgres-js"; import postgres, { type Sql } from "postgres"; -import { McpAuth, McpAuthLive, classifyMcpPath, mcpApp } from "./mcp"; +import { + McpAuth, + McpAuthLive, + McpOrganizationAuth, + McpOrganizationAuthLive, + classifyMcpPath, + mcpApp, +} from "./mcp"; import { organizations } from "./services/schema"; import { parseTestBearer } from "./test-bearer"; import { DoTelemetryLive } from "./services/telemetry"; @@ -34,6 +41,11 @@ const TestMcpAuthLive = Layer.succeed(McpAuth, { }), }); +const TestMcpOrganizationAuthLive = Layer.succeed(McpOrganizationAuth, { + authorize: (_accountId, organizationId) => + Effect.succeed(!organizationId.startsWith("revoked_")), +}); + // --------------------------------------------------------------------------- // Test seed endpoint // --------------------------------------------------------------------------- @@ -86,11 +98,15 @@ const handleSeedOrg = async ( // instrumentation, so we reuse DoTelemetryLive (it's a plain WebSdk + // OTLPTraceExporter — not Durable-Object-specific) to stand in. const testMcpFetch = HttpApp.toWebHandler( - mcpApp.pipe(Effect.provide(Layer.mergeAll(TestMcpAuthLive, DoTelemetryLive))), + mcpApp.pipe( + Effect.provide( + Layer.mergeAll(TestMcpAuthLive, TestMcpOrganizationAuthLive, DoTelemetryLive), + ), + ), ); const realAuthMcpFetch = HttpApp.toWebHandler( - mcpApp.pipe(Effect.provide(Layer.mergeAll(McpAuthLive, DoTelemetryLive))), + mcpApp.pipe(Effect.provide(Layer.mergeAll(McpAuthLive, McpOrganizationAuthLive, DoTelemetryLive))), ); export default {