Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions src/transports/http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,22 @@ export class HttpStreamTransport extends AbstractTransport {
authData = (authResult as AuthResult).data as RequestContextData || {};
}

// Allow re-initialization even when a stale session ID is provided.
// Clients like Cline may keep sending the old session ID header after
// a session is lost (server restart, transport error, etc.).
const isReInitialize = sessionId && !this._transports[sessionId] && body && isInitializeRequest(body);

// Handle different request scenarios
if (sessionId && this._transports[sessionId]) {
// Existing session
transport = this._transports[sessionId];
logger.debug(`Reusing existing session: ${sessionId}`);
} else if (isInitialize) {
// New session initialization
logger.info('Creating new session for initialization request');
} else if (isInitialize || isReInitialize) {
if (isReInitialize) {
logger.info(`Stale session ID ${sessionId} — creating new session for re-initialization`);
} else {
logger.info('Creating new session for initialization request');
}

transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
Expand All @@ -161,10 +169,11 @@ export class HttpStreamTransport extends AbstractTransport {
};

transport.onerror = (error) => {
logger.error(`Transport error for session: ${error}`);
if (transport.sessionId) {
delete this._transports[transport.sessionId];
}
// Log the error but do NOT remove the session. The SDK fires onerror
// for transient issues (parse errors, failed SSE writes) that don't
// invalidate the session. Removing the session here causes "Session
// not found" errors on subsequent requests from the same client.
logger.error(`Transport error for session ${transport.sessionId}: ${error}`);
};

transport.onmessage = async (message: JSONRPCMessage) => {
Expand All @@ -182,7 +191,7 @@ export class HttpStreamTransport extends AbstractTransport {
this.sendError(res, 400, -32000, 'Bad Request: No valid session ID provided');
return;
} else {
// Session ID provided but not found
// Session ID provided but not found (and not an initialize request)
this.sendError(res, 404, -32001, 'Session not found');
return;
}
Expand Down Expand Up @@ -268,8 +277,11 @@ export class HttpStreamTransport extends AbstractTransport {
}

if (failedSessions.length > 0) {
failedSessions.forEach((sessionId) => delete this._transports[sessionId]);
logger.warn(`Failed to send message to ${failedSessions.length} sessions.`);
// Log but don't remove sessions on transient send failures.
// The SDK throws when no SSE stream is currently open for a request ID,
// which is a normal condition (e.g. client momentarily between requests).
// The session itself remains valid for future requests.
logger.warn(`Failed to broadcast to ${failedSessions.length} session(s) — sessions preserved for future requests.`);
}
}

Expand Down
261 changes: 261 additions & 0 deletions tests/transports/http/server-session-resilience.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
import { describe, it, expect, beforeEach, afterEach } from '@jest/globals';
import { HttpStreamTransport } from '../../../src/transports/http/server.js';
import http from 'node:http';

/**
* Regression tests for session resilience in HttpStreamTransport.
*
* These cover two bugs that caused "Session not found" (-32001) errors
* for clients like Cline after a session was established:
*
* 1. onerror callback destroyed sessions on transient SDK errors (parse errors,
* failed SSE writes) — the session should survive these.
* 2. Re-initialization with a stale session ID was rejected with 404 instead
* of creating a new session.
*/
describe('HttpStreamTransport — Session Resilience', () => {
let transport: HttpStreamTransport;
let testPort: number;
// Track open requests so we can clean them up before closing the transport
let openRequests: http.ClientRequest[];

const initializeBody = {
jsonrpc: '2.0',
method: 'initialize',
params: {
protocolVersion: '2024-11-05',
capabilities: {},
clientInfo: { name: 'test-client', version: '1.0.0' },
},
id: 1,
};

beforeEach(() => {
testPort = 4000 + Math.floor(Math.random() * 1000);
openRequests = [];
transport = new HttpStreamTransport({
port: testPort,
endpoint: '/mcp',
responseMode: 'stream',
});
});

afterEach(async () => {
// Destroy any open HTTP connections so the server can shut down cleanly
for (const req of openRequests) {
req.destroy();
}
openRequests = [];

if (transport.isRunning()) {
await transport.close();
}
});

function getTransports(): Record<string, any> {
return (transport as any)._transports;
}

function wait(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
* Polls until at least one session exists in the transport map.
* Throws after the timeout if no session appears.
*/
async function waitForSession(timeoutMs = 2000): Promise<string> {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
const ids = Object.keys(getTransports());
if (ids.length > 0) return ids[0];
await wait(20);
}
throw new Error('Timed out waiting for session to be created');
}

/**
* Fire-and-forget POST. The SSE response may never complete (no MCP server),
* so we don't await the response — we track the request for cleanup.
*/
function firePost(body: any, sessionId?: string): void {
const headers: http.OutgoingHttpHeaders = {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream',
};
if (sessionId) {
headers['Mcp-Session-Id'] = sessionId;
}
const bodyStr = JSON.stringify(body);

const req = http.request({
hostname: 'localhost',
port: testPort,
path: '/mcp',
method: 'POST',
headers: { ...headers, 'Content-Length': Buffer.byteLength(bodyStr) },
});
req.on('error', () => {});
req.write(bodyStr);
req.end();

openRequests.push(req);
}

/**
* Full request/response for non-streaming responses (errors, 404s, etc.)
*/
function makeRequest(
body: any,
sessionId?: string,
): Promise<{ statusCode: number; headers: http.IncomingHttpHeaders; body: string }> {
return new Promise((resolve, reject) => {
const headers: http.OutgoingHttpHeaders = {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream',
};
if (sessionId) {
headers['Mcp-Session-Id'] = sessionId;
}
const bodyStr = JSON.stringify(body);

const req = http.request(
{
hostname: 'localhost',
port: testPort,
path: '/mcp',
method: 'POST',
headers: { ...headers, 'Content-Length': Buffer.byteLength(bodyStr) },
},
(res) => {
let responseBody = '';
res.on('data', (chunk: Buffer) => {
responseBody += chunk.toString();
});
res.on('end', () => {
resolve({
statusCode: res.statusCode!,
headers: res.headers,
body: responseBody,
});
});
},
);
req.on('error', reject);
req.write(bodyStr);
req.end();

openRequests.push(req);
});
}

// ---------------------------------------------------------------------------
// Bug 1: onerror must NOT destroy sessions
// ---------------------------------------------------------------------------
describe('onerror should not destroy sessions', () => {
it('should keep session alive after onerror fires on the internal transport', async () => {
await transport.start();
transport.onmessage = async () => {};

firePost(initializeBody);
const sessionId = await waitForSession();

// Simulate the SDK firing onerror (e.g. parse error on a bad request)
const internalTransport = getTransports()[sessionId];
internalTransport.onerror?.(new Error('Simulated transient error'));

// Session must still be in the map
expect(getTransports()[sessionId]).toBeDefined();
});

it('should keep session alive after multiple onerror events', async () => {
await transport.start();
transport.onmessage = async () => {};

firePost(initializeBody);
const sessionId = await waitForSession();
const internalTransport = getTransports()[sessionId];

internalTransport.onerror?.(new Error('Error 1'));
internalTransport.onerror?.(new Error('Error 2'));
internalTransport.onerror?.(new Error('Error 3'));

expect(getTransports()[sessionId]).toBeDefined();
});
});

// ---------------------------------------------------------------------------
// Bug 2: Re-initialization with stale session ID
// ---------------------------------------------------------------------------
describe('re-initialization with stale session ID', () => {
it('should create a new session instead of returning 404', async () => {
await transport.start();
transport.onmessage = async () => {};

// Send an initialize request with a session ID that doesn't exist
firePost(initializeBody, 'stale-session-id-that-does-not-exist');
const sessionId = await waitForSession();

// A new session was created (not rejected with 404)
expect(sessionId).not.toBe('stale-session-id-that-does-not-exist');
expect(getTransports()[sessionId]).toBeDefined();
});

it('should still reject non-initialize requests with unknown session IDs', async () => {
await transport.start();
transport.onmessage = async () => {};

const response = await makeRequest(
{ jsonrpc: '2.0', method: 'tools/list', id: 1 },
'nonexistent-session-id',
);

expect(response.statusCode).toBe(404);
expect(response.body).toContain('Session not found');
});
});

// ---------------------------------------------------------------------------
// onclose SHOULD still clean up sessions (correct behavior preserved)
// ---------------------------------------------------------------------------
describe('onclose should still remove sessions', () => {
it('should remove session when transport is closed', async () => {
await transport.start();
transport.onmessage = async () => {};

firePost(initializeBody);
const sessionId = await waitForSession();
expect(getTransports()[sessionId]).toBeDefined();

// Simulate the SDK calling close (as it does for DELETE requests)
const internalTransport = getTransports()[sessionId];
await internalTransport.close();

expect(getTransports()[sessionId]).toBeUndefined();
});
});

// ---------------------------------------------------------------------------
// Broadcast send failures should not destroy sessions
// ---------------------------------------------------------------------------
describe('broadcast failures should not destroy sessions', () => {
it('should preserve sessions after a failed broadcast send', async () => {
await transport.start();
transport.onmessage = async () => {};

firePost(initializeBody);
const sessionId = await waitForSession();

// Monkey-patch the internal transport's send to throw
const internalTransport = getTransports()[sessionId];
internalTransport.send = async () => {
throw new Error('Simulated send failure');
};

// Broadcast — should NOT remove the session
await transport.send({ jsonrpc: '2.0', method: 'notification/test' });

expect(getTransports()[sessionId]).toBeDefined();
});
});
});