From 91cea5c31a653b54405523834fa45a5fec018ba2 Mon Sep 17 00:00:00 2001 From: DivyMohan14 Date: Wed, 27 Aug 2025 20:47:55 +0530 Subject: [PATCH 1/3] feat(sse): add middleware to prevent proxy buffering of SSE connections --- lib/httpapi/server.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/lib/httpapi/server.go b/lib/httpapi/server.go index 2d25fca..8753f73 100644 --- a/lib/httpapi/server.go +++ b/lib/httpapi/server.go @@ -188,6 +188,23 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) { }) router.Use(corsMiddleware.Handler) + // Add SSE middleware to prevent proxy buffering + sseMiddleware := func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, "/events") || strings.HasSuffix(r.URL.Path, "/screen") { + // Disable proxy buffering for SSE endpoints + w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") + w.Header().Set("Pragma", "no-cache") + w.Header().Set("Expires", "0") + w.Header().Set("X-Accel-Buffering", "no") // nginx + w.Header().Set("X-Proxy-Buffering", "no") // generic proxy + w.Header().Set("Connection", "keep-alive") + } + next.ServeHTTP(w, r) + }) + } + router.Use(sseMiddleware) + humaConfig := huma.DefaultConfig("AgentAPI", "0.6.1") humaConfig.Info.Description = "HTTP API for Claude Code, Goose, and Aider.\n\nhttps://github.com/coder/agentapi" api := humachi.New(router, humaConfig) @@ -388,6 +405,7 @@ func (s *Server) subscribeEvents(ctx context.Context, input *struct{}, send sse. return } } + for { select { case event, ok := <-ch: From d90bbcb97e8e4e7d69954aeb2842c29f11b88faa Mon Sep 17 00:00:00 2001 From: DivyMohan14 Date: Fri, 12 Sep 2025 00:16:57 +0530 Subject: [PATCH 2/3] chore(middleware): use huma middleware --- lib/httpapi/server.go | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/lib/httpapi/server.go b/lib/httpapi/server.go index b4d94f1..90fd48f 100644 --- a/lib/httpapi/server.go +++ b/lib/httpapi/server.go @@ -189,23 +189,6 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) { }) router.Use(corsMiddleware.Handler) - // Add SSE middleware to prevent proxy buffering - sseMiddleware := func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if strings.HasSuffix(r.URL.Path, "/events") || strings.HasSuffix(r.URL.Path, "/screen") { - // Disable proxy buffering for SSE endpoints - w.Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") - w.Header().Set("Pragma", "no-cache") - w.Header().Set("Expires", "0") - w.Header().Set("X-Accel-Buffering", "no") // nginx - w.Header().Set("X-Proxy-Buffering", "no") // generic proxy - w.Header().Set("Connection", "keep-alive") - } - next.ServeHTTP(w, r) - }) - } - router.Use(sseMiddleware) - humaConfig := huma.DefaultConfig("AgentAPI", version.Version) humaConfig.Info.Description = "HTTP API for Claude Code, Goose, and Aider.\n\nhttps://github.com/coder/agentapi" api := humachi.New(router, humaConfig) @@ -280,6 +263,19 @@ func hostAuthorizationMiddleware(allowedHosts []string, badHostHandler http.Hand } } +// sseMiddleware creates middleware that prevents proxy buffering for SSE endpoints +func sseMiddleware(ctx huma.Context, next func(huma.Context)) { + // Disable proxy buffering for SSE endpoints + ctx.SetHeader("Cache-Control", "no-cache, no-store, must-revalidate") + ctx.SetHeader("Pragma", "no-cache") + ctx.SetHeader("Expires", "0") + ctx.SetHeader("X-Accel-Buffering", "no") // nginx + ctx.SetHeader("X-Proxy-Buffering", "no") // generic proxy + ctx.SetHeader("Connection", "keep-alive") + + next(ctx) +} + func (s *Server) StartSnapshotLoop(ctx context.Context) { s.conversation.StartSnapshotLoop(ctx) go func() { @@ -316,6 +312,7 @@ func (s *Server) registerRoutes() { Path: "/events", Summary: "Subscribe to events", Description: "The events are sent as Server-Sent Events (SSE). Initially, the endpoint returns a list of events needed to reconstruct the current state of the conversation and the agent's status. After that, it only returns events that have occurred since the last event was sent.\n\nNote: When an agent is running, the last message in the conversation history is updated frequently, and the endpoint sends a new message update event each time.", + Middlewares: []func(huma.Context, func(huma.Context)){sseMiddleware}, }, map[string]any{ // Mapping of event type name to Go struct for that event. "message_update": MessageUpdateBody{}, @@ -328,6 +325,7 @@ func (s *Server) registerRoutes() { Path: "/internal/screen", Summary: "Subscribe to screen", Hidden: true, + Middlewares: []func(huma.Context, func(huma.Context)){sseMiddleware}, }, map[string]any{ "screen": ScreenUpdateBody{}, }, s.subscribeScreen) From ac486ae97e2190fcce044353136b975f8de63462 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Fri, 12 Sep 2025 14:53:04 +0100 Subject: [PATCH 3/3] chore(lib/httpapi): add unit tests for SSE middleware --- lib/httpapi/server_test.go | 48 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/lib/httpapi/server_test.go b/lib/httpapi/server_test.go index bc50d3e..3778fc7 100644 --- a/lib/httpapi/server_test.go +++ b/lib/httpapi/server_test.go @@ -15,6 +15,7 @@ import ( "github.com/coder/agentapi/lib/httpapi" "github.com/coder/agentapi/lib/logctx" "github.com/coder/agentapi/lib/msgfmt" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -631,3 +632,50 @@ func TestServer_CORSPreflightOrigins(t *testing.T) { }) } } + +func TestServer_SSEMiddleware_Events(t *testing.T) { + t.Parallel() + ctx := logctx.WithLogger(context.Background(), slog.New(slog.NewTextHandler(os.Stdout, nil))) + srv, err := httpapi.NewServer(ctx, httpapi.ServerConfig{ + AgentType: msgfmt.AgentTypeClaude, + Process: nil, + Port: 0, + ChatBasePath: "/chat", + AllowedHosts: []string{"*"}, + AllowedOrigins: []string{"*"}, + }) + require.NoError(t, err) + tsServer := httptest.NewServer(srv.Handler()) + t.Cleanup(tsServer.Close) + + t.Run("events", func(t *testing.T) { + t.Parallel() + resp, err := tsServer.Client().Get(tsServer.URL + "/events") + require.NoError(t, err) + t.Cleanup(func() { + _ = resp.Body.Close() + }) + assertSSEHeaders(t, resp) + }) + + t.Run("internal/screen", func(t *testing.T) { + t.Parallel() + + resp, err := tsServer.Client().Get(tsServer.URL + "/internal/screen") + require.NoError(t, err) + t.Cleanup(func() { + _ = resp.Body.Close() + }) + assertSSEHeaders(t, resp) + }) +} + +func assertSSEHeaders(t testing.TB, resp *http.Response) { + t.Helper() + assert.Equal(t, "no-cache, no-store, must-revalidate", resp.Header.Get("Cache-Control")) + assert.Equal(t, "no-cache", resp.Header.Get("Pragma")) + assert.Equal(t, "0", resp.Header.Get("Expires")) + assert.Equal(t, "no", resp.Header.Get("X-Accel-Buffering")) + assert.Equal(t, "no", resp.Header.Get("X-Proxy-Buffering")) + assert.Equal(t, "keep-alive", resp.Header.Get("Connection")) +}