Skip to content

Commit f0c3543

Browse files
committed
fix: refine SSE stream lifecycle handling
1 parent e71b08f commit f0c3543

3 files changed

Lines changed: 38 additions & 3 deletions

File tree

docs/middleware/sse.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ app.Get("/events", sse.New(sse.Config{
9999
| OnClose | `func(fiber.Ctx, error)` | Called when the stream ends, with `nil` when the handler returned successfully and no stream write failed. | `nil` |
100100
| Retry | `time.Duration` | Initial EventSource reconnect delay. | `0` |
101101
| HeartbeatInterval | `time.Duration` | Interval for SSE comment heartbeats. | `15 * time.Second` |
102-
| DisableHeartbeat | `bool` | Disable automatic heartbeat comments. | `false` |
102+
| DisableHeartbeat | `bool` | Disable automatic heartbeat comments. When disabled, disconnected clients may not be detected until the next write. | `false` |
103103

104104
## Default Config
105105

@@ -128,4 +128,6 @@ func (s *Stream) LastEventID() string
128128

129129
Every write is flushed. A failed flush closes `Done`, stores the error returned by `Err`, and lets the handler stop without relying on `fasthttp.RequestCtx.Done`, which is not a per-client disconnect signal. After a normal handler return, `Done` is closed and `Context()` is canceled while `Err()` remains `nil`; writes after that return `sse: stream closed`.
130130

131+
Automatic heartbeat comments keep idle streams active and make silent client disconnects observable through the next flush error. If heartbeats are disabled, a handler waiting on an external source might not notice a disconnected client until it writes again. Stopping a stream waits for an in-flight heartbeat write to finish, so a very slow client can delay shutdown until the underlying write unblocks.
132+
131133
`Config.Retry` sends the initial reconnect delay when the stream opens. `Event.Retry` changes the reconnect delay for a specific event, following the SSE wire format.

middleware/sse/sse.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,11 @@ func New(config ...Config) fiber.Handler {
3737
c.Set(fiber.HeaderConnection, "keep-alive")
3838
c.Set("X-Accel-Buffering", "no")
3939

40-
c.Abandon()
41-
4240
streamContext := c.Context()
4341
lastEventID := c.Get(fiber.HeaderLastEventID)
4442

43+
c.Abandon()
44+
4545
return c.SendStreamWriter(func(w *bufio.Writer) {
4646
stream := newStream(streamContext, w, lastEventID, c.App().Config().JSONEncoder)
4747
var streamErr error
@@ -54,6 +54,11 @@ func New(config ...Config) fiber.Handler {
5454
cfg.OnClose(c, finalErr)
5555
}
5656
}()
57+
defer func() {
58+
if recovered := recover(); recovered != nil {
59+
streamErr = fmt.Errorf("sse: handler panic: %v", recovered)
60+
}
61+
}()
5762
defer stream.closeStream()
5863

5964
if cfg.Retry > 0 {

middleware/sse/sse_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,34 @@ func Test_SSE_HandlerErrorCallsOnClose(t *testing.T) {
500500
}
501501
}
502502

503+
func Test_SSE_HandlerPanicCallsOnClose(t *testing.T) {
504+
t.Parallel()
505+
506+
closed := make(chan error, 1)
507+
508+
app := fiber.New()
509+
app.Get("/events", New(Config{
510+
DisableHeartbeat: true,
511+
Handler: func(fiber.Ctx, *Stream) error {
512+
panic("boom")
513+
},
514+
OnClose: func(_ fiber.Ctx, err error) {
515+
closed <- err
516+
},
517+
}))
518+
519+
resp, err := app.Test(httptest.NewRequest(fiber.MethodGet, "/events", http.NoBody))
520+
require.NoError(t, err)
521+
require.Equal(t, fiber.StatusOK, resp.StatusCode)
522+
523+
select {
524+
case err := <-closed:
525+
require.EqualError(t, err, "sse: handler panic: boom")
526+
case <-time.After(time.Second):
527+
t.Fatal("OnClose was not called")
528+
}
529+
}
530+
503531
func Test_SSE_NewPanicsWithoutHandler(t *testing.T) {
504532
t.Parallel()
505533

0 commit comments

Comments
 (0)