Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions apps/cloud/src/mcp-flow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
// → env.MCP_SESSION.idFromString() → stub.handleRequest()
// → the real McpSessionDO stale-session path
//
// Only one seam is faked: `McpAuth.verifyBearer`. The real impl calls
// WorkOS's JWKS endpoint, which we can't reach from the test isolate.
// Two auth seams are faked: `McpAuth.verifyBearer` and the live WorkOS
// membership check. The real bearer impl calls WorkOS's JWKS endpoint,
// which we can't reach from the test isolate.
// Test bearer format is `test-accept::<accountId>::<orgId|none>`
// (see `makeTestBearer` in test-worker.ts).
//
Expand Down Expand Up @@ -215,6 +216,23 @@ describe("/mcp verified token without org", () => {
});
});

describe("/mcp verified token without live org access", () => {
it("returns JSON-RPC -32001 before creating a session", async () => {
const response = await mcpPost({
bearer: makeTestBearer(nextAccountId(), `revoked_${nextOrgId()}`),
body: INITIALIZE_REQUEST,
});
expect(response.status).toBe(403);
const body = (await response.json()) as {
jsonrpc: string;
error: { code: number; message: string };
};
expect(body.jsonrpc).toBe("2.0");
expect(body.error.code).toBe(-32001);
expect(body.error.message).toMatch(/No organization/i);
});
});

// ---------------------------------------------------------------------------
// 5. POST /mcp on an unknown session-id
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -349,6 +367,46 @@ describe("/mcp session restore", () => {
expect(body.error?.code).toBe(-32003);
expect(body.error?.message).toMatch(/does not belong/i);
}, 15_000);

it("clears an existing session when live org access is revoked", async () => {
const orgId = `revoked_${nextOrgId()}`;
const accountId = nextAccountId();
const stub = env.MCP_SESSION.get(env.MCP_SESSION.newUniqueId());
const sessionId = stub.id.toString();

await runInDurableObject(stub, async (_instance, state) => {
await state.storage.put(SESSION_META_KEY, {
organizationId: orgId,
organizationName: "Revoked Org",
userId: accountId,
});
await state.storage.put(LAST_ACTIVITY_KEY, Date.now());
await state.storage.setAlarm(Date.now() + HEARTBEAT_MS);
});

const revokedResponse = await mcpPost({
bearer: makeTestBearer(accountId, orgId),
sessionId,
body: TOOLS_LIST_REQUEST,
});
expect(revokedResponse.status).toBe(403);
const body = (await revokedResponse.json()) as {
readonly jsonrpc: string;
readonly error?: { readonly code: number; readonly message: string };
};
expect(body.jsonrpc).toBe("2.0");
expect(body.error?.code).toBe(-32001);
expect(body.error?.message).toMatch(/No organization/i);

const stored = await runInDurableObject(stub, async (_instance, state) => ({
sessionMeta: await state.storage.get(SESSION_META_KEY),
lastActivity: await state.storage.get(LAST_ACTIVITY_KEY),
alarm: await state.storage.getAlarm(),
}));
expect(stored.sessionMeta).toBeUndefined();
expect(stored.lastActivity).toBeUndefined();
expect(stored.alarm).toBeNull();
}, 15_000);
});

describe("McpSessionDO alarm lifecycle", () => {
Expand Down
10 changes: 10 additions & 0 deletions apps/cloud/src/mcp-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,16 @@ export class McpSessionDO extends DurableObject {
return Effect.runPromise(program);
}

async clearSession(incoming?: IncomingTraceHeaders): Promise<void> {
return Effect.runPromise(
Effect.promise(() => this.cleanup()).pipe(
Effect.withSpan("McpSessionDO.clearSession"),
(eff) => withIncomingParent(incoming, eff),
Effect.provide(DoTelemetryLive),
),
);
}

private async runAlarm(): Promise<void> {
const lastActivityMs = await this.loadLastActivity();
const idleMs = Date.now() - lastActivityMs;
Expand Down
98 changes: 90 additions & 8 deletions apps/cloud/src/mcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import { createRemoteJWKSet } from "jose";

import { TelemetryLive } from "./services/telemetry";
import { McpJwtVerificationError, verifyMcpAccessToken, type VerifiedToken } from "./mcp-auth";
import { authorizeOrganization } from "./auth/authorize-organization";
import { UserStoreService } from "./auth/context";
import { CoreSharedServices } from "./api/core-shared-services";
import { DbService } from "./services/db";

// ---------------------------------------------------------------------------
// Constants
Expand Down Expand Up @@ -93,12 +97,38 @@ export class McpAuth extends Context.Tag("@executor/cloud/McpAuth")<
}
>() {}

export class McpOrganizationAuth extends Context.Tag("@executor/cloud/McpOrganizationAuth")<
McpOrganizationAuth,
{
readonly authorize: (
accountId: string,
organizationId: string,
) => Effect.Effect<boolean, unknown>;
}
>() {}

const verifyJwt = (token: string) =>
verifyMcpAccessToken(token, jwks, {
issuer: AUTHKIT_DOMAIN,
audience: RESOURCE_URL,
});

const DbLive = DbService.Live;
const UserStoreLive = UserStoreService.Live.pipe(Layer.provide(DbLive));
const McpOrganizationAuthServices = Layer.mergeAll(
DbLive,
UserStoreLive,
CoreSharedServices,
);

export const McpOrganizationAuthLive = Layer.succeed(McpOrganizationAuth, {
authorize: (accountId, organizationId) =>
authorizeOrganization(accountId, organizationId).pipe(
Effect.map((org) => org !== null),
Effect.provide(McpOrganizationAuthServices),
),
});

export const McpAuthLive = Layer.succeed(McpAuth, {
verifyBearer: Effect.fn("mcp.auth.verify_bearer")(function* (request) {
const authHeader = request.headers.get("authorization");
Expand Down Expand Up @@ -622,14 +652,59 @@ const forwardToExistingSession = (
return HttpServerResponse.raw(withMcpResponseHeaders(annotated));
});

const dispatchPost = (request: Request, token: VerifiedToken) =>
const clearExistingSession = (request: Request, sessionId: string) =>
Effect.gen(function* () {
const ns = env.MCP_SESSION;
const stub = ns.get(ns.idFromString(sessionId));
const propagation = yield* currentPropagationHeaders(request);
yield* Effect.promise(() => stub.clearSession(propagation) as Promise<void>).pipe(
Effect.catchAll(() => Effect.void),
Effect.withSpan("mcp.do.clear_session", {
attributes: { "mcp.request.session_id_present": true },
}),
);
});

const authorizeMcpOrganization = (
request: Request,
token: VerifiedToken,
sessionId: string | null,
) =>
Effect.gen(function* () {
const organizationId = token.organizationId;
if (!organizationId) {
return jsonRpcError(403, -32001, "No organization in session — log in via the web app first");
}

const auth = yield* McpOrganizationAuth;
const allowed = yield* auth.authorize(token.accountId, organizationId).pipe(
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.annotateCurrentSpan({
"mcp.auth.organization_authorize_error": String(error),
});
return false;
}),
),
Effect.withSpan("mcp.auth.authorize_organization", {
attributes: { "mcp.auth.organization_id": organizationId },
}),
);
if (allowed) return null;

if (sessionId) {
yield* clearExistingSession(request, sessionId);
}
return jsonRpcError(403, -32001, "No organization in session — log in via the web app first");
});

const dispatchPost = (request: Request, token: VerifiedToken) =>
Effect.gen(function* () {
const sessionId = request.headers.get("mcp-session-id");
const authError = yield* authorizeMcpOrganization(request, token, sessionId);
if (authError) return authError;
const organizationId = token.organizationId!;

if (sessionId) return yield* forwardToExistingSession(request, sessionId, true, token);

const ns = env.MCP_SESSION;
Expand Down Expand Up @@ -664,13 +739,21 @@ const dispatchGet = (request: Request, token: VerifiedToken) => {
const sessionId = request.headers.get("mcp-session-id");
if (!sessionId)
return Effect.succeed(jsonRpcError(400, -32000, "mcp-session-id header required for SSE"));
return forwardToExistingSession(request, sessionId, false, token);
return Effect.gen(function* () {
const authError = yield* authorizeMcpOrganization(request, token, sessionId);
if (authError) return authError;
return yield* forwardToExistingSession(request, sessionId, false, token);
});
};

const dispatchDelete = (request: Request, token: VerifiedToken) => {
const sessionId = request.headers.get("mcp-session-id");
if (!sessionId) return Effect.succeed(HttpServerResponse.empty({ status: 204 }));
return forwardToExistingSession(request, sessionId, true, token);
return Effect.gen(function* () {
const authError = yield* authorizeMcpOrganization(request, token, sessionId);
if (authError) return authError;
return yield* forwardToExistingSession(request, sessionId, true, token);
});
};

// ---------------------------------------------------------------------------
Expand All @@ -696,14 +779,13 @@ export const classifyMcpPath = (pathname: string): McpRoute => {

/**
* Raw Effect-native MCP app. Exported so alternate entry points (e.g. the
* vitest-pool-workers test worker) can provide their own `McpAuth` layer —
* the only dependency we deliberately swap in tests because hitting the real
* WorkOS JWKS isn't practical. Every other layer stays real.
* vitest-pool-workers test worker) can provide their own auth layers because
* hitting WorkOS JWKS / membership APIs is not practical in the isolate.
*/
export const mcpApp: Effect.Effect<
HttpServerResponse.HttpServerResponse,
never,
HttpServerRequest.HttpServerRequest | McpAuth
HttpServerRequest.HttpServerRequest | McpAuth | McpOrganizationAuth
> = Effect.gen(function* () {
const httpRequest = yield* HttpServerRequest.HttpServerRequest;
const request = httpRequest.source as Request;
Expand Down Expand Up @@ -744,7 +826,7 @@ export const mcpApp: Effect.Effect<
);

const rawMcpFetch = HttpApp.toWebHandler(
mcpApp.pipe(Effect.provide(Layer.mergeAll(McpAuthLive, TelemetryLive))),
mcpApp.pipe(Effect.provide(Layer.mergeAll(McpAuthLive, McpOrganizationAuthLive, TelemetryLive))),
);

/**
Expand Down
22 changes: 19 additions & 3 deletions apps/cloud/src/test-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@ import { Effect, Layer } from "effect";
import { drizzle } from "drizzle-orm/postgres-js";
import postgres, { type Sql } from "postgres";

import { McpAuth, McpAuthLive, classifyMcpPath, mcpApp } from "./mcp";
import {
McpAuth,
McpAuthLive,
McpOrganizationAuth,
McpOrganizationAuthLive,
classifyMcpPath,
mcpApp,
} from "./mcp";
import { organizations } from "./services/schema";
import { parseTestBearer } from "./test-bearer";
import { DoTelemetryLive } from "./services/telemetry";
Expand All @@ -34,6 +41,11 @@ const TestMcpAuthLive = Layer.succeed(McpAuth, {
}),
});

const TestMcpOrganizationAuthLive = Layer.succeed(McpOrganizationAuth, {
authorize: (_accountId, organizationId) =>
Effect.succeed(!organizationId.startsWith("revoked_")),
});

// ---------------------------------------------------------------------------
// Test seed endpoint
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -86,11 +98,15 @@ const handleSeedOrg = async (
// instrumentation, so we reuse DoTelemetryLive (it's a plain WebSdk +
// OTLPTraceExporter — not Durable-Object-specific) to stand in.
const testMcpFetch = HttpApp.toWebHandler(
mcpApp.pipe(Effect.provide(Layer.mergeAll(TestMcpAuthLive, DoTelemetryLive))),
mcpApp.pipe(
Effect.provide(
Layer.mergeAll(TestMcpAuthLive, TestMcpOrganizationAuthLive, DoTelemetryLive),
),
),
);

const realAuthMcpFetch = HttpApp.toWebHandler(
mcpApp.pipe(Effect.provide(Layer.mergeAll(McpAuthLive, DoTelemetryLive))),
mcpApp.pipe(Effect.provide(Layer.mergeAll(McpAuthLive, McpOrganizationAuthLive, DoTelemetryLive))),
);

export default {
Expand Down
Loading