Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 27 additions & 194 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import { join } from 'path'
import { readFileSync } from 'fs'
import { tmpdir } from 'os'
import { createServer } from 'http'
import { randomUUID } from 'crypto'
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'

const __dirname = import.meta.dirname

Expand Down Expand Up @@ -67,8 +65,7 @@ async function getApiKeyInteractively (): Promise<string> {
// Initialize API key
let SOCKET_API_KEY = process.env['SOCKET_API_KEY'] || ''

// Transport management
const transports: Record<string, StreamableHTTPServerTransport> = {}
// No session management: each HTTP request is handled statelessly

// Create server instance
const server = new McpServer({
Expand Down Expand Up @@ -246,6 +243,9 @@ if (useHttp) {
// HTTP mode with Server-Sent Events
logger.info(`Starting HTTP server on port ${port}`)

// Singleton transport to preserve initialization state without explicit sessions
let httpTransport: StreamableHTTPServerTransport | null = null

const httpServer = createServer(async (req, res) => {
// Validate Origin header as required by MCP spec
const origin = req.headers.origin
Expand Down Expand Up @@ -275,9 +275,8 @@ if (useHttp) {
} else {
res.setHeader('Access-Control-Allow-Origin', 'http://localhost:3000')
}
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, DELETE, OPTIONS')
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, mcp-session-id, Accept, Last-Event-ID')
res.setHeader('Access-Control-Expose-Headers', 'mcp-session-id')
res.setHeader('Access-Control-Allow-Methods', 'POST, OPTIONS')
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Accept')

if (req.method === 'OPTIONS') {
res.writeHead(200)
Expand Down Expand Up @@ -313,93 +312,38 @@ if (useHttp) {

if (url.pathname === '/') {
if (req.method === 'POST') {
// Validate Accept header as required by MCP spec
const acceptHeader = req.headers.accept
if (!acceptHeader || (!acceptHeader.includes('application/json') && !acceptHeader.includes('text/event-stream'))) {
logger.warn(`Invalid Accept header: ${acceptHeader}`)
res.writeHead(400, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({
jsonrpc: '2.0',
error: { code: -32000, message: 'Bad Request: Accept header must include application/json or text/event-stream' },
id: null
}))
return
}

// Handle JSON-RPC messages
// Handle JSON-RPC messages statelessly
let body = ''
req.on('data', chunk => (body += chunk))
req.on('end', async () => {
try {
const jsonData = JSON.parse(body)
const sessionId = req.headers['mcp-session-id'] as string

// Validate session ID format if provided (must contain only visible ASCII characters)
if (sessionId && !/^[\x21-\x7E]+$/.test(sessionId)) {
logger.warn(`Invalid session ID format: ${sessionId}`)
res.writeHead(400, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({
jsonrpc: '2.0',
error: { code: -32000, message: 'Bad Request: Session ID must contain only visible ASCII characters' },
id: jsonData.id || null
}))
return
}

let transport: StreamableHTTPServerTransport

if (sessionId && transports[sessionId]) {
// Reuse existing transport
transport = transports[sessionId]
} else if (!sessionId) {
// Create new session (either for initialize request or fallback)
const newSessionId = randomUUID()
const isInit = isInitializeRequest(jsonData)

if (isInit) {
logger.info(`Creating new session for initialize request: ${newSessionId}`)
} else {
logger.warn(`Creating fallback session for non-initialize request: ${newSessionId}`)
// If this is an initialize, reset the singleton transport so clients can (re)initialize cleanly
if (jsonData && jsonData.method === 'initialize') {
if (httpTransport) {
try { httpTransport.close() } catch {}
}

transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => newSessionId,
onsessioninitialized: (id) => {
transports[id] = transport
logger.info(`Session initialized: ${id}`)
// Set session ID in response headers as required by MCP spec
res.setHeader('mcp-session-id', id)
}
httpTransport = new StreamableHTTPServerTransport({
// Stateless mode: no session management required
sessionIdGenerator: undefined,
// Return JSON responses to avoid SSE streaming
enableJsonResponse: true
})

transport.onclose = () => {
const sid = transport.sessionId
if (sid && transports[sid]) {
delete transports[sid]
logger.info(`Session closed: ${sid}`)
}
}

await server.connect(transport)
await transport.handleRequest(req, res, jsonData)
return
} else {
// Invalid request - session ID provided but not found
logger.error(`Invalid session ID: ${sessionId}. Active sessions count: ${Object.keys(transports).length}`)
res.writeHead(400)
res.end(JSON.stringify({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: Invalid session ID. Please initialize a new session first.'
},
id: jsonData.id || null
}))
await server.connect(httpTransport)
await httpTransport.handleRequest(req, res, jsonData)
return
}

// Handle request with existing transport
await transport.handleRequest(req, res, jsonData)
// For non-initialize requests, ensure transport exists (client should have initialized already)
if (!httpTransport) {
httpTransport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
enableJsonResponse: true
})
await server.connect(httpTransport)
}
await httpTransport.handleRequest(req, res, jsonData)
} catch (error) {
logger.error(`Error processing POST request: ${error}`)
if (!res.headersSent) {
Expand All @@ -412,117 +356,6 @@ if (useHttp) {
}
}
})
} else if (req.method === 'GET') {
// Validate Accept header for SSE as required by MCP spec
const acceptHeader = req.headers.accept
if (!acceptHeader || !acceptHeader.includes('text/event-stream')) {
logger.warn(`GET request without text/event-stream Accept header: ${acceptHeader}`)
res.writeHead(405, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({
jsonrpc: '2.0',
error: { code: -32000, message: 'Method Not Allowed: GET requires Accept: text/event-stream' },
id: null
}))
return
}

// Handle SSE streams
const sessionId = req.headers['mcp-session-id'] as string

// Validate session ID format
if (sessionId && !/^[\x21-\x7E]+$/.test(sessionId)) {
logger.warn(`Invalid session ID format in GET request: ${sessionId}`)
res.writeHead(400, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({
jsonrpc: '2.0',
error: { code: -32000, message: 'Bad Request: Session ID must contain only visible ASCII characters' },
id: null
}))
return
}

if (!sessionId || !transports[sessionId]) {
logger.warn(`SSE request with invalid session ID: ${sessionId}`)
res.writeHead(400, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({
jsonrpc: '2.0',
error: { code: -32000, message: 'Bad Request: Invalid or missing session ID for SSE stream' },
id: null
}))
return
}

// Check for Last-Event-ID header for resumability (optional MCP feature)
const lastEventId = req.headers['last-event-id'] as string
if (lastEventId) {
logger.info(`SSE resumability requested with Last-Event-ID: ${lastEventId}`)
// Note: Actual resumability implementation would require message storage
// For now, we log the request but don't implement full resumability
}

logger.info(`Opening SSE stream for session: ${sessionId}`)

// Prevent connection timeout and keep it alive
req.socket?.setTimeout(0)
req.socket?.setKeepAlive(true, 30000)

let streamClosed = false

// Handle client disconnection gracefully
req.on('close', () => {
streamClosed = true
logger.info(`Client disconnected SSE stream for session: ${sessionId}`)
})

req.on('aborted', () => {
streamClosed = true
logger.info(`Client aborted SSE stream for session: ${sessionId}`)
})

// Let the MCP transport handle the SSE stream completely
const transport = transports[sessionId]

try {
await transport.handleRequest(req, res)

// If the transport completes without the client disconnecting,
// it might have closed the stream prematurely. Keep it open with heartbeat.
if (!streamClosed && !res.destroyed) {
logger.info(`Transport completed, maintaining SSE stream for session: ${sessionId}`)

// Send periodic heartbeat to keep connection alive
const heartbeat = setInterval(() => {
if (streamClosed || res.destroyed) {
clearInterval(heartbeat)
return
}

try {
res.write(': heartbeat\n\n')
} catch (error) {
logger.error(error, `Heartbeat error for session ${sessionId}:`)
clearInterval(heartbeat)
}
}, 30000)

// Clean up heartbeat when connection closes
req.on('close', () => clearInterval(heartbeat))
res.on('close', () => clearInterval(heartbeat))
}
} catch (error) {
logger.error(error, `SSE transport error for session ${sessionId}:`)
}
} else if (req.method === 'DELETE') {
// Handle session termination
const sessionId = req.headers['mcp-session-id'] as string
if (!sessionId || !transports[sessionId]) {
res.writeHead(400)
res.end('Invalid or missing session ID')
return
}

const transport = transports[sessionId]
await transport.handleRequest(req, res)
} else {
res.writeHead(405)
res.end('Method not allowed')
Expand Down
51 changes: 15 additions & 36 deletions mock-client/http-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ async function parseResponse (response: any) {
// Simple HTTP client for testing MCP server in HTTP mode
async function testHTTPMode () {
const baseUrl = (process.env['MCP_URL'] || 'http://localhost:3000').replace(/\/$/, '') // Remove trailing slash
const sessionId = `test-session-${Date.now()}`

console.log('Testing Socket MCP in HTTP mode...')
console.log(`Server URL: ${baseUrl}`)
console.log(`Session ID: ${sessionId}`)

try {
// 1. Initialize connection
// 1. Initialize connection (stateless)
console.log('\n1. Initializing connection...')
const initRequest = {
jsonrpc: '2.0',
Expand All @@ -48,6 +46,7 @@ async function testHTTPMode () {
method: 'POST',
headers: {
'Content-Type': 'application/json',
// SDK requires Accept to include both types even if server returns JSON
Accept: 'application/json, text/event-stream',
'User-Agent': 'socket-mcp-debug-client/1.0.0'
},
Expand All @@ -57,10 +56,7 @@ async function testHTTPMode () {
const initResult = await parseResponse(initResponse)
console.log('Initialize response:', JSON.stringify(initResult, null, 2))

// Extract session ID from response headers
const serverSessionId = initResponse.headers.get('mcp-session-id')
const actualSessionId = serverSessionId || sessionId
console.log('Session ID:', actualSessionId)
console.log('Initialized (stateless)')

// 2. List tools
console.log('\n2. Listing available tools...')
Expand All @@ -75,14 +71,22 @@ async function testHTTPMode () {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'application/json, text/event-stream',
'mcp-session-id': actualSessionId
Accept: 'application/json, text/event-stream'
},
body: JSON.stringify(toolsRequest)
})

const toolsResult = await parseResponse(toolsResponse)
console.log('Available tools:', JSON.stringify(toolsResult, null, 2))
// Assert that the 'depscore' tool exists in the toolsResult
if (
!toolsResult ||
!toolsResult.result ||
!Array.isArray(toolsResult.result.tools) ||
!toolsResult.result.tools.some((tool: any) => tool.name === 'depscore')
) {
throw new Error('depscore tool not found in available tools')
}

// 3. Call depscore
console.log('\n3. Calling depscore tool...')
Expand All @@ -106,40 +110,15 @@ async function testHTTPMode () {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'application/json, text/event-stream',
'mcp-session-id': actualSessionId
Accept: 'application/json, text/event-stream'
},
body: JSON.stringify(depscoreRequest)
})

const depscoreResult = await parseResponse(depscoreResponse)
console.log('Depscore result:', JSON.stringify(depscoreResult, null, 2))

// 4. Test SSE stream (optional)
console.log('\n4. Testing SSE stream connection...')
const sseResponse = await fetch(`${baseUrl}/`, {
method: 'GET',
headers: {
'mcp-session-id': actualSessionId,
Accept: 'text/event-stream'
}
})

if (sseResponse.ok) {
console.log('SSE stream connected successfully')
// Note: In a real implementation, you'd parse the SSE stream
}

// 5. Clean up session
console.log('\n5. Cleaning up session...')
const cleanupResponse = await fetch(`${baseUrl}/`, {
method: 'DELETE',
headers: {
'mcp-session-id': actualSessionId
}
})

console.log('Session cleanup:', cleanupResponse.status === 200 ? 'Success' : 'Failed')
console.log('\n4. HTTP mode test complete (no sessions)')
} catch (error) {
console.error('Error:', error)
}
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading