-
Notifications
You must be signed in to change notification settings - Fork 571
fix: panic when streamable HTTP server sends notification #348
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: panic when streamable HTTP server sends notification #348
Conversation
WalkthroughThis change introduces support for upgrading HTTP client-server communication to Server-Sent Events (SSE) when sending notifications by adding a new interface and implementation for streamable HTTP sessions. It also adds a test for the HTTP client to validate notification handling using the streamable HTTP transport. Changes
Possibly related PRs
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 golangci-lint (1.64.8)Error: you are using a configuration file for golangci-lint v2 with golangci-lint v1: please use golangci-lint v2 📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
client/http_test.go (2)
12-20
: Good use of hooks for synchronization, but consider race condition.The hook logic correctly waits for notifications to be processed, but the busy-wait loop on line 17-18 could be inefficient and may have timing issues. The 200ms sleep helps but is somewhat arbitrary.
Consider using a more robust synchronization mechanism:
- // return until all the notifications are handled - for len(clientSession.NotificationChannel()) > 0 { - } - time.Sleep(time.Millisecond * 200) + // Wait for notification channel to be drained with timeout + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + for { + select { + case <-ctx.Done(): + return + default: + if len(clientSession.NotificationChannel()) == 0 { + time.Sleep(time.Millisecond * 50) // Small buffer to ensure processing + return + } + time.Sleep(time.Millisecond * 10) + } + }
72-75
: Simple but effective notification counting.The notification handler correctly increments the counter. The variable name
notificationNum
could be more descriptive, but it's acceptable for test code.Consider a more descriptive variable name:
- notificationNum := 0 + notificationCount := 0 client.OnNotification(func(notification mcp.JSONRPCNotification) { - notificationNum += 1 + notificationCount++ })
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
client/http_test.go
(1 hunks)server/session.go
(3 hunks)server/streamable_http.go
(6 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
server/streamable_http.go (1)
server/session.go (1)
SessionWithStreamableHTTPConfig
(52-65)
🔇 Additional comments (12)
client/http_test.go (2)
30-59
: Well-structured tool implementation with proper error handling.The "notify" tool correctly demonstrates the notification flow by sending a progress notification and returning a success response. The error handling for notification sending is appropriate.
104-110
: Clear and appropriate test assertions.The test correctly verifies both the tool call result and notification delivery. The assertions are clear and cover the expected behavior.
server/session.go (3)
51-65
: Well-designed interface with excellent documentation.The
SessionWithStreamableHTTPConfig
interface is properly designed with:
- Clear method name that describes its purpose
- Comprehensive documentation including protocol specification
- Proper reference to the official specification URL
- Clear explanation of the behavior requirements
165-168
: Clean integration with proper type assertion.The integration correctly uses type assertion to check if the session supports SSE upgrade and calls the method when available. This approach is non-breaking and follows Go interface patterns well.
217-220
: Consistent implementation across notification methods.The same integration pattern is correctly applied to
SendNotificationToSpecificClient
, maintaining consistency with the other notification method.server/streamable_http.go (7)
12-12
: Good addition of sync/atomic import.The import of
sync/atomic
is necessary for the new atomic boolean field and is properly added.
247-247
: Improved variable naming for clarity.Renaming
upgraded
toupgradedHeader
makes the purpose much clearer - it specifically tracks whether SSE headers have been set, not the overall upgrade state.
265-271
: Proper header setting logic with race protection.The logic correctly sets SSE headers only once using the
upgradedHeader
flag. This prevents duplicate header setting in the notification goroutine.
301-323
: Well-implemented SSE upgrade decision logic.The implementation correctly:
- Checks the atomic
upgradeToSSE
flag to determine response type- Sets SSE headers if not already set
- Falls back to JSON response when SSE upgrade is not requested
- Handles session ID header for initialize requests appropriately
505-505
: Thread-safe atomic field for upgrade state.Using
atomic.Bool
is the correct approach for tracking the SSE upgrade state across goroutines, ensuring thread safety without requiring mutex locks.
546-548
: Simple and correct implementation of upgrade method.The method correctly sets the atomic flag to true, signaling that the session should upgrade to SSE mode for the response. The implementation is thread-safe and minimal.
550-550
: Proper interface compliance declaration.The compile-time interface compliance check ensures that
streamableHttpSession
correctly implementsSessionWithStreamableHTTPConfig
.
350e669
to
dff933e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
client/http_test.go (1)
64-110
: Enhance test robustness with timeout handling and notification validation.The test successfully validates the notification flow but could benefit from additional robustness and validation.
Consider the following improvements:
t.Run("Can receive notification from server", func(t *testing.T) { + timeout := time.Second * 5 + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + client, err := NewStreamableHttpClient(testServer.URL) if err != nil { t.Fatalf("create client failed %v", err) return } notificationNum := 0 + var receivedNotification mcp.JSONRPCNotification client.OnNotification(func(notification mcp.JSONRPCNotification) { notificationNum += 1 + receivedNotification = notification }) - ctx := context.Background() if err := client.Start(ctx); err != nil { t.Fatalf("Failed to start client: %v", err) return } // Initialize initRequest := mcp.InitializeRequest{} initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION initRequest.Params.ClientInfo = mcp.Implementation{ Name: "test-client", Version: "1.0.0", } _, err = client.Initialize(ctx, initRequest) if err != nil { t.Fatalf("Failed to initialize: %v\n", err) } request := mcp.CallToolRequest{} request.Params.Name = "notify" result, err := client.CallTool(ctx, request) if err != nil { t.Fatalf("CallTool failed: %v", err) } if len(result.Content) != 1 { t.Errorf("Expected 1 content item, got %d", len(result.Content)) } if notificationNum != 1 { t.Errorf("Expected 1 notification item, got %d", notificationNum) } + + // Validate notification content + if receivedNotification.Method != "notifications/progress" { + t.Errorf("Expected notification method 'notifications/progress', got '%s'", receivedNotification.Method) + } })This adds:
- Timeout handling to prevent hanging tests
- Validation of the actual notification content
- Better context management
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
client/http_test.go
(1 hunks)server/streamable_http.go
(6 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- server/streamable_http.go
🧰 Additional context used
🧬 Code Graph Analysis (1)
client/http_test.go (7)
server/hooks.go (1)
Hooks
(94-121)mcp/tools.go (2)
CallToolRequest
(46-53)CallToolResult
(36-43)server/session.go (1)
ClientSessionFromContext
(71-76)server/server.go (3)
WithToolCapabilities
(255-262)WithHooks
(238-242)ServerFromContext
(58-63)mcp/types.go (4)
Content
(798-800)InitializeRequest
(389-398)Params
(154-154)Implementation
(468-471)server/streamable_http.go (1)
NewTestStreamableHTTPServer
(614-618)client/http.go (1)
NewStreamableHttpClient
(11-17)
🔇 Additional comments (2)
client/http_test.go (2)
30-59
: LGTM! Well-structured tool implementation.The "notify" tool implementation correctly:
- Retrieves the server from context
- Sends a structured progress notification
- Handles errors appropriately
- Returns proper success result
12-111
:✅ Verification successful
Verify integration with streamable HTTP transport.
The test effectively validates the end-to-end notification flow over streamable HTTP with SSE support, which aligns with the PR objectives for optimizing streamable server send notification.
To ensure this test covers all aspects of the streamable HTTP transport integration, verify that the SSE upgrade mechanism is properly triggered:
🏁 Script executed:
#!/bin/bash # Description: Verify SSE upgrade implementation in streamable HTTP transport # Check for SSE-related implementation in transport layer rg -A 5 -B 5 "Server-Sent Events|text/event-stream|EventSource" # Look for streamable HTTP session interface implementation ast-grep --pattern 'interface SessionWithStreamableHTTPConfig { $$$ }' # Check notification handling in streamable transport rg -A 10 "NotificationChannel|SendNotificationToClient"Length of output: 31218
Verification passed: SSE upgrade mechanism is correctly implemented
The end-to-end notification flow over the streamable HTTP transport is fully covered:
- server/streamable_http.go sets
Content-Type: text/event-stream
,Connection: keep-alive
,Cache-Control: no-cache
and writes the 202 status code when notifications arrive.- client/http_test.go’s
TestHTTPClient
successfully triggers and asserts receipt of one notification.No changes required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Only suggestion is to add some comments.
// SessionWithStreamableHTTPConfig extends ClientSession to support streamable HTTP transport configurations | ||
type SessionWithStreamableHTTPConfig interface { | ||
ClientSession | ||
// UpgradeToSSEWhenReceiveNotification upgrades the client-server communication to SSE stream when the server | ||
// sends notifications to the client | ||
// | ||
// The protocol specification: | ||
// - If the server response contains any JSON-RPC notifications, it MUST either: | ||
// - Return Content-Type: text/event-stream to initiate an SSE stream, OR | ||
// - Return Content-Type: application/json for a single JSON object | ||
// - The client MUST support both response types. | ||
// | ||
// Reference: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#sending-messages-to-the-server | ||
UpgradeToSSEWhenReceiveNotification() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the documentation here.
if ctx.Err() != nil { | ||
return | ||
} | ||
if upgraded { | ||
if session.upgradeToSSE.Load() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Maybe add some inline comments to the entire streamable_http.go
file for the changes you made?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your review. comments are added as suggested.
Description
Fix for
ErrContentLength
, details in #334.This PR do not ensure that client could receive all the notification from server, and I think this is as expected, because notification could not block the response. mcp-go maybe should provide sync way to send notification( need more discussion), I provide a hook way to ensure that client could receive the all the notification(as demonstrated in the test code).
Type of Change
Summary by CodeRabbit