Skip to content

fix: panic when streamable HTTP server sends notification #348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

dugenkui03
Copy link
Collaborator

@dugenkui03 dugenkui03 commented May 28, 2025

Description

Fix for ErrContentLength, details in #334.

Ref: https://pkg.go.dev/net/http
// ErrContentLength is returned by ResponseWriter.Write calls
// when a Handler set a Content-Length response header with a
// declared size and then attempted to write more bytes than
// declared.
ErrContentLength = errors.New("http: wrote more than the declared Content-Length")

This PR do not ensure that client could receive all the notification from server, and I think this is as expected, because notification could not block the response. mcp-go maybe should provide sync way to send notification( need more discussion), I provide a hook way to ensure that client could receive the all the notification(as demonstrated in the test code).

Type of Change

  • Bug fix (non-breaking change that fixes an issue)

Summary by CodeRabbit

  • New Features
    • Improved support for Server-Sent Events (SSE) notifications, enabling real-time updates to clients when supported.
  • Bug Fixes
    • Enhanced reliability and handling of notification delivery to clients using HTTP streaming.
  • Tests
    • Added new tests to validate HTTP client interactions and notification handling with real-time updates.

Copy link
Contributor

coderabbitai bot commented May 28, 2025

Walkthrough

This change introduces support for upgrading HTTP client-server communication to Server-Sent Events (SSE) when sending notifications by adding a new interface and implementation for streamable HTTP sessions. It also adds a test for the HTTP client to validate notification handling using the streamable HTTP transport.

Changes

Files/Paths Change Summary
client/http_test.go Added a test (TestHTTPClient) to validate HTTP client behavior with notifications and streamable HTTP.
server/session.go Introduced SessionWithStreamableHTTPConfig interface; updated notification methods to invoke SSE upgrade.
server/streamable_http.go Added atomic SSE upgrade flag and method; implemented interface; refined SSE header and notification logic.

Possibly related PRs

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (1.64.8)

Error: you are using a configuration file for golangci-lint v2 with golangci-lint v1: please use golangci-lint v2
Failed executing command with error: you are using a configuration file for golangci-lint v2 with golangci-lint v1: please use golangci-lint v2


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dff933e and f9bbaf5.

📒 Files selected for processing (1)
  • server/streamable_http.go (6 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • server/streamable_http.go
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
client/http_test.go (2)

12-20: Good use of hooks for synchronization, but consider race condition.

The hook logic correctly waits for notifications to be processed, but the busy-wait loop on line 17-18 could be inefficient and may have timing issues. The 200ms sleep helps but is somewhat arbitrary.

Consider using a more robust synchronization mechanism:

-	// return until all the notifications are handled
-	for len(clientSession.NotificationChannel()) > 0 {
-	}
-	time.Sleep(time.Millisecond * 200)
+	// Wait for notification channel to be drained with timeout
+	ctx, cancel := context.WithTimeout(ctx, time.Second)
+	defer cancel()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			if len(clientSession.NotificationChannel()) == 0 {
+				time.Sleep(time.Millisecond * 50) // Small buffer to ensure processing
+				return
+			}
+			time.Sleep(time.Millisecond * 10)
+		}
+	}

72-75: Simple but effective notification counting.

The notification handler correctly increments the counter. The variable name notificationNum could be more descriptive, but it's acceptable for test code.

Consider a more descriptive variable name:

-	notificationNum := 0
+	notificationCount := 0
	client.OnNotification(func(notification mcp.JSONRPCNotification) {
-		notificationNum += 1
+		notificationCount++
	})
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c7c0e13 and 6c97a1b.

📒 Files selected for processing (3)
  • client/http_test.go (1 hunks)
  • server/session.go (3 hunks)
  • server/streamable_http.go (6 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
server/streamable_http.go (1)
server/session.go (1)
  • SessionWithStreamableHTTPConfig (52-65)
🔇 Additional comments (12)
client/http_test.go (2)

30-59: Well-structured tool implementation with proper error handling.

The "notify" tool correctly demonstrates the notification flow by sending a progress notification and returning a success response. The error handling for notification sending is appropriate.


104-110: Clear and appropriate test assertions.

The test correctly verifies both the tool call result and notification delivery. The assertions are clear and cover the expected behavior.

server/session.go (3)

51-65: Well-designed interface with excellent documentation.

The SessionWithStreamableHTTPConfig interface is properly designed with:

  • Clear method name that describes its purpose
  • Comprehensive documentation including protocol specification
  • Proper reference to the official specification URL
  • Clear explanation of the behavior requirements

165-168: Clean integration with proper type assertion.

The integration correctly uses type assertion to check if the session supports SSE upgrade and calls the method when available. This approach is non-breaking and follows Go interface patterns well.


217-220: Consistent implementation across notification methods.

The same integration pattern is correctly applied to SendNotificationToSpecificClient, maintaining consistency with the other notification method.

server/streamable_http.go (7)

12-12: Good addition of sync/atomic import.

The import of sync/atomic is necessary for the new atomic boolean field and is properly added.


247-247: Improved variable naming for clarity.

Renaming upgraded to upgradedHeader makes the purpose much clearer - it specifically tracks whether SSE headers have been set, not the overall upgrade state.


265-271: Proper header setting logic with race protection.

The logic correctly sets SSE headers only once using the upgradedHeader flag. This prevents duplicate header setting in the notification goroutine.


301-323: Well-implemented SSE upgrade decision logic.

The implementation correctly:

  • Checks the atomic upgradeToSSE flag to determine response type
  • Sets SSE headers if not already set
  • Falls back to JSON response when SSE upgrade is not requested
  • Handles session ID header for initialize requests appropriately

505-505: Thread-safe atomic field for upgrade state.

Using atomic.Bool is the correct approach for tracking the SSE upgrade state across goroutines, ensuring thread safety without requiring mutex locks.


546-548: Simple and correct implementation of upgrade method.

The method correctly sets the atomic flag to true, signaling that the session should upgrade to SSE mode for the response. The implementation is thread-safe and minimal.


550-550: Proper interface compliance declaration.

The compile-time interface compliance check ensures that streamableHttpSession correctly implements SessionWithStreamableHTTPConfig.

@dugenkui03 dugenkui03 changed the title Fix streamable server send notification optimization for streamable server send notification May 28, 2025
@dugenkui03 dugenkui03 marked this pull request as draft May 28, 2025 15:48
@dugenkui03 dugenkui03 marked this pull request as draft May 28, 2025 15:48
@dugenkui03 dugenkui03 force-pushed the fix_streamable_send_notification branch from 350e669 to dff933e Compare May 28, 2025 16:13
@dugenkui03 dugenkui03 marked this pull request as ready for review May 28, 2025 16:13
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
client/http_test.go (1)

64-110: Enhance test robustness with timeout handling and notification validation.

The test successfully validates the notification flow but could benefit from additional robustness and validation.

Consider the following improvements:

	t.Run("Can receive notification from server", func(t *testing.T) {
+		timeout := time.Second * 5
+		ctx, cancel := context.WithTimeout(context.Background(), timeout)
+		defer cancel()
+		
		client, err := NewStreamableHttpClient(testServer.URL)
		if err != nil {
			t.Fatalf("create client failed %v", err)
			return
		}

		notificationNum := 0
+		var receivedNotification mcp.JSONRPCNotification
		client.OnNotification(func(notification mcp.JSONRPCNotification) {
			notificationNum += 1
+			receivedNotification = notification
		})

-		ctx := context.Background()

		if err := client.Start(ctx); err != nil {
			t.Fatalf("Failed to start client: %v", err)
			return
		}

		// Initialize
		initRequest := mcp.InitializeRequest{}
		initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
		initRequest.Params.ClientInfo = mcp.Implementation{
			Name:    "test-client",
			Version: "1.0.0",
		}

		_, err = client.Initialize(ctx, initRequest)
		if err != nil {
			t.Fatalf("Failed to initialize: %v\n", err)
		}

		request := mcp.CallToolRequest{}
		request.Params.Name = "notify"
		result, err := client.CallTool(ctx, request)
		if err != nil {
			t.Fatalf("CallTool failed: %v", err)
		}

		if len(result.Content) != 1 {
			t.Errorf("Expected 1 content item, got %d", len(result.Content))
		}

		if notificationNum != 1 {
			t.Errorf("Expected 1 notification item, got %d", notificationNum)
		}
+		
+		// Validate notification content
+		if receivedNotification.Method != "notifications/progress" {
+			t.Errorf("Expected notification method 'notifications/progress', got '%s'", receivedNotification.Method)
+		}
	})

This adds:

  1. Timeout handling to prevent hanging tests
  2. Validation of the actual notification content
  3. Better context management
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6c97a1b and dff933e.

📒 Files selected for processing (2)
  • client/http_test.go (1 hunks)
  • server/streamable_http.go (6 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • server/streamable_http.go
🧰 Additional context used
🧬 Code Graph Analysis (1)
client/http_test.go (7)
server/hooks.go (1)
  • Hooks (94-121)
mcp/tools.go (2)
  • CallToolRequest (46-53)
  • CallToolResult (36-43)
server/session.go (1)
  • ClientSessionFromContext (71-76)
server/server.go (3)
  • WithToolCapabilities (255-262)
  • WithHooks (238-242)
  • ServerFromContext (58-63)
mcp/types.go (4)
  • Content (798-800)
  • InitializeRequest (389-398)
  • Params (154-154)
  • Implementation (468-471)
server/streamable_http.go (1)
  • NewTestStreamableHTTPServer (614-618)
client/http.go (1)
  • NewStreamableHttpClient (11-17)
🔇 Additional comments (2)
client/http_test.go (2)

30-59: LGTM! Well-structured tool implementation.

The "notify" tool implementation correctly:

  • Retrieves the server from context
  • Sends a structured progress notification
  • Handles errors appropriately
  • Returns proper success result

12-111:

✅ Verification successful

Verify integration with streamable HTTP transport.

The test effectively validates the end-to-end notification flow over streamable HTTP with SSE support, which aligns with the PR objectives for optimizing streamable server send notification.

To ensure this test covers all aspects of the streamable HTTP transport integration, verify that the SSE upgrade mechanism is properly triggered:


🏁 Script executed:

#!/bin/bash
# Description: Verify SSE upgrade implementation in streamable HTTP transport

# Check for SSE-related implementation in transport layer
rg -A 5 -B 5 "Server-Sent Events|text/event-stream|EventSource" 

# Look for streamable HTTP session interface implementation
ast-grep --pattern 'interface SessionWithStreamableHTTPConfig {
  $$$
}'

# Check notification handling in streamable transport
rg -A 10 "NotificationChannel|SendNotificationToClient"

Length of output: 31218


Verification passed: SSE upgrade mechanism is correctly implemented

The end-to-end notification flow over the streamable HTTP transport is fully covered:

  • server/streamable_http.go sets Content-Type: text/event-stream, Connection: keep-alive, Cache-Control: no-cache and writes the 202 status code when notifications arrive.
  • client/http_test.go’s TestHTTPClient successfully triggers and asserts receipt of one notification.

No changes required.

Copy link
Collaborator

@pottekkat pottekkat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Only suggestion is to add some comments.

Comment on lines +51 to +65
// SessionWithStreamableHTTPConfig extends ClientSession to support streamable HTTP transport configurations
type SessionWithStreamableHTTPConfig interface {
ClientSession
// UpgradeToSSEWhenReceiveNotification upgrades the client-server communication to SSE stream when the server
// sends notifications to the client
//
// The protocol specification:
// - If the server response contains any JSON-RPC notifications, it MUST either:
// - Return Content-Type: text/event-stream to initiate an SSE stream, OR
// - Return Content-Type: application/json for a single JSON object
// - The client MUST support both response types.
//
// Reference: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#sending-messages-to-the-server
UpgradeToSSEWhenReceiveNotification()
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the documentation here.

if ctx.Err() != nil {
return
}
if upgraded {
if session.upgradeToSSE.Load() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Maybe add some inline comments to the entire streamable_http.go file for the changes you made?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review. comments are added as suggested.

@dugenkui03 dugenkui03 merged commit e744c19 into mark3labs:main May 29, 2025
4 checks passed
@dugenkui03 dugenkui03 changed the title optimization for streamable server send notification fix: panic when streamable HTTP server sends notification May 29, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants